Browse Source

[ML] Audit a message every day the datafeed has seen no data (#91774)

David Kyle 2 years ago
parent
commit
df66f67165

+ 5 - 0
docs/changelog/91774.yaml

@@ -0,0 +1,5 @@
+pr: 91774
+summary: Audit a message every day the datafeed has seen no data
+area: Machine Learning
+type: enhancement
+issues: []

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

@@ -158,6 +158,7 @@ public final class Messages {
     public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]";
     public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time";
     public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped";
+    public static final String JOB_AUDIT_DATAFEED_STOPPED_WITH_REASON = "Datafeed stopped with reason [{0}]";
     public static final String JOB_AUDIT_DATAFEED_ISOLATED = "Datafeed isolated";
     public static final String JOB_AUDIT_DELETING = "Deleting job by task with id ''{0}''";
     public static final String JOB_AUDIT_DELETING_FAILED = "Error deleting job: {0}";

+ 4 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

@@ -137,6 +137,10 @@ class DatafeedJob {
         return maxEmptySearches;
     }
 
+    public long numberOfSearchesIn24Hours() {
+        return (60_000 * 60 * 24) / frequencyMs;
+    }
+
     public void finishReportingTimingStats() {
         timingStatsReporter.finishReporting();
     }

+ 27 - 15
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedRunner.java

@@ -96,7 +96,13 @@ public class DatafeedRunner {
     public void run(TransportStartDatafeedAction.DatafeedTask task, Consumer<Exception> finishHandler) {
         ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap(datafeedJob -> {
             String jobId = datafeedJob.getJobId();
-            Holder holder = new Holder(task, task.getDatafeedId(), datafeedJob, new ProblemTracker(auditor, jobId), finishHandler);
+            Holder holder = new Holder(
+                task,
+                task.getDatafeedId(),
+                datafeedJob,
+                new ProblemTracker(auditor, jobId, datafeedJob.numberOfSearchesIn24Hours()),
+                finishHandler
+            );
             StoppedOrIsolated stoppedOrIsolated = task.executeIfNotStoppedOrIsolated(
                 () -> runningDatafeedsOnThisNode.put(task.getAllocationId(), holder)
             );
@@ -313,15 +319,15 @@ public class DatafeedRunner {
                     } catch (DatafeedJob.EmptyDataCountException e) {
                         int emptyDataCount = holder.problemTracker.reportEmptyDataCount();
                         if (e.haveEverSeenData == false && holder.shouldStopAfterEmptyData(emptyDataCount)) {
-                            logger.warn(
-                                "Datafeed for ["
-                                    + jobId
-                                    + "] has seen no data in ["
-                                    + emptyDataCount
-                                    + "] attempts, and never seen any data previously, so stopping..."
-                            );
+                            String noDataMessage = "Datafeed for ["
+                                + jobId
+                                + "] has seen no data in ["
+                                + emptyDataCount
+                                + "] attempts, and never seen any data previously, so stopping...";
+                            logger.warn(noDataMessage);
+
                             // In this case we auto-close the job, as though a lookback-only datafeed stopped
-                            holder.stop("no_data", TimeValue.timeValueSeconds(20), e, true);
+                            holder.stop("no_data", TimeValue.timeValueSeconds(20), e, true, noDataMessage);
                             return;
                         }
                         nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
@@ -432,10 +438,10 @@ public class DatafeedRunner {
         }
 
         public void stop(String source, TimeValue timeout, Exception e) {
-            stop(source, timeout, e, defaultAutoCloseJob);
+            stop(source, timeout, e, defaultAutoCloseJob, null);
         }
 
-        public void stop(String source, TimeValue timeout, Exception e, boolean autoCloseJob) {
+        public void stop(String source, TimeValue timeout, Exception e, boolean autoCloseJob, String stoppedReason) {
             if (isNodeShuttingDown) {
                 return;
             }
@@ -466,10 +472,16 @@ public class DatafeedRunner {
                     if (cancellable != null) {
                         cancellable.cancel();
                     }
-                    auditor.info(
-                        datafeedJob.getJobId(),
-                        Messages.getMessage(isIsolated() ? Messages.JOB_AUDIT_DATAFEED_ISOLATED : Messages.JOB_AUDIT_DATAFEED_STOPPED)
-                    );
+                    String auditMessage;
+                    if (isIsolated()) {
+                        auditMessage = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_ISOLATED);
+                    } else {
+                        auditMessage = stoppedReason == null
+                            ? Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)
+                            : Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED_WITH_REASON, stoppedReason);
+                    }
+                    auditor.info(datafeedJob.getJobId(), auditMessage);
+
                     datafeedJob.finishReportingTimingStats();
                     finishHandler.accept(e);
                     logger.info(

+ 6 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java

@@ -35,10 +35,12 @@ class ProblemTracker {
     private volatile boolean hadProblems;
     private volatile String previousProblem;
     private volatile int emptyDataCount;
+    private final long numberOfSearchesInADay;
 
-    ProblemTracker(AnomalyDetectionAuditor auditor, String jobId) {
+    ProblemTracker(AnomalyDetectionAuditor auditor, String jobId, long numberOfSearchesInADay) {
         this.auditor = Objects.requireNonNull(auditor);
         this.jobId = Objects.requireNonNull(jobId);
+        this.numberOfSearchesInADay = Math.max(numberOfSearchesInADay, 1);
     }
 
     /**
@@ -74,10 +76,11 @@ class ProblemTracker {
 
     /**
      * Updates the tracking of empty data cycles. If the number of consecutive empty data
-     * cycles reaches {@code EMPTY_DATA_WARN_COUNT}, a warning is reported.
+     * cycles reaches {@code EMPTY_DATA_WARN_COUNT} or the 24 hours of empty data counts
+     * have passed a warning is reported.
      */
     public int reportEmptyDataCount() {
-        if (++emptyDataCount == EMPTY_DATA_WARN_COUNT) {
+        if (++emptyDataCount == EMPTY_DATA_WARN_COUNT || (emptyDataCount % numberOfSearchesInADay) == 0) {
             auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_NO_DATA));
         }
         return emptyDataCount;

+ 1 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedRunnerTests.java

@@ -130,6 +130,7 @@ public class DatafeedRunnerTests extends ESTestCase {
         when(datafeedJob.stop()).thenReturn(true);
         when(datafeedJob.getJobId()).thenReturn(job.getId());
         when(datafeedJob.getMaxEmptySearches()).thenReturn(null);
+        when(datafeedJob.numberOfSearchesIn24Hours()).thenReturn(24L);
         datafeedJobBuilder = mock(DatafeedJobBuilder.class);
         doAnswer(invocationOnMock -> {
             ActionListener<DatafeedJob> listener = (ActionListener<DatafeedJob>) invocationOnMock.getArguments()[2];

+ 23 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java

@@ -25,10 +25,12 @@ public class ProblemTrackerTests extends ESTestCase {
 
     private ProblemTracker problemTracker;
 
+    private static final long NUM_SEARCHES_IN_DAY = 24L;
+
     @Before
     public void setUpTests() {
         auditor = mock(AnomalyDetectionAuditor.class);
-        problemTracker = new ProblemTracker(auditor, "foo");
+        problemTracker = new ProblemTracker(auditor, "foo", NUM_SEARCHES_IN_DAY);
     }
 
     public void testReportExtractionProblem() {
@@ -118,6 +120,26 @@ public class ProblemTrackerTests extends ESTestCase {
         verify(auditor).info("foo", "Datafeed has started retrieving data again");
     }
 
+    public void testUpdateEmptyDataCount_DailyTrigger() {
+        for (int i = 0; i < NUM_SEARCHES_IN_DAY; i++) {
+            problemTracker.reportEmptyDataCount();
+        }
+        verify(auditor, times(2)).warning("foo", "Datafeed has been retrieving no data for a while");
+
+        for (int i = 0; i < NUM_SEARCHES_IN_DAY; i++) {
+            problemTracker.reportEmptyDataCount();
+        }
+        verify(auditor, times(3)).warning("foo", "Datafeed has been retrieving no data for a while");
+    }
+
+    public void testUpdateEmptyDataCount_NumSearchesInDayIsZero() {
+        auditor = mock(AnomalyDetectionAuditor.class);
+        problemTracker = new ProblemTracker(auditor, "foo", 0);
+
+        problemTracker.reportEmptyDataCount();
+        verify(auditor, times(1)).warning("foo", "Datafeed has been retrieving no data for a while");
+    }
+
     public void testFinishReport_GivenNoProblems() {
         problemTracker.finishReport();