Browse Source

[ML] Switch poor categorization audit warning to use status field (#52195)

In #51146 a rudimentary check for poor categorization was added to
7.6.

This change replaces that warning based on a Java-side check with
a new one based on the categorization_status field that the ML C++
sets.  categorization_status was added in 7.7 and above by #51879,
so this new warning based on more advanced conditions will also be
in 7.7 and above.

Closes #50749
David Roberts 5 years ago
parent
commit
4a3ca41fb7

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

@@ -135,7 +135,7 @@ public final class Messages {
             "Adjust the analysis_limits.model_memory_limit setting to ensure all data is analyzed";
     public static final String JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT_PRE_7_2 = "Job memory status changed to hard_limit at {0}; adjust the " +
         "analysis_limits.model_memory_limit setting to ensure all data is analyzed";
-    public static final String JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES = "{0} categories observed in the first [{1}] buckets." +
+    public static final String JOB_AUDIT_CATEGORIZATION_STATUS_WARN = "categorization_status changed to [{0}] after [{1}] buckets." +
         " This suggests an inappropriate categorization_field_name has been chosen.";
 
     public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates";

+ 14 - 22
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java

@@ -74,9 +74,6 @@ public class AutodetectResultProcessor {
 
     private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class);
 
-    static final long EARLY_BUCKET_THRESHOLD = 100;
-    static final int EXCESSIVE_EARLY_CATEGORY_COUNT = 1000;
-
     private final Client client;
     private final AnomalyDetectionAuditor auditor;
     private final String jobId;
@@ -90,9 +87,8 @@ public class AutodetectResultProcessor {
     private final FlushListener flushListener;
     private volatile boolean processKilled;
     private volatile boolean failed;
-    private long priorRunsBucketCount;
+    private final long priorRunsBucketCount;
     private long currentRunBucketCount; // only used from the process() thread, so doesn't need to be volatile
-    private boolean excessiveCategoryWarningIssued; // only used from the process() thread, so doesn't need to be volatile
     private final JobResultsPersister.Builder bulkResultsPersister;
     private boolean deleteInterimRequired;
 
@@ -230,7 +226,7 @@ public class AutodetectResultProcessor {
         }
         CategoryDefinition categoryDefinition = result.getCategoryDefinition();
         if (categoryDefinition != null) {
-            processCategoryDefinition(categoryDefinition);
+            persister.persistCategoryDefinition(categoryDefinition, this::isAlive);
         }
         ModelPlot modelPlot = result.getModelPlot();
         if (modelPlot != null) {
@@ -314,22 +310,6 @@ public class AutodetectResultProcessor {
         }
     }
 
-    private void processCategoryDefinition(CategoryDefinition categoryDefinition) {
-        persister.persistCategoryDefinition(categoryDefinition, this::isAlive);
-        if (categoryDefinition.getCategoryId() == EXCESSIVE_EARLY_CATEGORY_COUNT &&
-            priorRunsBucketCount + currentRunBucketCount < EARLY_BUCKET_THRESHOLD &&
-            excessiveCategoryWarningIssued == false) {
-            auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES, EXCESSIVE_EARLY_CATEGORY_COUNT,
-                // Add 1 because category definitions are written before buckets
-                1L + priorRunsBucketCount + currentRunBucketCount));
-            // This flag won't be retained if the job is closed and reopened, or if the job migrates to another node.
-            // This means it's possible the audit message is generated multiple times.  However, that's not a
-            // disaster, and is also very unlikely in the the (best practice) cases where initial lookback covers
-            // more than 100 buckets.
-            excessiveCategoryWarningIssued = true;
-        }
-    }
-
     private void processModelSizeStats(ModelSizeStats modelSizeStats) {
         LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}",
                 jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(),
@@ -338,6 +318,8 @@ public class AutodetectResultProcessor {
 
         persister.persistModelSizeStats(modelSizeStats, this::isAlive);
         notifyModelMemoryStatusChange(modelSizeStats);
+        notifyCategorizationStatusChange(modelSizeStats);
+
         latestModelSizeStats = modelSizeStats;
     }
 
@@ -359,6 +341,16 @@ public class AutodetectResultProcessor {
         }
     }
 
+    private void notifyCategorizationStatusChange(ModelSizeStats modelSizeStats) {
+        ModelSizeStats.CategorizationStatus categorizationStatus = modelSizeStats.getCategorizationStatus();
+        if (categorizationStatus != latestModelSizeStats.getCategorizationStatus()) {
+            if (categorizationStatus == ModelSizeStats.CategorizationStatus.WARN) {
+                auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, categorizationStatus,
+                    priorRunsBucketCount + currentRunBucketCount));
+            }
+        }
+    }
+
     protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {
         JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build();
         UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);

+ 42 - 67
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java

@@ -17,7 +17,6 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;
@@ -52,14 +51,12 @@ import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doThrow;
@@ -213,65 +210,6 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
     }
 
-    public void testProcessResult_excessiveCategoryDefinitionCountEarly() {
-        int iterations = 3;
-        int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2;
-
-        processorUnderTest.setDeleteInterimRequired(false);
-
-        AutodetectResult result = mock(AutodetectResult.class);
-        for (int iteration = 1; iteration <= iterations; ++iteration) {
-            for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) {
-                CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID);
-                categoryDefinition.setCategoryId(categoryId);
-                when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
-
-                processorUnderTest.processResult(result);
-            }
-        }
-
-        verify(bulkBuilder, never()).executeRequest();
-        verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any());
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
-        verify(auditor).warning(eq(JOB_ID), eq(Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES,
-            AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT, 1)));
-    }
-
-    public void testProcessResult_highCategoryDefinitionCountLateOn() {
-        int iterations = 3;
-        int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2;
-
-        processorUnderTest.setDeleteInterimRequired(false);
-
-        when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
-        when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
-
-        AutodetectResult bucketResult = mock(AutodetectResult.class);
-        final int numPriorBuckets = (int) AutodetectResultProcessor.EARLY_BUCKET_THRESHOLD + 1;
-        for (int i = 0; i < numPriorBuckets; ++i) {
-            Bucket bucket = new Bucket(JOB_ID, new Date(i * 1000 + 1000000), BUCKET_SPAN_MS);
-            when(bucketResult.getBucket()).thenReturn(bucket);
-            processorUnderTest.processResult(bucketResult);
-        }
-
-        AutodetectResult categoryResult = mock(AutodetectResult.class);
-        for (int iteration = 1; iteration <= iterations; ++iteration) {
-            for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) {
-                CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID);
-                categoryDefinition.setCategoryId(categoryId);
-                when(categoryResult.getCategoryDefinition()).thenReturn(categoryDefinition);
-                processorUnderTest.processResult(categoryResult);
-            }
-        }
-
-        verify(bulkBuilder).persistTimingStats(any(TimingStats.class));
-        verify(bulkBuilder, times(numPriorBuckets)).persistBucket(any(Bucket.class));
-        verify(bulkBuilder, times(numPriorBuckets)).executeRequest();
-        verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any());
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
-        verify(auditor, never()).warning(eq(JOB_ID), anyString());
-    }
-
     public void testProcessResult_flushAcknowledgement() {
         AutodetectResult result = mock(AutodetectResult.class);
         FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
@@ -335,11 +273,6 @@ public class AutodetectResultProcessorTests extends ESTestCase {
     }
 
     public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() {
-        TimeValue delay = TimeValue.timeValueSeconds(5);
-        // Set up schedule delay time
-        when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString()))
-            .thenAnswer(i -> executor.schedule((Runnable) i.getArguments()[0], delay.nanos(), TimeUnit.NANOSECONDS));
-
         AutodetectResult result = mock(AutodetectResult.class);
         processorUnderTest.setDeleteInterimRequired(false);
 
@@ -374,6 +307,48 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb", "1kb"));
     }
 
+    public void testProcessResult_modelSizeStatsWithCategorizationStatusChanges() {
+        AutodetectResult result = mock(AutodetectResult.class);
+        processorUnderTest.setDeleteInterimRequired(false);
+
+        // First one with ok
+        ModelSizeStats modelSizeStats =
+            new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(ModelSizeStats.CategorizationStatus.OK).build();
+        when(result.getModelSizeStats()).thenReturn(modelSizeStats);
+        processorUnderTest.processResult(result);
+
+        // Now one with warn
+        modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(ModelSizeStats.CategorizationStatus.WARN).build();
+        when(result.getModelSizeStats()).thenReturn(modelSizeStats);
+        processorUnderTest.processResult(result);
+
+        // Another with warn
+        modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(ModelSizeStats.CategorizationStatus.WARN).build();
+        when(result.getModelSizeStats()).thenReturn(modelSizeStats);
+        processorUnderTest.processResult(result);
+
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
+        verify(persister, times(3)).persistModelSizeStats(any(ModelSizeStats.class), any());
+        // We should have only fired one notification; only the change from ok to warn should have fired, not the subsequent warn
+        verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, "warn", 0));
+    }
+
+    public void testProcessResult_modelSizeStatsWithFirstCategorizationStatusWarn() {
+        AutodetectResult result = mock(AutodetectResult.class);
+        processorUnderTest.setDeleteInterimRequired(false);
+
+        // First one with warn - this works because a default constructed ModelSizeStats has CategorizationStatus.OK
+        ModelSizeStats modelSizeStats =
+            new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(ModelSizeStats.CategorizationStatus.WARN).build();
+        when(result.getModelSizeStats()).thenReturn(modelSizeStats);
+        processorUnderTest.processResult(result);
+
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
+        verify(persister).persistModelSizeStats(any(ModelSizeStats.class), any());
+        // We should have only fired one notification; only the change from ok to warn should have fired, not the subsequent warn
+        verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, "warn", 0));
+    }
+
     public void testProcessResult_modelSnapshot() {
         AutodetectResult result = mock(AutodetectResult.class);
         ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)