Browse Source

ML having delayed data detection create annotations (#36796)

* ML having delayed data detection create annotations

* adding upsertAsDoc, audit, and changing user

* changing update to just index the doc with the id set
Benjamin Trent 6 years ago
parent
commit
1d429cf1c9

+ 13 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/Annotation.java

@@ -77,6 +77,19 @@ public class Annotation implements ToXContentObject, Writeable {
         this.type = Objects.requireNonNull(type);
     }
 
+    public Annotation(Annotation other) {
+        Objects.requireNonNull(other);
+        this.annotation = other.annotation;
+        this.createTime = new Date(other.createTime.getTime());
+        this.createUsername = other.createUsername;
+        this.timestamp = new Date(other.timestamp.getTime());
+        this.endTimestamp = other.endTimestamp == null ? null : new Date(other.endTimestamp.getTime());
+        this.jobId = other.jobId;
+        this.modifiedTime = other.modifiedTime == null ? null : new Date(other.modifiedTime.getTime());
+        this.modifiedUsername = other.modifiedUsername;
+        this.type = other.type;
+    }
+
     public Annotation(StreamInput in) throws IOException {
         annotation = in.readString();
         createTime = new Date(in.readLong());

+ 9 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationTests.java

@@ -11,6 +11,8 @@ import org.elasticsearch.test.AbstractSerializingTestCase;
 
 import java.util.Date;
 
+import static org.hamcrest.Matchers.equalTo;
+
 public class AnnotationTests extends AbstractSerializingTestCase<Annotation> {
 
     @Override
@@ -35,4 +37,11 @@ public class AnnotationTests extends AbstractSerializingTestCase<Annotation> {
     protected Writeable.Reader<Annotation> instanceReader() {
         return Annotation::new;
     }
+
+    public void testCopyConstructor() {
+        for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
+            Annotation testAnnotation = createTestInstance();
+            assertThat(testAnnotation, equalTo(new Annotation(testAnnotation)));
+        }
+    }
 }

+ 82 - 10
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

@@ -8,11 +8,16 @@ package org.elasticsearch.xpack.ml.datafeed;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
+import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.core.internal.io.Streams;
 import org.elasticsearch.index.mapper.DateFieldMapper;
@@ -20,10 +25,13 @@ import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
 import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
 import org.elasticsearch.xpack.core.ml.action.PostDataAction;
+import org.elasticsearch.xpack.core.ml.annotations.Annotation;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
+import org.elasticsearch.xpack.core.security.user.SystemUser;
 import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
 import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@@ -61,7 +69,8 @@ class DatafeedJob {
     private volatile long lookbackStartTimeMs;
     private volatile long latestFinalBucketEndTimeMs;
     private volatile long lastDataCheckTimeMs;
-    private volatile int lastDataCheckAudit;
+    private volatile String lastDataCheckAnnotationId;
+    private volatile Annotation lastDataCheckAnnotation;
     private volatile Long lastEndTimeMs;
     private AtomicBoolean running = new AtomicBoolean(true);
     private volatile boolean isIsolated;
@@ -177,23 +186,86 @@ class DatafeedJob {
             this.lastDataCheckTimeMs = this.currentTimeSupplier.get();
             List<BucketWithMissingData> missingDataBuckets = delayedDataDetector.detectMissingData(latestFinalBucketEndTimeMs);
             if (missingDataBuckets.isEmpty() == false) {
-
                 long totalRecordsMissing = missingDataBuckets.stream()
                     .mapToLong(BucketWithMissingData::getMissingDocumentCount)
                     .sum();
-                // The response is sorted by asc timestamp, so the last entry is the last bucket
-                Date lastBucketDate = missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket().getTimestamp();
-                int newAudit = Objects.hash(totalRecordsMissing, lastBucketDate);
-                if (newAudit != lastDataCheckAudit) {
-                    auditor.warning(jobId,
-                        Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing,
-                            XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(lastBucketDate.getTime())));
-                    lastDataCheckAudit = newAudit;
+                Date endTime = missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket().getTimestamp();
+                Annotation annotation = createAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(),
+                    endTime,
+                    totalRecordsMissing);
+
+                // Have we an annotation that covers the same area with the same message?
+                // Cannot use annotation.equals(other) as that checks createTime
+                if (lastDataCheckAnnotation != null
+                    && annotation.getAnnotation().equals(lastDataCheckAnnotation.getAnnotation())
+                    && annotation.getTimestamp().equals(lastDataCheckAnnotation.getTimestamp())
+                    && annotation.getEndTimestamp().equals(lastDataCheckAnnotation.getTimestamp())) {
+                    return;
+                }
+
+                // Creating a warning in addition to updating/creating our annotation. This allows the issue to be plainly visible
+                // in the job list page.
+                auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing,
+                    XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(endTime.getTime())));
+
+                if (lastDataCheckAnnotationId != null) {
+                    updateAnnotation(annotation);
+                } else {
+                    lastDataCheckAnnotationId = addAndSetDelayedDataAnnotation(annotation);
                 }
             }
         }
     }
 
+    private Annotation createAnnotation(Date startTime, Date endTime, long recordsMissing) {
+       String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, recordsMissing,
+            XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(endTime.getTime()));
+       return new Annotation(msg,
+           new Date(currentTimeSupplier.get()),
+           SystemUser.NAME,
+           startTime,
+           endTime,
+           jobId,
+           null,
+           null,
+           "annotation");
+    }
+
+    private String addAndSetDelayedDataAnnotation(Annotation annotation) {
+        try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
+            IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
+            request.source(xContentBuilder);
+            IndexResponse response = client.index(request).actionGet();
+            lastDataCheckAnnotation = annotation;
+            return response.getId();
+        } catch (IOException ex) {
+            String errorMessage = "[" + jobId + "] failed to create annotation for delayed data checker.";
+            LOGGER.error(errorMessage, ex);
+            auditor.error(jobId, errorMessage);
+            return null;
+        }
+    }
+
+    private void updateAnnotation(Annotation annotation) {
+        Annotation updatedAnnotation = new Annotation(lastDataCheckAnnotation);
+        updatedAnnotation.setModifiedUsername(SystemUser.NAME);
+        updatedAnnotation.setModifiedTime(new Date(currentTimeSupplier.get()));
+        updatedAnnotation.setAnnotation(annotation.getAnnotation());
+        updatedAnnotation.setTimestamp(annotation.getTimestamp());
+        updatedAnnotation.setEndTimestamp(annotation.getEndTimestamp());
+        try (XContentBuilder xContentBuilder = updatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
+            IndexRequest indexRequest = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
+            indexRequest.id(lastDataCheckAnnotationId);
+            indexRequest.source(xContentBuilder);
+            client.index(indexRequest).actionGet();
+            lastDataCheckAnnotation = updatedAnnotation;
+        } catch (IOException ex) {
+            String errorMessage = "[" + jobId + "] failed to update annotation for delayed data checker.";
+            LOGGER.error(errorMessage, ex);
+            auditor.error(jobId, errorMessage);
+        }
+    }
+
     /**
      * We wait a static interval of 15 minutes till the next missing data check.
      *

+ 73 - 4
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java

@@ -6,21 +6,31 @@
 package org.elasticsearch.xpack.ml.datafeed;
 
 import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
+import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.mock.orig.Mockito;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
 import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
 import org.elasticsearch.xpack.core.ml.action.PostDataAction;
+import org.elasticsearch.xpack.core.ml.annotations.Annotation;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.results.Bucket;
+import org.elasticsearch.xpack.core.security.user.SystemUser;
 import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
 import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@@ -48,6 +58,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.atMost;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -66,8 +77,10 @@ public class DatafeedJobTests extends ESTestCase {
     private DataDescription.Builder dataDescription;
     ActionFuture<PostDataAction.Response> postDataFuture;
     private ActionFuture<FlushJobAction.Response> flushJobFuture;
+    private ActionFuture<IndexResponse> indexFuture;
     private ArgumentCaptor<FlushJobAction.Request> flushJobRequests;
     private FlushJobAction.Response flushJobResponse;
+    private String annotationDocId;
 
     private long currentTime;
     private XContentType xContentType;
@@ -87,6 +100,8 @@ public class DatafeedJobTests extends ESTestCase {
         dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
         postDataFuture = mock(ActionFuture.class);
         flushJobFuture = mock(ActionFuture.class);
+        indexFuture = mock(ActionFuture.class);
+        annotationDocId = "AnnotationDocId";
         flushJobResponse = new FlushJobAction.Response(true, new Date());
         delayedDataDetector = mock(DelayedDataDetector.class);
         when(delayedDataDetector.getWindow()).thenReturn(DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS);
@@ -109,6 +124,9 @@ public class DatafeedJobTests extends ESTestCase {
         flushJobRequests = ArgumentCaptor.forClass(FlushJobAction.Request.class);
         when(flushJobFuture.actionGet()).thenReturn(flushJobResponse);
         when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture);
+
+        when(indexFuture.actionGet()).thenReturn(new IndexResponse(new ShardId("index", "uuid", 0), "doc", annotationDocId, 0, 0, 0, true));
+        when(client.index(any())).thenReturn(indexFuture);
     }
 
     public void testLookBackRunWithEndTime() throws Exception {
@@ -244,10 +262,61 @@ public class DatafeedJobTests extends ESTestCase {
         when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
         datafeedJob.runRealtime();
 
-        verify(auditor, times(1)).warning(jobId,
-            Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA,
-                10,
-                XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(2000)));
+        String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA,
+            10,
+            XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(2000));
+
+        Annotation expectedAnnotation = new Annotation(msg,
+            new Date(currentTime),
+            SystemUser.NAME,
+            bucket.getTimestamp(),
+            bucket.getTimestamp(),
+            jobId,
+            null,
+            null,
+            "annotation");
+
+        IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
+        try (XContentBuilder xContentBuilder = expectedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
+            request.source(xContentBuilder);
+        }
+        ArgumentCaptor<IndexRequest> indexRequestArgumentCaptor = ArgumentCaptor.forClass(IndexRequest.class);
+        verify(client, atMost(2)).index(indexRequestArgumentCaptor.capture());
+        assertThat(request.index(), equalTo(indexRequestArgumentCaptor.getValue().index()));
+        assertThat(request.source(), equalTo(indexRequestArgumentCaptor.getValue().source()));
+
+        // Execute a fourth time, this time we return a new delayedDataDetector response to verify annotation gets updated
+        when(delayedDataDetector.detectMissingData(2000))
+            .thenReturn(Collections.singletonList(BucketWithMissingData.fromMissingAndBucket(15, bucket)));
+        currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
+        inputStream = new ByteArrayInputStream(contentBytes);
+        when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
+        when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
+        when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
+        datafeedJob.runRealtime();
+
+        msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA,
+            15,
+            XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(2000));
+        // What we expect the updated annotation to be indexed as
+        IndexRequest indexRequest = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
+        indexRequest.id(annotationDocId);
+        Annotation updatedAnnotation = new Annotation(expectedAnnotation);
+        updatedAnnotation.setAnnotation(msg);
+        updatedAnnotation.setModifiedTime(new Date(currentTime));
+        updatedAnnotation.setModifiedUsername(SystemUser.NAME);
+        try (XContentBuilder xContentBuilder = updatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
+            indexRequest.source(xContentBuilder);
+        }
+
+        ArgumentCaptor<IndexRequest> updateRequestArgumentCaptor = ArgumentCaptor.forClass(IndexRequest.class);
+
+        verify(client, atMost(2)).index(updateRequestArgumentCaptor.capture());
+        assertThat(indexRequest.index(), equalTo(updateRequestArgumentCaptor.getValue().index()));
+        assertThat(indexRequest.id(), equalTo(updateRequestArgumentCaptor.getValue().id()));
+        assertThat(indexRequest.source().utf8ToString(),
+            equalTo(updateRequestArgumentCaptor.getValue().source().utf8ToString()));
+        assertThat(updateRequestArgumentCaptor.getValue().opType(), equalTo(DocWriteRequest.OpType.INDEX));
     }
 
     public void testEmptyDataCountGivenlookback() throws Exception {