Browse Source

Create an annotation when a model snapshot is stored (#53783)

Przemysław Witek 5 years ago
parent
commit
7c5c9122a7
19 changed files with 422 additions and 80 deletions
  1. 1 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/Annotation.java
  2. 6 9
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java
  3. 67 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersister.java
  4. 1 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java
  5. 4 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java
  6. 119 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersisterTests.java
  7. 4 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationTests.java
  8. 5 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
  9. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java
  10. 32 52
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java
  11. 8 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java
  12. 3 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java
  13. 7 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
  14. 33 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java
  15. 5 0
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java
  16. 3 2
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java
  17. 88 3
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java
  18. 4 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
  19. 31 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/Annotation.java

@@ -12,8 +12,8 @@ import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.common.time.TimeUtils;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
 
 import java.io.IOException;
 import java.util.Date;

+ 6 - 9
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java

@@ -31,17 +31,14 @@ public class AnnotationIndex {
     public static final String WRITE_ALIAS_NAME = ".ml-annotations-write";
     // Exposed for testing, but always use the aliases in non-test code
     public static final String INDEX_NAME = ".ml-annotations-6";
-    public static final String INDEX_PATTERN = ".ml-annotations*";
 
     private static final String MAPPINGS_VERSION_VARIABLE = "xpack.ml.version";
 
     /**
-     * Create the .ml-annotations index with correct mappings if it does not already
-     * exist. This index is read and written by the UI results views, so needs to
-     * exist when there might be ML results to view.
+     * Create the .ml-annotations-6 index with correct mappings if it does not already exist. This index is read and written by the UI
+     * results views, so needs to exist when there might be ML results to view.
      */
-    public static void createAnnotationsIndexIfNecessary(Settings settings, Client client, ClusterState state,
-                                                         final ActionListener<Boolean> finalListener) {
+    public static void createAnnotationsIndexIfNecessary(Client client, ClusterState state, final ActionListener<Boolean> finalListener) {
 
         final ActionListener<Boolean> createAliasListener = ActionListener.wrap(success -> {
             final IndicesAliasesRequest request =
@@ -97,8 +94,8 @@ public class AnnotationIndex {
         finalListener.onResponse(false);
     }
 
-    public static String annotationsMapping() {
-        return TemplateUtils.loadTemplate("/org/elasticsearch/xpack/core/ml/annotations_index_mappings.json",
-            Version.CURRENT.toString(), MAPPINGS_VERSION_VARIABLE);
+    private static String annotationsMapping() {
+        return TemplateUtils.loadTemplate(
+            "/org/elasticsearch/xpack/core/ml/annotations_index_mappings.json", Version.CURRENT.toString(), MAPPINGS_VERSION_VARIABLE);
     }
 }

+ 67 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersister.java

@@ -0,0 +1,67 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ml.annotations;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
+
+/**
+ * Persists annotations to Elasticsearch index.
+ */
+public class AnnotationPersister {
+
+    private static final Logger logger = LogManager.getLogger(AnnotationPersister.class);
+
+    private final Client client;
+    private final AbstractAuditor auditor;
+
+    public AnnotationPersister(Client client, AbstractAuditor auditor) {
+        this.client = Objects.requireNonNull(client);
+        this.auditor = Objects.requireNonNull(auditor);
+    }
+
+    /**
+     * Persists the given annotation to annotations index.
+     *
+     * @param annotationId existing annotation id. If {@code null}, a new annotation will be created and id will be assigned automatically
+     * @param annotation annotation to be persisted
+     * @param errorMessage error message to report when annotation fails to be persisted
+     * @return tuple of the form (annotation id, annotation object)
+     */
+    public Tuple<String, Annotation> persistAnnotation(@Nullable String annotationId, Annotation annotation, String errorMessage) {
+        Objects.requireNonNull(annotation);
+        try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
+            IndexRequest indexRequest =
+                new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME)
+                    .id(annotationId)
+                    .source(xContentBuilder);
+            try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
+                IndexResponse response = client.index(indexRequest).actionGet();
+                return Tuple.tuple(response.getId(), annotation);
+            }
+        } catch (IOException ex) {
+            String jobId = annotation.getJobId();
+            logger.error(errorMessage, ex);
+            auditor.error(jobId, errorMessage);
+            return null;
+        }
+    }
+}

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

@@ -130,6 +130,7 @@ public final class Messages {
     public static final String JOB_AUDIT_DELETED = "Job deleted";
     public static final String JOB_AUDIT_KILLING = "Killing job";
     public static final String JOB_AUDIT_OLD_RESULTS_DELETED = "Deleted results prior to {0}";
+    public static final String JOB_AUDIT_SNAPSHOT_STORED = "Job model snapshot with id [{0}] stored";
     public static final String JOB_AUDIT_REVERTED = "Job model snapshot reverted to ''{0}''";
     public static final String JOB_AUDIT_SNAPSHOT_DELETED = "Model snapshot [{0}] with description ''{1}'' deleted";
     public static final String JOB_AUDIT_FILTER_UPDATED_ON_PROCESS = "Updated filter [{0}] in running process";

+ 4 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java

@@ -299,6 +299,10 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
         return jobId + "_" + TYPE + "_";
     }
 
+    public static String annotationDocumentId(ModelSnapshot snapshot) {
+        return "annotation_for_" + documentId(snapshot);
+    }
+
     public static String documentId(ModelSnapshot snapshot) {
         return documentId(snapshot.getJobId(), snapshot.getSnapshotId());
     }

+ 119 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersisterTests.java

@@ -0,0 +1,119 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ml.annotations;
+
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
+import org.junit.Before;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+
+import java.io.IOException;
+
+import static org.elasticsearch.common.collect.Tuple.tuple;
+import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class AnnotationPersisterTests extends ESTestCase {
+
+    private static final String ANNOTATION_ID = "existing_annotation_id";
+    private static final String ERROR_MESSAGE = "an error occurred while persisting annotation";
+
+    private Client client;
+    private AbstractAuditor auditor;
+    private IndexResponse indexResponse;
+
+    private ArgumentCaptor<IndexRequest> indexRequestCaptor;
+
+    @Before
+    public void setUpMocks() {
+        ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+        ThreadPool threadPool = mock(ThreadPool.class);
+        when(threadPool.getThreadContext()).thenReturn(threadContext);
+        client = mock(Client.class);
+        when(client.threadPool()).thenReturn(threadPool);
+
+        auditor = mock(AbstractAuditor.class);
+
+        indexResponse = mock(IndexResponse.class);
+        when(indexResponse.getId()).thenReturn(ANNOTATION_ID);
+
+        indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
+    }
+
+    public void testPersistAnnotation_Create() throws IOException {
+        doReturn(instantFuture(indexResponse)).when(client).index(any());
+
+        AnnotationPersister persister = new AnnotationPersister(client, auditor);
+        Annotation annotation = AnnotationTests.randomAnnotation();
+        Tuple<String, Annotation> result = persister.persistAnnotation(null, annotation, ERROR_MESSAGE);
+        assertThat(result, is(equalTo(tuple(ANNOTATION_ID, annotation))));
+
+        InOrder inOrder = inOrder(client);
+        inOrder.verify(client).threadPool();
+        inOrder.verify(client).index(indexRequestCaptor.capture());
+        verifyNoMoreInteractions(client, auditor);
+
+        IndexRequest indexRequest = indexRequestCaptor.getValue();
+
+        assertThat(indexRequest.index(), is(equalTo(AnnotationIndex.WRITE_ALIAS_NAME)));
+        assertThat(indexRequest.id(), is(nullValue()));
+        assertThat(parseAnnotation(indexRequest.source()), is(equalTo(annotation)));
+        assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX));
+    }
+
+    public void testPersistAnnotation_Update() throws IOException {
+        doReturn(instantFuture(indexResponse)).when(client).index(any());
+
+        AnnotationPersister persister = new AnnotationPersister(client, auditor);
+        Annotation annotation = AnnotationTests.randomAnnotation();
+        Tuple<String, Annotation> result = persister.persistAnnotation(ANNOTATION_ID, annotation, ERROR_MESSAGE);
+        assertThat(result, is(equalTo(tuple(ANNOTATION_ID, annotation))));
+
+        InOrder inOrder = inOrder(client);
+        inOrder.verify(client).threadPool();
+        inOrder.verify(client).index(indexRequestCaptor.capture());
+        verifyNoMoreInteractions(client, auditor);
+
+        IndexRequest indexRequest = indexRequestCaptor.getValue();
+        assertThat(indexRequest.index(), is(equalTo(AnnotationIndex.WRITE_ALIAS_NAME)));
+        assertThat(indexRequest.id(), is(equalTo(ANNOTATION_ID)));
+        assertThat(parseAnnotation(indexRequest.source()), is(equalTo(annotation)));
+        assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> ActionFuture<T> instantFuture(T response) {
+        ActionFuture future = mock(ActionFuture.class);
+        when(future.actionGet()).thenReturn(response);
+        return future;
+    }
+
+    private Annotation parseAnnotation(BytesReference source) throws IOException {
+        try (XContentParser parser = createParser(jsonXContent, source)) {
+            return Annotation.PARSER.parse(parser, null);
+        }
+    }
+}

+ 4 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationTests.java

@@ -22,6 +22,10 @@ public class AnnotationTests extends AbstractSerializingTestCase<Annotation> {
 
     @Override
     protected Annotation createTestInstance() {
+        return randomAnnotation();
+    }
+
+    static Annotation randomAnnotation() {
         return new Annotation(randomAlphaOfLengthBetween(100, 1000),
             new Date(randomNonNegativeLong()),
             randomAlphaOfLengthBetween(5, 20),

+ 5 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -132,6 +132,7 @@ import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
 import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
 import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
 import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider;
@@ -534,6 +535,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
         new MlIndexTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry);
 
         AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
+        AnnotationPersister anomalyDetectionAnnotationPersister = new AnnotationPersister(client, anomalyDetectionAuditor);
         DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName());
         InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService.getNodeName());
         this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor);
@@ -613,13 +615,15 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
                 threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
         AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(environment, settings, client, threadPool,
                 xContentRegistry, anomalyDetectionAuditor, clusterService, jobManager, jobResultsProvider, jobResultsPersister,
-                jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, nativeStorageProvider, indexNameExpressionResolver);
+                jobDataCountsPersister, anomalyDetectionAnnotationPersister, autodetectProcessFactory, normalizerFactory,
+                nativeStorageProvider, indexNameExpressionResolver);
         this.autodetectProcessManager.set(autodetectProcessManager);
         DatafeedJobBuilder datafeedJobBuilder =
             new DatafeedJobBuilder(
                 client,
                 xContentRegistry,
                 anomalyDetectionAuditor,
+                anomalyDetectionAnnotationPersister,
                 System::currentTimeMillis,
                 jobConfigProvider,
                 jobResultsProvider,

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java

@@ -66,7 +66,7 @@ class MlInitializationService implements LocalNodeMasterListener, ClusterStateLi
         // The atomic flag prevents multiple simultaneous attempts to create the
         // index if there is a flurry of cluster state updates in quick succession
         if (event.localNodeMaster() && isIndexCreationInProgress.compareAndSet(false, true)) {
-            AnnotationIndex.createAnnotationsIndexIfNecessary(settings, client, event.state(), ActionListener.wrap(
+            AnnotationIndex.createAnnotationsIndexIfNecessary(client, event.state(), ActionListener.wrap(
                 r -> {
                     isIndexCreationInProgress.set(false);
                     if (r) {

+ 32 - 52
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

@@ -8,16 +8,12 @@ package org.elasticsearch.xpack.ml.datafeed;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchStatusException;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.common.xcontent.ToXContent;
-import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
-import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.rest.RestStatus;
@@ -25,7 +21,7 @@ import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
 import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
 import org.elasticsearch.xpack.core.ml.action.PostDataAction;
 import org.elasticsearch.xpack.core.ml.annotations.Annotation;
-import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
@@ -55,6 +51,7 @@ class DatafeedJob {
     static final long MISSING_DATA_CHECK_INTERVAL_MS = 900_000; //15 minutes in ms
 
     private final AnomalyDetectionAuditor auditor;
+    private final AnnotationPersister annotationPersister;
     private final String jobId;
     private final DataDescription dataDescription;
     private final long frequencyMs;
@@ -69,8 +66,7 @@ class DatafeedJob {
     private volatile long lookbackStartTimeMs;
     private volatile long latestFinalBucketEndTimeMs;
     private volatile long lastDataCheckTimeMs;
-    private volatile String lastDataCheckAnnotationId;
-    private volatile Annotation lastDataCheckAnnotation;
+    private volatile Tuple<String, Annotation> lastDataCheckAnnotationWithId;
     private volatile Long lastEndTimeMs;
     private AtomicBoolean running = new AtomicBoolean(true);
     private volatile boolean isIsolated;
@@ -78,8 +74,9 @@ class DatafeedJob {
 
     DatafeedJob(String jobId, DataDescription dataDescription, long frequencyMs, long queryDelayMs,
                 DataExtractorFactory dataExtractorFactory, DatafeedTimingStatsReporter timingStatsReporter, Client client,
-                AnomalyDetectionAuditor auditor, Supplier<Long> currentTimeSupplier, DelayedDataDetector delayedDataDetector,
-                Integer maxEmptySearches, long latestFinalBucketEndTimeMs, long latestRecordTimeMs, boolean haveSeenDataPreviously) {
+                AnomalyDetectionAuditor auditor, AnnotationPersister annotationPersister, Supplier<Long> currentTimeSupplier,
+                DelayedDataDetector delayedDataDetector, Integer maxEmptySearches, long latestFinalBucketEndTimeMs, long latestRecordTimeMs,
+                boolean haveSeenDataPreviously) {
         this.jobId = jobId;
         this.dataDescription = Objects.requireNonNull(dataDescription);
         this.frequencyMs = frequencyMs;
@@ -88,6 +85,7 @@ class DatafeedJob {
         this.timingStatsReporter = timingStatsReporter;
         this.client = client;
         this.auditor = auditor;
+        this.annotationPersister = annotationPersister;
         this.currentTimeSupplier = currentTimeSupplier;
         this.delayedDataDetector = delayedDataDetector;
         this.maxEmptySearches = maxEmptySearches;
@@ -209,14 +207,14 @@ class DatafeedJob {
                 String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing,
                     XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(lastBucket.getTimestamp().getTime()));
 
-                Annotation annotation = createAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(), endTime, msg);
+                Annotation annotation = createDelayedDataAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(), endTime, msg);
 
                 // Have we an annotation that covers the same area with the same message?
                 // Cannot use annotation.equals(other) as that checks createTime
-                if (lastDataCheckAnnotation != null
-                    && annotation.getAnnotation().equals(lastDataCheckAnnotation.getAnnotation())
-                    && annotation.getTimestamp().equals(lastDataCheckAnnotation.getTimestamp())
-                    && annotation.getEndTimestamp().equals(lastDataCheckAnnotation.getEndTimestamp())) {
+                if (lastDataCheckAnnotationWithId != null
+                    && annotation.getAnnotation().equals(lastDataCheckAnnotationWithId.v2().getAnnotation())
+                    && annotation.getTimestamp().equals(lastDataCheckAnnotationWithId.v2().getTimestamp())
+                    && annotation.getEndTimestamp().equals(lastDataCheckAnnotationWithId.v2().getEndTimestamp())) {
                     return;
                 }
 
@@ -224,18 +222,29 @@ class DatafeedJob {
                 // in the job list page.
                 auditor.warning(jobId, msg);
 
-                if (lastDataCheckAnnotationId != null) {
-                    updateAnnotation(annotation);
+                if (lastDataCheckAnnotationWithId == null) {
+                    lastDataCheckAnnotationWithId =
+                        annotationPersister.persistAnnotation(
+                            null,
+                            annotation,
+                            "[" + jobId + "] failed to create annotation for delayed data checker.");
                 } else {
-                    lastDataCheckAnnotationId = addAndSetDelayedDataAnnotation(annotation);
+                    String annotationId = lastDataCheckAnnotationWithId.v1();
+                    Annotation updatedAnnotation = updateAnnotation(annotation);
+                    lastDataCheckAnnotationWithId =
+                        annotationPersister.persistAnnotation(
+                            annotationId,
+                            updatedAnnotation,
+                            "[" + jobId + "] failed to update annotation for delayed data checker.");
                 }
             }
         }
     }
 
-    private Annotation createAnnotation(Date startTime, Date endTime, String msg) {
+    private Annotation createDelayedDataAnnotation(Date startTime, Date endTime, String msg) {
        Date currentTime = new Date(currentTimeSupplier.get());
-       return new Annotation(msg,
+       return new Annotation(
+           msg,
            currentTime,
            XPackUser.NAME,
            startTime,
@@ -246,43 +255,14 @@ class DatafeedJob {
            "annotation");
     }
 
-    private String addAndSetDelayedDataAnnotation(Annotation annotation) {
-        try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
-            IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
-            request.source(xContentBuilder);
-            try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
-                IndexResponse response = client.index(request).actionGet();
-                lastDataCheckAnnotation = annotation;
-                return response.getId();
-            }
-        } catch (IOException ex) {
-            String errorMessage = "[" + jobId + "] failed to create annotation for delayed data checker.";
-            LOGGER.error(errorMessage, ex);
-            auditor.error(jobId, errorMessage);
-            return null;
-        }
-    }
-
-    private void updateAnnotation(Annotation annotation) {
-        Annotation updatedAnnotation = new Annotation(lastDataCheckAnnotation);
+    private Annotation updateAnnotation(Annotation annotation) {
+        Annotation updatedAnnotation = new Annotation(lastDataCheckAnnotationWithId.v2());
         updatedAnnotation.setModifiedUsername(XPackUser.NAME);
         updatedAnnotation.setModifiedTime(new Date(currentTimeSupplier.get()));
         updatedAnnotation.setAnnotation(annotation.getAnnotation());
         updatedAnnotation.setTimestamp(annotation.getTimestamp());
         updatedAnnotation.setEndTimestamp(annotation.getEndTimestamp());
-        try (XContentBuilder xContentBuilder = updatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
-            IndexRequest indexRequest = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
-            indexRequest.id(lastDataCheckAnnotationId);
-            indexRequest.source(xContentBuilder);
-            try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
-                client.index(indexRequest).actionGet();
-                lastDataCheckAnnotation = updatedAnnotation;
-            }
-        } catch (IOException ex) {
-            String errorMessage = "[" + jobId + "] failed to update annotation for delayed data checker.";
-            LOGGER.error(errorMessage, ex);
-            auditor.error(jobId, errorMessage);
-        }
+        return updatedAnnotation;
     }
 
     /**

+ 8 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java

@@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.license.RemoteClusterLicenseChecker;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.xpack.core.action.util.QueryPage;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
@@ -46,6 +47,7 @@ public class DatafeedJobBuilder {
     private final Client client;
     private final NamedXContentRegistry xContentRegistry;
     private final AnomalyDetectionAuditor auditor;
+    private final AnnotationPersister annotationPersister;
     private final Supplier<Long> currentTimeSupplier;
     private final JobConfigProvider jobConfigProvider;
     private final JobResultsProvider jobResultsProvider;
@@ -55,12 +57,14 @@ public class DatafeedJobBuilder {
     private final String nodeName;
 
     public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor,
-                              Supplier<Long> currentTimeSupplier, JobConfigProvider jobConfigProvider,
-                              JobResultsProvider jobResultsProvider, DatafeedConfigProvider datafeedConfigProvider,
-                              JobResultsPersister jobResultsPersister, Settings settings, String nodeName) {
+                              AnnotationPersister annotationPersister, Supplier<Long> currentTimeSupplier,
+                              JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider,
+                              DatafeedConfigProvider datafeedConfigProvider, JobResultsPersister jobResultsPersister, Settings settings,
+                              String nodeName) {
         this.client = client;
         this.xContentRegistry = Objects.requireNonNull(xContentRegistry);
         this.auditor = Objects.requireNonNull(auditor);
+        this.annotationPersister = Objects.requireNonNull(annotationPersister);
         this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
         this.jobConfigProvider = Objects.requireNonNull(jobConfigProvider);
         this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
@@ -90,6 +94,7 @@ public class DatafeedJobBuilder {
                     context.timingStatsReporter,
                     client,
                     auditor,
+                    annotationPersister,
                     currentTimeSupplier,
                     delayedDataDetector,
                     datafeedConfigHolder.get().getMaxEmptySearches(),

+ 3 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java

@@ -21,6 +21,7 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.BulkByScrollTask;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
@@ -69,9 +70,11 @@ public class JobDataDeleter {
         List<String> idsToDelete = new ArrayList<>();
         Set<String> indices = new HashSet<>();
         indices.add(stateIndexName);
+        indices.add(AnnotationIndex.READ_ALIAS_NAME);
         for (ModelSnapshot modelSnapshot : modelSnapshots) {
             idsToDelete.addAll(modelSnapshot.stateDocumentIds());
             idsToDelete.add(ModelSnapshot.documentId(modelSnapshot));
+            idsToDelete.add(ModelSnapshot.annotationDocumentId(modelSnapshot));
             indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()));
         }
 

+ 7 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

@@ -35,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.action.util.QueryPage;
 import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
 import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
@@ -103,6 +104,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
 
     private final JobResultsPersister jobResultsPersister;
     private final JobDataCountsPersister jobDataCountsPersister;
+    private final AnnotationPersister annotationPersister;
 
     private NativeStorageProvider nativeStorageProvider;
     private final ConcurrentMap<Long, ProcessContext> processByAllocation = new ConcurrentHashMap<>();
@@ -118,9 +120,9 @@ public class AutodetectProcessManager implements ClusterStateListener {
     public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool,
                                     NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor, ClusterService clusterService,
                                     JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister,
-                                    JobDataCountsPersister jobDataCountsPersister, AutodetectProcessFactory autodetectProcessFactory,
-                                    NormalizerFactory normalizerFactory, NativeStorageProvider nativeStorageProvider,
-                                    IndexNameExpressionResolver expressionResolver) {
+                                    JobDataCountsPersister jobDataCountsPersister, AnnotationPersister annotationPersister,
+                                    AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
+                                    NativeStorageProvider nativeStorageProvider, IndexNameExpressionResolver expressionResolver) {
         this.environment = environment;
         this.client = client;
         this.threadPool = threadPool;
@@ -133,6 +135,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
         this.jobResultsProvider = jobResultsProvider;
         this.jobResultsPersister = jobResultsPersister;
         this.jobDataCountsPersister = jobDataCountsPersister;
+        this.annotationPersister = annotationPersister;
         this.auditor = auditor;
         this.nativeStorageProvider = Objects.requireNonNull(nativeStorageProvider);
         clusterService.addListener(this);
@@ -511,6 +514,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
                 jobId,
                 renormalizer,
                 jobResultsPersister,
+                annotationPersister,
                 process,
                 autodetectParams.modelSizeStats(),
                 autodetectParams.timingStats());

+ 33 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java

@@ -20,6 +20,8 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.xpack.core.ml.MachineLearningField;
 import org.elasticsearch.xpack.core.ml.action.PutJobAction;
 import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
+import org.elasticsearch.xpack.core.ml.annotations.Annotation;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
 import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
@@ -34,6 +36,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Forecast;
 import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
 import org.elasticsearch.xpack.core.ml.job.results.Influencer;
 import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
+import org.elasticsearch.xpack.core.security.user.XPackUser;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.TimingStatsReporter;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
@@ -41,7 +44,9 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
 import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
 import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
 
+import java.time.Clock;
 import java.time.Duration;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -79,8 +84,10 @@ public class AutodetectResultProcessor {
     private final String jobId;
     private final Renormalizer renormalizer;
     private final JobResultsPersister persister;
+    private final AnnotationPersister annotationPersister;
     private final AutodetectProcess process;
     private final TimingStatsReporter timingStatsReporter;
+    private final Clock clock;
 
     final CountDownLatch completionLatch = new CountDownLatch(1);
     final Semaphore updateModelSnapshotSemaphore = new Semaphore(1);
@@ -102,26 +109,30 @@ public class AutodetectResultProcessor {
                                      String jobId,
                                      Renormalizer renormalizer,
                                      JobResultsPersister persister,
+                                     AnnotationPersister annotationPersister,
                                      AutodetectProcess process,
                                      ModelSizeStats latestModelSizeStats,
                                      TimingStats timingStats) {
-        this(client, auditor, jobId, renormalizer, persister, process, latestModelSizeStats, timingStats, new FlushListener());
+        this(client, auditor, jobId, renormalizer, persister, annotationPersister, process, latestModelSizeStats, timingStats,
+            Clock.systemUTC(), new FlushListener());
     }
 
     // Visible for testing
     AutodetectResultProcessor(Client client, AnomalyDetectionAuditor auditor, String jobId, Renormalizer renormalizer,
-                              JobResultsPersister persister, AutodetectProcess autodetectProcess, ModelSizeStats latestModelSizeStats,
-                              TimingStats timingStats, FlushListener flushListener) {
+                              JobResultsPersister persister, AnnotationPersister annotationPersister, AutodetectProcess autodetectProcess,
+                              ModelSizeStats latestModelSizeStats, TimingStats timingStats, Clock clock, FlushListener flushListener) {
         this.client = Objects.requireNonNull(client);
         this.auditor = Objects.requireNonNull(auditor);
         this.jobId = Objects.requireNonNull(jobId);
         this.renormalizer = Objects.requireNonNull(renormalizer);
         this.persister = Objects.requireNonNull(persister);
+        this.annotationPersister = Objects.requireNonNull(annotationPersister);
         this.process = Objects.requireNonNull(autodetectProcess);
         this.flushListener = Objects.requireNonNull(flushListener);
         this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats);
         this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId, this::isAlive);
         this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister);
+        this.clock = Objects.requireNonNull(clock);
         this.deleteInterimRequired = true;
         this.priorRunsBucketCount = timingStats.getBucketCount();
     }
@@ -268,6 +279,10 @@ public class AutodetectResultProcessor {
             if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                 updateModelSnapshotOnJob(modelSnapshot);
             }
+            annotationPersister.persistAnnotation(
+                ModelSnapshot.annotationDocumentId(modelSnapshot),
+                createModelSnapshotAnnotation(modelSnapshot),
+                "[" + jobId + "] failed to create annotation for model snapshot.");
         }
         Quantiles quantiles = result.getQuantiles();
         if (quantiles != null) {
@@ -310,6 +325,21 @@ public class AutodetectResultProcessor {
         }
     }
 
+    private Annotation createModelSnapshotAnnotation(ModelSnapshot modelSnapshot) {
+        assert modelSnapshot != null;
+        Date currentTime = new Date(clock.millis());
+        return new Annotation(
+            Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_STORED, modelSnapshot.getSnapshotId()),
+            currentTime,
+            XPackUser.NAME,
+            modelSnapshot.getTimestamp(),
+            modelSnapshot.getTimestamp(),
+            jobId,
+            currentTime,
+            XPackUser.NAME,
+            "annotation");
+    }
+
     private void processModelSizeStats(ModelSizeStats modelSizeStats) {
         LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}",
                 jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(),

+ 5 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java

@@ -15,6 +15,7 @@ import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.action.util.QueryPage;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -47,6 +48,7 @@ public class DatafeedJobBuilderTests extends ESTestCase {
 
     private Client client;
     private AnomalyDetectionAuditor auditor;
+    private AnnotationPersister annotationPersister;
     private Consumer<Exception> taskHandler;
     private JobResultsProvider jobResultsProvider;
     private JobConfigProvider jobConfigProvider;
@@ -64,6 +66,7 @@ public class DatafeedJobBuilderTests extends ESTestCase {
         when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
         when(client.settings()).thenReturn(Settings.EMPTY);
         auditor = mock(AnomalyDetectionAuditor.class);
+        annotationPersister = mock(AnnotationPersister.class);
         taskHandler = mock(Consumer.class);
         jobResultsPersister = mock(JobResultsPersister.class);
 
@@ -90,6 +93,7 @@ public class DatafeedJobBuilderTests extends ESTestCase {
                 client,
                 xContentRegistry(),
                 auditor,
+                annotationPersister,
                 System::currentTimeMillis,
                 jobConfigProvider,
                 jobResultsProvider,
@@ -213,6 +217,7 @@ public class DatafeedJobBuilderTests extends ESTestCase {
                 client,
                 xContentRegistry(),
                 auditor,
+                annotationPersister,
                 System::currentTimeMillis,
                 jobConfigProvider,
                 jobResultsProvider,

+ 3 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java

@@ -27,6 +27,7 @@ import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
 import org.elasticsearch.xpack.core.ml.action.PostDataAction;
 import org.elasticsearch.xpack.core.ml.annotations.Annotation;
 import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.results.Bucket;
@@ -458,7 +459,7 @@ public class DatafeedJobTests extends ESTestCase {
                                           long latestRecordTimeMs, boolean haveSeenDataPreviously) {
         Supplier<Long> currentTimeSupplier = () -> currentTime;
         return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, timingStatsReporter,
-            client, auditor, currentTimeSupplier, delayedDataDetector, null, latestFinalBucketEndTimeMs, latestRecordTimeMs,
-            haveSeenDataPreviously);
+            client, auditor, new AnnotationPersister(client, auditor), currentTimeSupplier, delayedDataDetector, null,
+            latestFinalBucketEndTimeMs, latestRecordTimeMs, haveSeenDataPreviously);
     }
 }

+ 88 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java

@@ -5,6 +5,10 @@
  */
 package org.elasticsearch.xpack.ml.integration;
 
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.OriginSettingClient;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -13,18 +17,25 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.service.ClusterApplierService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.MasterService;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.reindex.ReindexPlugin;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.action.util.QueryPage;
 import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
+import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.action.PutJobAction;
-import org.elasticsearch.xpack.core.action.util.QueryPage;
+import org.elasticsearch.xpack.core.ml.annotations.Annotation;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.Detector;
@@ -61,6 +72,8 @@ import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
 import org.junit.After;
 import org.junit.Before;
 
+import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -75,8 +88,13 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent;
 import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -129,6 +147,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
                 JOB_ID,
                 renormalizer,
                 new JobResultsPersister(originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), "test_node")),
+                new AnnotationPersister(originSettingClient, auditor),
                 process,
                 new ModelSizeStats.Builder(JOB_ID).build(),
                 new TimingStats(JOB_ID)) {
@@ -142,10 +161,12 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
     }
 
     @After
-    public void deleteJob() {
+    public void deleteJob() throws Exception {
         DeleteJobAction.Request request = new DeleteJobAction.Request(JOB_ID);
         AcknowledgedResponse response = client().execute(DeleteJobAction.INSTANCE, request).actionGet();
         assertTrue(response.isAcknowledged());
+        // Verify that deleting job also deletes associated model snapshots annotations
+        assertThat(getAnnotations(), empty());
     }
 
     public void testProcessResults() throws Exception {
@@ -201,11 +222,44 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         assertEquals(modelSnapshot, persistedModelSnapshot.results().get(0));
         assertEquals(Collections.singletonList(modelSnapshot), capturedUpdateModelSnapshotOnJobRequests);
 
+        // Verify that creating model snapshot also creates associated annotation
+        List<Annotation> annotations = getAnnotations();
+        assertThat(annotations, hasSize(1));
+        assertThat(
+            annotations.get(0).getAnnotation(),
+            is(equalTo(
+                new ParameterizedMessage("Job model snapshot with id [{}] stored", modelSnapshot.getSnapshotId()).getFormattedMessage())));
+
         Optional<Quantiles> persistedQuantiles = getQuantiles();
         assertTrue(persistedQuantiles.isPresent());
         assertEquals(quantiles, persistedQuantiles.get());
     }
 
+    public void testProcessResults_ModelSnapshot() throws Exception {
+        ModelSnapshot modelSnapshot = createModelSnapshot();
+        ResultsBuilder resultsBuilder = new ResultsBuilder().addModelSnapshot(modelSnapshot);
+        when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());
+
+        resultProcessor.process();
+        resultProcessor.awaitCompletion();
+
+        QueryPage<ModelSnapshot> persistedModelSnapshot = getModelSnapshots();
+        assertThat(persistedModelSnapshot.count(), is(equalTo(1L)));
+        assertThat(persistedModelSnapshot.results(), contains(modelSnapshot));
+
+        // Verify that creating model snapshot also creates associated annotation
+        List<Annotation> annotations = getAnnotations();
+        assertThat(annotations, hasSize(1));
+        assertThat(
+            annotations.get(0).getAnnotation(),
+            is(equalTo(
+                new ParameterizedMessage("Job model snapshot with id [{}] stored", modelSnapshot.getSnapshotId()).getFormattedMessage())));
+
+        // Verify that deleting model snapshot also deletes associated annotation
+        deleteModelSnapshot(JOB_ID, modelSnapshot.getSnapshotId());
+        assertThat(getAnnotations(), empty());
+    }
+
     public void testProcessResults_TimingStats() throws Exception {
         ResultsBuilder resultsBuilder = new ResultsBuilder()
                 .addBucket(createBucket(true, 100))
@@ -424,7 +478,10 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
     }
 
     private static ModelSnapshot createModelSnapshot() {
-        return new ModelSnapshot.Builder(JOB_ID).setSnapshotId(randomAlphaOfLength(12)).build();
+        return new ModelSnapshot.Builder(JOB_ID)
+            .setSnapshotId(randomAlphaOfLength(12))
+            .setTimestamp(Date.from(Instant.ofEpochMilli(1000000000)))
+            .build();
     }
 
     private static Quantiles createQuantiles() {
@@ -606,6 +663,34 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         return resultHolder.get();
     }
 
+    private List<Annotation> getAnnotations() throws Exception {
+        // Refresh the annotations index so that recently indexed annotation docs are visible.
+        client().admin().indices().prepareRefresh(AnnotationIndex.INDEX_NAME)
+            .setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED)
+            .execute()
+            .actionGet();
+
+        SearchRequest searchRequest = new SearchRequest(AnnotationIndex.READ_ALIAS_NAME);
+        SearchResponse searchResponse = client().search(searchRequest).actionGet();
+        List<Annotation> annotations = new ArrayList<>();
+        for (SearchHit hit : searchResponse.getHits().getHits()) {
+            annotations.add(parseAnnotation(hit.getSourceRef()));
+        }
+        return annotations;
+    }
+
+    private Annotation parseAnnotation(BytesReference source) throws IOException {
+        try (XContentParser parser = createParser(jsonXContent, source)) {
+            return Annotation.PARSER.parse(parser, null);
+        }
+    }
+
+    private void deleteModelSnapshot(String jobId, String snapshotId) {
+        DeleteModelSnapshotAction.Request request = new DeleteModelSnapshotAction.Request(jobId, snapshotId);
+        AcknowledgedResponse response = client().execute(DeleteModelSnapshotAction.INSTANCE, request).actionGet();
+        assertThat(response.isAcknowledged(), is(true));
+    }
+
     private Optional<Quantiles> getQuantiles() throws Exception {
         AtomicReference<Exception> errorHolder = new AtomicReference<>();
         AtomicReference<Optional<Quantiles>> resultHolder = new AtomicReference<>();

+ 4 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java

@@ -29,6 +29,7 @@ import org.elasticsearch.env.TestEnvironment;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
@@ -123,6 +124,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
     private JobResultsProvider jobResultsProvider;
     private JobResultsPersister jobResultsPersister;
     private JobDataCountsPersister jobDataCountsPersister;
+    private AnnotationPersister annotationPersister;
     private AutodetectCommunicator autodetectCommunicator;
     private AutodetectProcessFactory autodetectFactory;
     private NormalizerFactory normalizerFactory;
@@ -153,6 +155,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         jobResultsPersister = mock(JobResultsPersister.class);
         when(jobResultsPersister.bulkPersisterBuilder(any(), any())).thenReturn(mock(JobResultsPersister.Builder.class));
         jobDataCountsPersister = mock(JobDataCountsPersister.class);
+        annotationPersister = mock(AnnotationPersister.class);
         autodetectCommunicator = mock(AutodetectCommunicator.class);
         autodetectFactory = mock(AutodetectProcessFactory.class);
         normalizerFactory = mock(NormalizerFactory.class);
@@ -706,7 +709,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
     private AutodetectProcessManager createManager(Settings settings) {
         return new AutodetectProcessManager(environment, settings,
             client, threadPool, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService, jobManager, jobResultsProvider,
-            jobResultsPersister, jobDataCountsPersister, autodetectFactory, normalizerFactory, nativeStorageProvider,
+            jobResultsPersister, jobDataCountsPersister, annotationPersister, autodetectFactory, normalizerFactory, nativeStorageProvider,
             new IndexNameExpressionResolver());
     }
     private AutodetectProcessManager createSpyManagerAndCallProcessData(String jobId) {

+ 31 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java

@@ -23,6 +23,8 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
+import org.elasticsearch.xpack.core.ml.annotations.Annotation;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
 import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
@@ -35,6 +37,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Bucket;
 import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
 import org.elasticsearch.xpack.core.ml.job.results.Influencer;
 import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
+import org.elasticsearch.xpack.core.security.user.XPackUser;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
 import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
@@ -42,9 +45,13 @@ import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
 import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
 import org.junit.After;
 import org.junit.Before;
+import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
 
+import java.time.Clock;
 import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
 import java.util.Date;
@@ -72,6 +79,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
 
     private static final String JOB_ID = "valid_id";
     private static final long BUCKET_SPAN_MS = 1000;
+    private static final Instant CURRENT_TIME = Instant.ofEpochMilli(2000000000);
 
     private ThreadPool threadPool;
     private Client client;
@@ -79,6 +87,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
     private Renormalizer renormalizer;
     private JobResultsPersister persister;
     private JobResultsPersister.Builder bulkBuilder;
+    private AnnotationPersister annotationPersister;
     private AutodetectProcess process;
     private FlushListener flushListener;
     private AutodetectResultProcessor processorUnderTest;
@@ -95,6 +104,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         renormalizer = mock(Renormalizer.class);
         persister = mock(JobResultsPersister.class);
         bulkBuilder = mock(JobResultsPersister.Builder.class);
+        annotationPersister = mock(AnnotationPersister.class);
         when(persister.bulkPersisterBuilder(eq(JOB_ID), any())).thenReturn(bulkBuilder);
         process = mock(AutodetectProcess.class);
         flushListener = mock(FlushListener.class);
@@ -104,15 +114,17 @@ public class AutodetectResultProcessorTests extends ESTestCase {
             JOB_ID,
             renormalizer,
             persister,
+            annotationPersister,
             process,
             new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(),
             new TimingStats(JOB_ID),
+            Clock.fixed(CURRENT_TIME, ZoneId.systemDefault()),
             flushListener);
     }
 
     @After
     public void cleanup() {
-        verifyNoMoreInteractions(auditor, renormalizer, persister);
+        verifyNoMoreInteractions(auditor, renormalizer, persister, annotationPersister);
         executor.shutdown();
     }
 
@@ -353,6 +365,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         AutodetectResult result = mock(AutodetectResult.class);
         ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)
             .setSnapshotId("a_snapshot_id")
+            .setTimestamp(Date.from(Instant.ofEpochMilli(1000000000)))
             .setMinVersion(Version.CURRENT)
             .build();
         when(result.getModelSnapshot()).thenReturn(modelSnapshot);
@@ -367,6 +380,23 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
         verify(persister).persistModelSnapshot(eq(modelSnapshot), eq(WriteRequest.RefreshPolicy.IMMEDIATE), any());
 
+        ArgumentCaptor<Annotation> annotationCaptor = ArgumentCaptor.forClass(Annotation.class);
+        verify(annotationPersister).persistAnnotation(
+            eq(ModelSnapshot.annotationDocumentId(modelSnapshot)), annotationCaptor.capture(), any());
+        Annotation annotation = annotationCaptor.getValue();
+        Annotation expectedAnnotation =
+            new Annotation(
+                "Job model snapshot with id [a_snapshot_id] stored",
+                Date.from(CURRENT_TIME),
+                XPackUser.NAME,
+                modelSnapshot.getTimestamp(),
+                modelSnapshot.getTimestamp(),
+                JOB_ID,
+                Date.from(CURRENT_TIME),
+                XPackUser.NAME,
+                "annotation");
+        assertThat(annotation, is(equalTo(expectedAnnotation)));
+
         UpdateJobAction.Request expectedJobUpdateRequest = UpdateJobAction.Request.internal(JOB_ID,
                 new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build());