Browse Source

[ML] Refresh annotations index on job flush and close (#57979)

Now that annotations are part of the anomaly detection job results
the annotations index should be refreshed on flushing and closing
the job so that flush and close continue to fulfil their contracts
that immediately after returning all results the job generated up
to that point are searchable.
David Roberts 5 years ago
parent
commit
bf698418c3

+ 16 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java

@@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
@@ -390,6 +391,21 @@ public class JobResultsPersister {
         }
         }
     }
     }
 
 
+    /**
+     * Makes annotations searchable as they are considered part of a job's results
+     * to fulfil the contract that job results are searchable immediately after a
+     * close or flush.
+     */
+    public void commitAnnotationWrites() {
+        // We refresh using the read alias in order to ensure all indices will
+        // be refreshed even if a rollover occurs in between.
+        RefreshRequest refreshRequest = new RefreshRequest(AnnotationIndex.READ_ALIAS_NAME);
+        refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
+        try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
+            client.admin().indices().refresh(refreshRequest).actionGet();
+        }
+    }
+
     /**
     /**
      * Once the job state has been written calling this function makes it
      * Once the job state has been written calling this function makes it
      * immediately searchable.
      * immediately searchable.

+ 2 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java

@@ -345,6 +345,7 @@ public class AutodetectResultProcessor {
                 bulkResultsPersister.executeRequest();
                 bulkResultsPersister.executeRequest();
                 bulkAnnotationsPersister.executeRequest();
                 bulkAnnotationsPersister.executeRequest();
                 persister.commitResultWrites(jobId);
                 persister.commitResultWrites(jobId);
+                persister.commitAnnotationWrites();
                 LOGGER.debug("[{}] Flush acknowledgement sent to listener for ID {}", jobId, flushAcknowledgement.getId());
                 LOGGER.debug("[{}] Flush acknowledgement sent to listener for ID {}", jobId, flushAcknowledgement.getId());
             } catch (Exception e) {
             } catch (Exception e) {
                 LOGGER.error(
                 LOGGER.error(
@@ -469,6 +470,7 @@ public class AutodetectResultProcessor {
             // These lines ensure that the "completion" we're awaiting includes making the results searchable
             // These lines ensure that the "completion" we're awaiting includes making the results searchable
             waitUntilRenormalizerIsIdle();
             waitUntilRenormalizerIsIdle();
             persister.commitResultWrites(jobId);
             persister.commitResultWrites(jobId);
+            persister.commitAnnotationWrites();
             persister.commitStateWrites(jobId);
             persister.commitStateWrites(jobId);
 
 
         } catch (InterruptedException e) {
         } catch (InterruptedException e) {

+ 9 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java

@@ -56,6 +56,7 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZoneId;
 import java.time.temporal.ChronoUnit;
 import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.List;
@@ -138,7 +139,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
 
 
     public void testProcess() throws TimeoutException {
     public void testProcess() throws TimeoutException {
         AutodetectResult autodetectResult = mock(AutodetectResult.class);
         AutodetectResult autodetectResult = mock(AutodetectResult.class);
-        when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator());
+        when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator());
 
 
         processorUnderTest.process();
         processorUnderTest.process();
         processorUnderTest.awaitCompletion();
         processorUnderTest.awaitCompletion();
@@ -147,6 +148,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(renormalizer).waitUntilIdle();
         verify(renormalizer).waitUntilIdle();
         verify(persister).bulkPersisterBuilder(eq(JOB_ID));
         verify(persister).bulkPersisterBuilder(eq(JOB_ID));
         verify(persister).commitResultWrites(JOB_ID);
         verify(persister).commitResultWrites(JOB_ID);
+        verify(persister).commitAnnotationWrites();
         verify(persister).commitStateWrites(JOB_ID);
         verify(persister).commitStateWrites(JOB_ID);
     }
     }
 
 
@@ -243,6 +245,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(persister).bulkPersisterBuilder(eq(JOB_ID));
         verify(persister).bulkPersisterBuilder(eq(JOB_ID));
         verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
         verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
         verify(persister).commitResultWrites(JOB_ID);
         verify(persister).commitResultWrites(JOB_ID);
+        verify(persister).commitAnnotationWrites();
         verify(bulkResultsPersister).executeRequest();
         verify(bulkResultsPersister).executeRequest();
     }
     }
 
 
@@ -264,6 +267,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         inOrder.verify(persister).persistCategoryDefinition(eq(categoryDefinition), any());
         inOrder.verify(persister).persistCategoryDefinition(eq(categoryDefinition), any());
         inOrder.verify(bulkResultsPersister).executeRequest();
         inOrder.verify(bulkResultsPersister).executeRequest();
         inOrder.verify(persister).commitResultWrites(JOB_ID);
         inOrder.verify(persister).commitResultWrites(JOB_ID);
+        inOrder.verify(persister).commitAnnotationWrites();
         inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
         inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
     }
     }
 
 
@@ -453,7 +457,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
 
 
     public void testAwaitCompletion() throws TimeoutException {
     public void testAwaitCompletion() throws TimeoutException {
         AutodetectResult autodetectResult = mock(AutodetectResult.class);
         AutodetectResult autodetectResult = mock(AutodetectResult.class);
-        when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator());
+        when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator());
 
 
         processorUnderTest.process();
         processorUnderTest.process();
         processorUnderTest.awaitCompletion();
         processorUnderTest.awaitCompletion();
@@ -462,6 +466,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
 
 
         verify(persister).bulkPersisterBuilder(eq(JOB_ID));
         verify(persister).bulkPersisterBuilder(eq(JOB_ID));
         verify(persister).commitResultWrites(JOB_ID);
         verify(persister).commitResultWrites(JOB_ID);
+        verify(persister).commitAnnotationWrites();
         verify(persister).commitStateWrites(JOB_ID);
         verify(persister).commitStateWrites(JOB_ID);
         verify(renormalizer).waitUntilIdle();
         verify(renormalizer).waitUntilIdle();
     }
     }
@@ -503,7 +508,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
 
 
     public void testKill() throws TimeoutException {
     public void testKill() throws TimeoutException {
         AutodetectResult autodetectResult = mock(AutodetectResult.class);
         AutodetectResult autodetectResult = mock(AutodetectResult.class);
-        when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator());
+        when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator());
 
 
         processorUnderTest.setProcessKilled();
         processorUnderTest.setProcessKilled();
         processorUnderTest.process();
         processorUnderTest.process();
@@ -513,6 +518,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
 
 
         verify(persister).bulkPersisterBuilder(eq(JOB_ID));
         verify(persister).bulkPersisterBuilder(eq(JOB_ID));
         verify(persister).commitResultWrites(JOB_ID);
         verify(persister).commitResultWrites(JOB_ID);
+        verify(persister).commitAnnotationWrites();
         verify(persister).commitStateWrites(JOB_ID);
         verify(persister).commitStateWrites(JOB_ID);
         verify(renormalizer, never()).renormalize(any());
         verify(renormalizer, never()).renormalize(any());
         verify(renormalizer).shutdown();
         verify(renormalizer).shutdown();