Browse Source

[ML] Add internal flag to flush api to control whether or not to flush indices (#96803)

When datafeeds send flush requests they don't require the indices to be refreshed (and in fact may be detrimentally expensive in the case of small bucket sizes).

This change adds an (internal) flag to control whether or not a refresh is required when flushing.
Ed Savage 2 years ago
parent
commit
cbe8aa9de6
18 changed files with 176 additions and 25 deletions
  1. 2 1
      server/src/main/java/org/elasticsearch/TransportVersion.java
  2. 25 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java
  3. 26 5
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java
  4. 6 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java
  5. 1 1
      x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java
  6. 1 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java
  7. 3 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java
  8. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java
  9. 6 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java
  10. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java
  11. 23 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java
  12. 6 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java
  13. 4 0
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java
  14. 22 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java
  15. 10 2
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgementTests.java
  16. 2 2
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java
  17. 36 0
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java
  18. 1 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java

+ 2 - 1
server/src/main/java/org/elasticsearch/TransportVersion.java

@@ -135,9 +135,10 @@ public record TransportVersion(int id) implements Comparable<TransportVersion> {
     public static final TransportVersion V_8_500_009 = registerTransportVersion(8_500_009, "35091358-fd41-4106-a6e2-d2a1315494c1");
     public static final TransportVersion V_8_500_010 = registerTransportVersion(8_500_010, "9818C628-1EEC-439B-B943-468F61460675");
     public static final TransportVersion V_8_500_011 = registerTransportVersion(8_500_011, "2209F28D-B52E-4BC4-9889-E780F291C32E");
+    public static final TransportVersion V_8_500_012 = registerTransportVersion(8_500_012, "BB6F4AF1-A860-4FD4-A138-8150FFBE0ABD");
 
     private static class CurrentHolder {
-        private static final TransportVersion CURRENT = findCurrent(V_8_500_011);
+        private static final TransportVersion CURRENT = findCurrent(V_8_500_012);
 
         // finds the pluggable current version, or uses the given fallback
         private static TransportVersion findCurrent(TransportVersion fallback) {

+ 25 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java

@@ -6,6 +6,7 @@
  */
 package org.elasticsearch.xpack.core.ml.action;
 
+import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.support.tasks.BaseTasksResponse;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -62,6 +63,7 @@ public class FlushJobAction extends ActionType<FlushJobAction.Response> {
 
         private boolean calcInterim = false;
         private boolean waitForNormalization = true;
+        private boolean refreshRequired = true;
         private String start;
         private String end;
         private String advanceTime;
@@ -77,6 +79,9 @@ public class FlushJobAction extends ActionType<FlushJobAction.Response> {
             advanceTime = in.readOptionalString();
             skipTime = in.readOptionalString();
             waitForNormalization = in.readBoolean();
+            if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) {
+                refreshRequired = in.readBoolean();
+            }
         }
 
         @Override
@@ -88,6 +93,9 @@ public class FlushJobAction extends ActionType<FlushJobAction.Response> {
             out.writeOptionalString(advanceTime);
             out.writeOptionalString(skipTime);
             out.writeBoolean(waitForNormalization);
+            if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) {
+                out.writeBoolean(refreshRequired);
+            }
         }
 
         public Request(String jobId) {
@@ -138,8 +146,12 @@ public class FlushJobAction extends ActionType<FlushJobAction.Response> {
             return waitForNormalization;
         }
 
+        public boolean isRefreshRequired() {
+            return refreshRequired;
+        }
+
         /**
-         * Used internally. Datafeeds do not need to wait renormalization to complete before continuing.
+         * Used internally. Datafeeds do not need to wait for renormalization to complete before continuing.
          *
          * For large jobs, renormalization can take minutes, causing datafeeds to needlessly pause execution.
          */
@@ -147,9 +159,19 @@ public class FlushJobAction extends ActionType<FlushJobAction.Response> {
             this.waitForNormalization = waitForNormalization;
         }
 
+        /**
+         * Used internally. For datafeeds, there is no need for the results to be searchable after the flush,
+         * as the datafeed itself does not search them immediately.
+         *
+         * Particularly for short bucket spans these refreshes could be a significant cost.
+         **/
+        public void setRefreshRequired(boolean refreshRequired) {
+            this.refreshRequired = refreshRequired;
+        }
+
         @Override
         public int hashCode() {
-            return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime, waitForNormalization);
+            return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime, waitForNormalization, refreshRequired);
         }
 
         @Override
@@ -164,6 +186,7 @@ public class FlushJobAction extends ActionType<FlushJobAction.Response> {
             return Objects.equals(jobId, other.jobId)
                 && calcInterim == other.calcInterim
                 && waitForNormalization == other.waitForNormalization
+                && refreshRequired == other.refreshRequired
                 && Objects.equals(start, other.start)
                 && Objects.equals(end, other.end)
                 && Objects.equals(advanceTime, other.advanceTime)

+ 26 - 5
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java

@@ -6,6 +6,7 @@
  */
 package org.elasticsearch.xpack.core.ml.job.process.autodetect.output;
 
+import org.elasticsearch.TransportVersion;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -28,44 +29,57 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable {
     public static final ParseField TYPE = new ParseField("flush");
     public static final ParseField ID = new ParseField("id");
     public static final ParseField LAST_FINALIZED_BUCKET_END = new ParseField("last_finalized_bucket_end");
+    public static final ParseField REFRESH_REQUIRED = new ParseField("refresh_required");
 
     public static final ConstructingObjectParser<FlushAcknowledgement, Void> PARSER = new ConstructingObjectParser<>(
         TYPE.getPreferredName(),
-        a -> new FlushAcknowledgement((String) a[0], (Long) a[1])
+        a -> new FlushAcknowledgement((String) a[0], (Long) a[1], (Boolean) a[2])
     );
 
     static {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
         PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_FINALIZED_BUCKET_END);
+        PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REFRESH_REQUIRED);
     }
 
     private final String id;
     private final Instant lastFinalizedBucketEnd;
+    private final boolean refreshRequired;
 
-    public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs) {
+    public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs, Boolean refreshRequired) {
         this.id = id;
         // The C++ passes 0 when last finalized bucket end is not available, so treat 0 as null
         this.lastFinalizedBucketEnd = (lastFinalizedBucketEndMs != null && lastFinalizedBucketEndMs > 0)
             ? Instant.ofEpochMilli(lastFinalizedBucketEndMs)
             : null;
+        this.refreshRequired = refreshRequired == null || refreshRequired;
     }
 
-    public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd) {
+    public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd, Boolean refreshRequired) {
         this.id = id;
         // Round to millisecond accuracy to ensure round-tripping via XContent results in an equal object
         long epochMillis = (lastFinalizedBucketEnd != null) ? lastFinalizedBucketEnd.toEpochMilli() : 0;
         this.lastFinalizedBucketEnd = (epochMillis > 0) ? Instant.ofEpochMilli(epochMillis) : null;
+        this.refreshRequired = refreshRequired == null || refreshRequired;
     }
 
     public FlushAcknowledgement(StreamInput in) throws IOException {
         id = in.readString();
         lastFinalizedBucketEnd = in.readOptionalInstant();
+        if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) {
+            refreshRequired = in.readBoolean();
+        } else {
+            refreshRequired = true;
+        }
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(id);
         out.writeOptionalInstant(lastFinalizedBucketEnd);
+        if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) {
+            out.writeBoolean(refreshRequired);
+        }
     }
 
     public String getId() {
@@ -76,6 +90,10 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable {
         return lastFinalizedBucketEnd;
     }
 
+    public boolean getRefreshRequired() {
+        return refreshRequired;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
@@ -87,13 +105,14 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable {
                 lastFinalizedBucketEnd.toEpochMilli()
             );
         }
+        builder.field(REFRESH_REQUIRED.getPreferredName(), refreshRequired);
         builder.endObject();
         return builder;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, lastFinalizedBucketEnd);
+        return Objects.hash(id, lastFinalizedBucketEnd, refreshRequired);
     }
 
     @Override
@@ -105,6 +124,8 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable {
             return false;
         }
         FlushAcknowledgement other = (FlushAcknowledgement) obj;
-        return Objects.equals(id, other.id) && Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd);
+        return Objects.equals(id, other.id)
+            && Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd)
+            && refreshRequired == other.refreshRequired;
     }
 }

+ 6 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java

@@ -19,6 +19,9 @@ public class FlushJobActionRequestTests extends AbstractBWCWireSerializationTest
         if (randomBoolean()) {
             request.setWaitForNormalization(randomBoolean());
         }
+        if (randomBoolean()) {
+            request.setRefreshRequired(randomBoolean());
+        }
         if (randomBoolean()) {
             request.setCalcInterim(randomBoolean());
         }
@@ -49,6 +52,9 @@ public class FlushJobActionRequestTests extends AbstractBWCWireSerializationTest
 
     @Override
     protected Request mutateInstanceForVersion(Request instance, TransportVersion version) {
+        if (version.before(TransportVersion.V_8_500_012)) {
+            instance.setRefreshRequired(true);
+        }
         return instance;
     }
 }

+ 1 - 1
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java

@@ -575,7 +575,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
     }
 
     private static FlushAcknowledgement createFlushAcknowledgement() {
-        return new FlushAcknowledgement(randomAlphaOfLength(5), randomInstant());
+        return new FlushAcknowledgement(randomAlphaOfLength(5), randomInstant(), true);
     }
 
     private static class ResultsBuilder {

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java

@@ -51,6 +51,7 @@ public class TransportFlushJobAction extends TransportJobTaskAction<FlushJobActi
         FlushJobParams.Builder paramsBuilder = FlushJobParams.builder();
         paramsBuilder.calcInterim(request.getCalcInterim());
         paramsBuilder.waitForNormalization(request.isWaitForNormalization());
+        paramsBuilder.refreshRequired(request.isRefreshRequired());
         if (request.getAdvanceTime() != null) {
             paramsBuilder.advanceTime(request.getAdvanceTime());
         }

+ 3 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

@@ -175,6 +175,7 @@ class DatafeedJob {
 
         FlushJobAction.Request request = new FlushJobAction.Request(jobId);
         request.setCalcInterim(true);
+        request.setRefreshRequired(false);
         run(lookbackStartTimeMs, lookbackEnd, request);
         if (shouldPersistAfterLookback(isLookbackOnly)) {
             sendPersistRequest();
@@ -205,6 +206,7 @@ class DatafeedJob {
             // start time is after last checkpoint, thus we need to skip time
             FlushJobAction.Request request = new FlushJobAction.Request(jobId);
             request.setSkipTime(String.valueOf(startTime));
+            request.setRefreshRequired(false);
             FlushJobAction.Response flushResponse = flushJob(request);
             LOGGER.info("[{}] Skipped to time [{}]", jobId, flushResponse.getLastFinalizedBucketEnd().toEpochMilli());
             return flushResponse.getLastFinalizedBucketEnd().toEpochMilli();
@@ -218,6 +220,7 @@ class DatafeedJob {
         long end = toIntervalStartEpochMs(nowMinusQueryDelay);
         FlushJobAction.Request request = new FlushJobAction.Request(jobId);
         request.setWaitForNormalization(false);
+        request.setRefreshRequired(false);
         request.setCalcInterim(true);
         request.setAdvanceTime(String.valueOf(end));
         run(start, end, request);

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java

@@ -101,7 +101,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
      */
     @Override
     public String flushJob(FlushJobParams params) {
-        FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, 0L);
+        FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, 0L, true);
         AutodetectResult result = new AutodetectResult(
             null,
             null,

+ 6 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java

@@ -394,10 +394,12 @@ public class AutodetectResultProcessor {
             try {
                 bulkResultsPersister.executeRequest();
                 bulkAnnotationsPersister.executeRequest();
-                persister.commitWrites(
-                    jobId,
-                    EnumSet.of(JobResultsPersister.CommitType.RESULTS, JobResultsPersister.CommitType.ANNOTATIONS)
-                );
+                if (flushAcknowledgement.getRefreshRequired()) {
+                    persister.commitWrites(
+                        jobId,
+                        EnumSet.of(JobResultsPersister.CommitType.RESULTS, JobResultsPersister.CommitType.ANNOTATIONS)
+                    );
+                }
             } catch (Exception e) {
                 logger.error(
                     "["

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java

@@ -70,7 +70,7 @@ class FlushListener {
         private volatile Exception flushException;
 
         private FlushAcknowledgementHolder(String flushId) {
-            this.flushAcknowledgement = new FlushAcknowledgement(flushId, 0L);
+            this.flushAcknowledgement = new FlushAcknowledgement(flushId, 0L, true);
             this.latch = new CountDownLatch(1);
         }
     }

+ 23 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java

@@ -41,18 +41,25 @@ public class FlushJobParams {
      */
     private final boolean waitForNormalization;
 
+    /**
+     * Should the flush request trigger a refresh or not.
+     */
+    private final boolean refreshRequired;
+
     private FlushJobParams(
         boolean calcInterim,
         TimeRange timeRange,
         Long advanceTimeSeconds,
         Long skipTimeSeconds,
-        boolean waitForNormalization
+        boolean waitForNormalization,
+        boolean refreshRequired
     ) {
         this.calcInterim = calcInterim;
         this.timeRange = Objects.requireNonNull(timeRange);
         this.advanceTimeSeconds = advanceTimeSeconds;
         this.skipTimeSeconds = skipTimeSeconds;
         this.waitForNormalization = waitForNormalization;
+        this.refreshRequired = refreshRequired;
     }
 
     public boolean shouldCalculateInterim() {
@@ -93,6 +100,10 @@ public class FlushJobParams {
         return waitForNormalization;
     }
 
+    public boolean isRefreshRequired() {
+        return refreshRequired;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -105,12 +116,14 @@ public class FlushJobParams {
         return calcInterim == that.calcInterim
             && Objects.equals(timeRange, that.timeRange)
             && Objects.equals(advanceTimeSeconds, that.advanceTimeSeconds)
-            && Objects.equals(skipTimeSeconds, that.skipTimeSeconds);
+            && Objects.equals(skipTimeSeconds, that.skipTimeSeconds)
+            && waitForNormalization == that.waitForNormalization
+            && refreshRequired == that.refreshRequired;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds);
+        return Objects.hash(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds, waitForNormalization, refreshRequired);
     }
 
     public static class Builder {
@@ -119,6 +132,7 @@ public class FlushJobParams {
         private String advanceTime;
         private String skipTime;
         private boolean waitForNormalization = true;
+        private boolean refreshRequired = true;
 
         public Builder calcInterim(boolean value) {
             calcInterim = value;
@@ -145,6 +159,11 @@ public class FlushJobParams {
             return this;
         }
 
+        public Builder refreshRequired(boolean refreshRequired) {
+            this.refreshRequired = refreshRequired;
+            return this;
+        }
+
         public FlushJobParams build() {
             checkValidFlushArgumentsCombination();
             Long advanceTimeSeconds = parseTimeParam("advance_time", advanceTime);
@@ -154,7 +173,7 @@ public class FlushJobParams {
                     "advance_time [" + advanceTime + "] must be later than skip_time [" + skipTime + "]"
                 );
             }
-            return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds, waitForNormalization);
+            return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds, waitForNormalization, refreshRequired);
         }
 
         private void checkValidFlushArgumentsCombination() {

+ 6 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java

@@ -78,6 +78,11 @@ public class AutodetectControlMsgWriter extends AbstractControlMsgWriter {
      */
     public static final String BACKGROUND_PERSIST_MESSAGE_CODE = "w";
 
+    /**
+     * This must match the code defined in the api::CAnomalyJob C++ class.
+     */
+    public static final String REFRESH_REQUIRED_MESSAGE_CODE = "z";
+
     /**
      * An number to uniquely identify each flush so that subsequent code can
      * wait for acknowledgement of the correct flush.
@@ -143,6 +148,7 @@ public class AutodetectControlMsgWriter extends AbstractControlMsgWriter {
         if (params.shouldCalculateInterim()) {
             writeControlCodeFollowedByTimeRange(INTERIM_MESSAGE_CODE, params.getStart(), params.getEnd());
         }
+        writeMessage(REFRESH_REQUIRED_MESSAGE_CODE + params.isRefreshRequired());
     }
 
     /**

+ 4 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java

@@ -176,6 +176,7 @@ public class DatafeedJobTests extends ESTestCase {
         verify(dataExtractorFactory).newExtractor(0L, 1000L);
         FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId);
         flushRequest.setCalcInterim(true);
+        flushRequest.setRefreshRequired(false);
         verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
         verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
     }
@@ -202,6 +203,7 @@ public class DatafeedJobTests extends ESTestCase {
         verify(dataExtractorFactory).newExtractor(0L, 1500L);
         FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId);
         flushRequest.setCalcInterim(true);
+        flushRequest.setRefreshRequired(false);
         verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
         verify(client).execute(same(PersistJobAction.INSTANCE), eq(new PersistJobAction.Request(jobId)));
     }
@@ -226,6 +228,7 @@ public class DatafeedJobTests extends ESTestCase {
         assertThat(flushJobRequests.getAllValues().size(), equalTo(1));
         FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId);
         flushRequest.setCalcInterim(true);
+        flushRequest.setRefreshRequired(false);
         verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
         verify(client).execute(same(PersistJobAction.INSTANCE), eq(new PersistJobAction.Request(jobId)));
     }
@@ -285,6 +288,7 @@ public class DatafeedJobTests extends ESTestCase {
         flushRequest.setCalcInterim(true);
         flushRequest.setAdvanceTime("59000");
         flushRequest.setWaitForNormalization(false);
+        flushRequest.setRefreshRequired(false);
         verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
         verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
 

+ 22 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java

@@ -248,10 +248,11 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(bulkResultsPersister, never()).executeRequest();
     }
 
-    public void testProcessResult_flushAcknowledgement() {
+    public void testProcessResult_flushAcknowledgementWithRefresh() {
         AutodetectResult result = mock(AutodetectResult.class);
         FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
         when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
+        when(flushAcknowledgement.getRefreshRequired()).thenReturn(true);
         when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
 
         processorUnderTest.setDeleteInterimRequired(false);
@@ -267,10 +268,30 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(bulkResultsPersister).executeRequest();
     }
 
+    public void testProcessResult_flushAcknowledgement() {
+        AutodetectResult result = mock(AutodetectResult.class);
+        FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
+        when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
+        when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
+
+        processorUnderTest.setDeleteInterimRequired(false);
+        processorUnderTest.processResult(result);
+        assertTrue(processorUnderTest.isDeleteInterimRequired());
+
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
+        verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
+        verify(persister, never()).commitWrites(
+            JOB_ID,
+            EnumSet.of(JobResultsPersister.CommitType.RESULTS, JobResultsPersister.CommitType.ANNOTATIONS)
+        );
+        verify(bulkResultsPersister).executeRequest();
+    }
+
     public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
         AutodetectResult result = mock(AutodetectResult.class);
         FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
         when(flushAcknowledgement.getId()).thenReturn(Integer.valueOf(randomInt(100)).toString());
+        when(flushAcknowledgement.getRefreshRequired()).thenReturn(true);
         when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
         CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
         when(categoryDefinition.getCategoryId()).thenReturn(1L);

+ 10 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgementTests.java

@@ -23,9 +23,17 @@ public class FlushAcknowledgementTests extends AbstractXContentSerializingTestCa
     @Override
     protected FlushAcknowledgement createTestInstance() {
         if (randomBoolean()) {
-            return new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), randomFrom(randomNonNegativeLong(), 0L, null));
+            return new FlushAcknowledgement(
+                randomAlphaOfLengthBetween(1, 20),
+                randomFrom(randomNonNegativeLong(), 0L, null),
+                randomBoolean()
+            );
         } else {
-            return new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), randomFrom(randomInstant(), Instant.EPOCH, null));
+            return new FlushAcknowledgement(
+                randomAlphaOfLengthBetween(1, 20),
+                randomFrom(randomInstant(), Instant.EPOCH, null),
+                randomBoolean()
+            );
         }
     }
 

+ 2 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java

@@ -35,7 +35,7 @@ public class FlushListenerTests extends ESTestCase {
         }).start();
         assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id")));
         assertNull(flushAcknowledgementHolder.get());
-        FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", 12345678L);
+        FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", 12345678L, false);
         listener.acknowledgeFlush(flushAcknowledgement, null);
         assertBusy(() -> assertNotNull(flushAcknowledgementHolder.get()));
         assertEquals(1, listener.awaitingFlushed.size());
@@ -59,7 +59,7 @@ public class FlushListenerTests extends ESTestCase {
         }).start();
         assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id")));
         assertNull(flushExceptionHolder.get());
-        FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", Instant.ofEpochMilli(12345678L));
+        FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", Instant.ofEpochMilli(12345678L), true);
         listener.acknowledgeFlush(flushAcknowledgement, new Exception("BOOM"));
         assertBusy(() -> {
             assertNotNull(flushExceptionHolder.get());

+ 36 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java

@@ -58,6 +58,9 @@ public class AutodetectControlMsgWriterTests extends ESTestCase {
         inOrder.verify(lengthEncodedWriter).writeNumFields(4);
         inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
         inOrder.verify(lengthEncodedWriter).writeField("t1234567890");
+        inOrder.verify(lengthEncodedWriter).writeNumFields(4);
+        inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
+        inOrder.verify(lengthEncodedWriter).writeField("ztrue");
         verifyNoMoreInteractions(lengthEncodedWriter);
     }
 
@@ -71,6 +74,9 @@ public class AutodetectControlMsgWriterTests extends ESTestCase {
         inOrder.verify(lengthEncodedWriter).writeNumFields(4);
         inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
         inOrder.verify(lengthEncodedWriter).writeField("s1234567890");
+        inOrder.verify(lengthEncodedWriter).writeNumFields(4);
+        inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
+        inOrder.verify(lengthEncodedWriter).writeField("ztrue");
         verifyNoMoreInteractions(lengthEncodedWriter);
     }
 
@@ -95,6 +101,9 @@ public class AutodetectControlMsgWriterTests extends ESTestCase {
         inOrder.verify(lengthEncodedWriter).writeNumFields(4);
         inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
         inOrder.verify(lengthEncodedWriter).writeField("i");
+        inOrder.verify(lengthEncodedWriter).writeNumFields(4);
+        inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
+        inOrder.verify(lengthEncodedWriter).writeField("ztrue");
         verifyNoMoreInteractions(lengthEncodedWriter);
     }
 
@@ -104,6 +113,27 @@ public class AutodetectControlMsgWriterTests extends ESTestCase {
 
         writer.writeFlushControlMessage(flushJobParams);
 
+        // Even a plain flush message contains the "refreshRequired" flag, which
+        // is set to "true" by default
+        InOrder inOrder = inOrder(lengthEncodedWriter);
+        inOrder.verify(lengthEncodedWriter).writeNumFields(4);
+        inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
+        inOrder.verify(lengthEncodedWriter).writeField("ztrue");
+        verifyNoMoreInteractions(lengthEncodedWriter);
+    }
+
+    public void testWriteFlushControlMessage_GivenShouldRefreshFalse() throws IOException {
+        AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 4);
+        FlushJobParams flushJobParams = FlushJobParams.builder().refreshRequired(false).build();
+
+        writer.writeFlushControlMessage(flushJobParams);
+
+        // Even a plain flush message contains the "refreshRequired" flag, which
+        // is set to "true" by default
+        InOrder inOrder = inOrder(lengthEncodedWriter);
+        inOrder.verify(lengthEncodedWriter).writeNumFields(4);
+        inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
+        inOrder.verify(lengthEncodedWriter).writeField("zfalse");
         verifyNoMoreInteractions(lengthEncodedWriter);
     }
 
@@ -120,6 +150,9 @@ public class AutodetectControlMsgWriterTests extends ESTestCase {
         inOrder.verify(lengthEncodedWriter).writeNumFields(4);
         inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
         inOrder.verify(lengthEncodedWriter).writeField("i120 180");
+        inOrder.verify(lengthEncodedWriter).writeNumFields(4);
+        inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
+        inOrder.verify(lengthEncodedWriter).writeField("ztrue");
         verifyNoMoreInteractions(lengthEncodedWriter);
     }
 
@@ -140,6 +173,9 @@ public class AutodetectControlMsgWriterTests extends ESTestCase {
         inOrder.verify(lengthEncodedWriter).writeNumFields(4);
         inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
         inOrder.verify(lengthEncodedWriter).writeField("i50 100");
+        inOrder.verify(lengthEncodedWriter).writeNumFields(4);
+        inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
+        inOrder.verify(lengthEncodedWriter).writeField("ztrue");
         verifyNoMoreInteractions(lengthEncodedWriter);
     }
 

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java

@@ -129,7 +129,7 @@ public class AutodetectResultTests extends AbstractXContentSerializingTestCase<A
             categorizerStats = null;
         }
         if (randomBoolean()) {
-            flushAcknowledgement = new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), randomInstant());
+            flushAcknowledgement = new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), randomInstant(), randomBoolean());
         } else {
             flushAcknowledgement = null;
         }