浏览代码

[ML] Add audit warning for 1000 categories found early in job (#51146)

If 1000 different category definitions are created for a job in
the first 100 buckets it processes then an audit warning will now
be created.  (This will cause a yellow warning triangle in the
ML UI's jobs list.)

Such a large number of categories suggests that the field that
categorization is working on is not well suited to the ML
categorization functionality.
David Roberts 5 年之前
父节点
当前提交
160a21230e

+ 2 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

@@ -135,6 +135,8 @@ public final class Messages {
             "Adjust the analysis_limits.model_memory_limit setting to ensure all data is analyzed";
     public static final String JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT_PRE_7_2 = "Job memory status changed to hard_limit at {0}; adjust the " +
         "analysis_limits.model_memory_limit setting to ensure all data is analyzed";
+    public static final String JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES = "{0} categories observed in the first [{1}] buckets." +
+        " This suggests an inappropriate categorization_field_name has been chosen.";
 
     public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates";
     public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_EMPTY =

+ 28 - 6
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java

@@ -74,6 +74,9 @@ public class AutodetectResultProcessor {
 
     private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class);
 
+    static final long EARLY_BUCKET_THRESHOLD = 100;
+    static final int EXCESSIVE_EARLY_CATEGORY_COUNT = 1000;
+
     private final Client client;
     private final AnomalyDetectionAuditor auditor;
     private final String jobId;
@@ -87,7 +90,9 @@ public class AutodetectResultProcessor {
     private final FlushListener flushListener;
     private volatile boolean processKilled;
     private volatile boolean failed;
-    private int bucketCount; // only used from the process() thread, so doesn't need to be volatile
+    private long priorRunsBucketCount;
+    private long currentRunBucketCount; // only used from the process() thread, so doesn't need to be volatile
+    private boolean excessiveCategoryWarningIssued; // only used from the process() thread, so doesn't need to be volatile
     private final JobResultsPersister.Builder bulkResultsPersister;
     private boolean deleteInterimRequired;
 
@@ -122,6 +127,7 @@ public class AutodetectResultProcessor {
         this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId, this::isAlive);
         this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister);
         this.deleteInterimRequired = true;
+        this.priorRunsBucketCount = timingStats.getBucketCount();
     }
 
     public void process() {
@@ -140,7 +146,7 @@ public class AutodetectResultProcessor {
             } catch (Exception e) {
                 LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
             }
-            LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount);
+            LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, currentRunBucketCount);
 
         } catch (Exception e) {
             failed = true;
@@ -166,7 +172,7 @@ public class AutodetectResultProcessor {
     }
 
     private void readResults() {
-        bucketCount = 0;
+        currentRunBucketCount = 0;
         try {
             Iterator<AutodetectResult> iterator = process.readAutodetectResults();
             while (iterator.hasNext()) {
@@ -174,7 +180,7 @@ public class AutodetectResultProcessor {
                     AutodetectResult result = iterator.next();
                     processResult(result);
                     if (result.getBucket() != null) {
-                        LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount);
+                        LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, currentRunBucketCount);
                     }
                 } catch (Exception e) {
                     if (isAlive() == false) {
@@ -212,7 +218,7 @@ public class AutodetectResultProcessor {
             // results are also interim
             timingStatsReporter.reportBucket(bucket);
             bulkResultsPersister.persistBucket(bucket).executeRequest();
-            ++bucketCount;
+            ++currentRunBucketCount;
         }
         List<AnomalyRecord> records = result.getRecords();
         if (records != null && !records.isEmpty()) {
@@ -224,7 +230,7 @@ public class AutodetectResultProcessor {
         }
         CategoryDefinition categoryDefinition = result.getCategoryDefinition();
         if (categoryDefinition != null) {
-            persister.persistCategoryDefinition(categoryDefinition, this::isAlive);
+            processCategoryDefinition(categoryDefinition);
         }
         ModelPlot modelPlot = result.getModelPlot();
         if (modelPlot != null) {
@@ -308,6 +314,22 @@ public class AutodetectResultProcessor {
         }
     }
 
+    private void processCategoryDefinition(CategoryDefinition categoryDefinition) {
+        persister.persistCategoryDefinition(categoryDefinition, this::isAlive);
+        if (categoryDefinition.getCategoryId() == EXCESSIVE_EARLY_CATEGORY_COUNT &&
+            priorRunsBucketCount + currentRunBucketCount < EARLY_BUCKET_THRESHOLD &&
+            excessiveCategoryWarningIssued == false) {
+            auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES, EXCESSIVE_EARLY_CATEGORY_COUNT,
+                // Add 1 because category definitions are written before buckets
+                1L + priorRunsBucketCount + currentRunBucketCount));
+            // This flag won't be retained if the job is closed and reopened, or if the job migrates to another node.
+            // This means it's possible the audit message is generated multiple times.  However, that's not a
+            // disaster, and is also very unlikely in the the (best practice) cases where initial lookback covers
+            // more than 100 buckets.
+            excessiveCategoryWarningIssued = true;
+        }
+    }
+
     private void processModelSizeStats(ModelSizeStats modelSizeStats) {
         LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}",
                 jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(),

+ 74 - 13
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java

@@ -133,7 +133,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(persister).commitStateWrites(JOB_ID);
     }
 
-    public void testProcessResult_bucket() throws Exception {
+    public void testProcessResult_bucket() {
         when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
         when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
         AutodetectResult result = mock(AutodetectResult.class);
@@ -150,7 +150,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(persister, never()).deleteInterimResults(JOB_ID);
     }
 
-    public void testProcessResult_bucket_deleteInterimRequired() throws Exception {
+    public void testProcessResult_bucket_deleteInterimRequired() {
         when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
         when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
         AutodetectResult result = mock(AutodetectResult.class);
@@ -167,7 +167,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(persister).deleteInterimResults(JOB_ID);
     }
 
-    public void testProcessResult_records() throws Exception {
+    public void testProcessResult_records() {
         AutodetectResult result = mock(AutodetectResult.class);
         List<AnomalyRecord> records =
             Arrays.asList(
@@ -183,7 +183,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
     }
 
-    public void testProcessResult_influencers() throws Exception {
+    public void testProcessResult_influencers() {
         AutodetectResult result = mock(AutodetectResult.class);
         List<Influencer> influencers =
             Arrays.asList(
@@ -199,9 +199,10 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
     }
 
-    public void testProcessResult_categoryDefinition() throws Exception {
+    public void testProcessResult_categoryDefinition() {
         AutodetectResult result = mock(AutodetectResult.class);
         CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
+        when(categoryDefinition.getCategoryId()).thenReturn(1L);
         when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
 
         processorUnderTest.setDeleteInterimRequired(false);
@@ -212,7 +213,66 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
     }
 
-    public void testProcessResult_flushAcknowledgement() throws Exception {
+    public void testProcessResult_excessiveCategoryDefinitionCountEarly() {
+        int iterations = 3;
+        int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2;
+
+        processorUnderTest.setDeleteInterimRequired(false);
+
+        AutodetectResult result = mock(AutodetectResult.class);
+        for (int iteration = 1; iteration <= iterations; ++iteration) {
+            for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) {
+                CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID);
+                categoryDefinition.setCategoryId(categoryId);
+                when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
+
+                processorUnderTest.processResult(result);
+            }
+        }
+
+        verify(bulkBuilder, never()).executeRequest();
+        verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any());
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
+        verify(auditor).warning(eq(JOB_ID), eq(Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES,
+            AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT, 1)));
+    }
+
+    public void testProcessResult_highCategoryDefinitionCountLateOn() {
+        int iterations = 3;
+        int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2;
+
+        processorUnderTest.setDeleteInterimRequired(false);
+
+        when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
+        when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
+
+        AutodetectResult bucketResult = mock(AutodetectResult.class);
+        final int numPriorBuckets = (int) AutodetectResultProcessor.EARLY_BUCKET_THRESHOLD + 1;
+        for (int i = 0; i < numPriorBuckets; ++i) {
+            Bucket bucket = new Bucket(JOB_ID, new Date(i * 1000 + 1000000), BUCKET_SPAN_MS);
+            when(bucketResult.getBucket()).thenReturn(bucket);
+            processorUnderTest.processResult(bucketResult);
+        }
+
+        AutodetectResult categoryResult = mock(AutodetectResult.class);
+        for (int iteration = 1; iteration <= iterations; ++iteration) {
+            for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) {
+                CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID);
+                categoryDefinition.setCategoryId(categoryId);
+                when(categoryResult.getCategoryDefinition()).thenReturn(categoryDefinition);
+                processorUnderTest.processResult(categoryResult);
+            }
+        }
+
+        verify(bulkBuilder).persistTimingStats(any(TimingStats.class));
+        verify(bulkBuilder, times(numPriorBuckets)).persistBucket(any(Bucket.class));
+        verify(bulkBuilder, times(numPriorBuckets)).executeRequest();
+        verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any());
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
+        verify(auditor, never()).warning(eq(JOB_ID), anyString());
+    }
+
+    public void testProcessResult_flushAcknowledgement() {
         AutodetectResult result = mock(AutodetectResult.class);
         FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
         when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
@@ -228,12 +288,13 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(bulkBuilder).executeRequest();
     }
 
-    public void testProcessResult_flushAcknowledgementMustBeProcessedLast() throws Exception {
+    public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
         AutodetectResult result = mock(AutodetectResult.class);
         FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
         when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
         when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
         CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
+        when(categoryDefinition.getCategoryId()).thenReturn(1L);
         when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
 
         processorUnderTest.setDeleteInterimRequired(false);
@@ -248,7 +309,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
     }
 
-    public void testProcessResult_modelPlot() throws Exception {
+    public void testProcessResult_modelPlot() {
         AutodetectResult result = mock(AutodetectResult.class);
         ModelPlot modelPlot = mock(ModelPlot.class);
         when(result.getModelPlot()).thenReturn(modelPlot);
@@ -260,7 +321,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(bulkBuilder).persistModelPlot(modelPlot);
     }
 
-    public void testProcessResult_modelSizeStats() throws Exception {
+    public void testProcessResult_modelSizeStats() {
         AutodetectResult result = mock(AutodetectResult.class);
         ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
         when(result.getModelSizeStats()).thenReturn(modelSizeStats);
@@ -273,7 +334,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(persister).persistModelSizeStats(eq(modelSizeStats), any());
     }
 
-    public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() throws Exception {
+    public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() {
         TimeValue delay = TimeValue.timeValueSeconds(5);
         // Set up schedule delay time
         when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString()))
@@ -313,7 +374,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb", "1kb"));
     }
 
-    public void testProcessResult_modelSnapshot() throws Exception {
+    public void testProcessResult_modelSnapshot() {
         AutodetectResult result = mock(AutodetectResult.class);
         ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)
             .setSnapshotId("a_snapshot_id")
@@ -337,7 +398,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any());
     }
 
-    public void testProcessResult_quantiles_givenRenormalizationIsEnabled() throws Exception {
+    public void testProcessResult_quantiles_givenRenormalizationIsEnabled() {
         AutodetectResult result = mock(AutodetectResult.class);
         Quantiles quantiles = mock(Quantiles.class);
         when(result.getQuantiles()).thenReturn(quantiles);
@@ -354,7 +415,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(renormalizer).renormalize(quantiles);
     }
 
-    public void testProcessResult_quantiles_givenRenormalizationIsDisabled() throws Exception {
+    public void testProcessResult_quantiles_givenRenormalizationIsDisabled() {
         AutodetectResult result = mock(AutodetectResult.class);
         Quantiles quantiles = mock(Quantiles.class);
         when(result.getQuantiles()).thenReturn(quantiles);