Browse Source

[ML] Add latest search interval to datafeed stats (#82620)

This commit adds `search_interval` to the `running_state` object of
the datafeed stats API. While the datafeed is running, it reports
the most recent time interval that the datafeed has searched. This
is useful for understanding which point in time the datafeed has
currently reached.

Closes #82405
Dimitris Athanasiou 3 years ago
parent
commit
93777b4e99
21 changed files with 304 additions and 122 deletions
  1. 13 0
      docs/reference/ml/anomaly-detection/apis/get-datafeed-stats.asciidoc
  2. 24 3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java
  3. 44 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/SearchInterval.java
  4. 6 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/DataExtractor.java
  5. 2 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateActionResponseTests.java
  6. 29 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/SearchIntervalTests.java
  7. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedRunningStateAction.java
  8. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java
  9. 6 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java
  10. 11 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java
  11. 6 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedRunner.java
  12. 10 6
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java
  13. 5 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractor.java
  14. 9 7
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java
  15. 3 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java
  16. 3 2
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedActionTests.java
  17. 6 5
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java
  18. 9 5
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java
  19. 10 6
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorTests.java
  20. 83 60
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java
  21. 23 15
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java

+ 13 - 0
docs/reference/ml/anomaly-detection/apis/get-datafeed-stats.asciidoc

@@ -112,6 +112,19 @@ has no configured `end` time.
 (boolean) Indicates whether the {dfeed} has finished running on the available
 past data. For {dfeeds} without a configured `end` time, this means that
 the {dfeed} is now running on "real-time" data.
+
+`search_interval`:::
+(Optional, object) Provides the latest time interval the {dfeed} has searched.
++
+[%collapsible%open]
+=====
+`start_ms`::::
+The start time as an epoch in milliseconds.
+
+`end_ms`::::
+The end time as an epoch in milliseconds.
+=====
+
 ====
 --
 

+ 24 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java

@@ -6,6 +6,7 @@
  */
 package org.elasticsearch.xpack.core.ml.action;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.support.tasks.BaseTasksRequest;
 import org.elasticsearch.action.support.tasks.BaseTasksResponse;
@@ -13,10 +14,12 @@ import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
 
 import java.io.IOException;
 import java.util.List;
@@ -76,14 +79,24 @@ public class GetDatafeedRunningStateAction extends ActionType<GetDatafeedRunning
             // Has the look back finished and are we now running on "real-time" data
             private final boolean realTimeRunning;
 
-            public RunningState(boolean realTimeConfigured, boolean realTimeRunning) {
+            // The latest time interval that the datafeed has searched
+            @Nullable
+            private final SearchInterval searchInterval;
+
+            public RunningState(boolean realTimeConfigured, boolean realTimeRunning, @Nullable SearchInterval searchInterval) {
                 this.realTimeConfigured = realTimeConfigured;
                 this.realTimeRunning = realTimeRunning;
+                this.searchInterval = searchInterval;
             }
 
             public RunningState(StreamInput in) throws IOException {
                 this.realTimeConfigured = in.readBoolean();
                 this.realTimeRunning = in.readBoolean();
+                if (in.getVersion().onOrAfter(Version.V_8_1_0)) {
+                    this.searchInterval = in.readOptionalWriteable(SearchInterval::new);
+                } else {
+                    this.searchInterval = null;
+                }
             }
 
             @Override
@@ -91,18 +104,23 @@ public class GetDatafeedRunningStateAction extends ActionType<GetDatafeedRunning
                 if (this == o) return true;
                 if (o == null || getClass() != o.getClass()) return false;
                 RunningState that = (RunningState) o;
-                return realTimeConfigured == that.realTimeConfigured && realTimeRunning == that.realTimeRunning;
+                return realTimeConfigured == that.realTimeConfigured
+                    && realTimeRunning == that.realTimeRunning
+                    && Objects.equals(searchInterval, that.searchInterval);
             }
 
             @Override
             public int hashCode() {
-                return Objects.hash(realTimeConfigured, realTimeRunning);
+                return Objects.hash(realTimeConfigured, realTimeRunning, searchInterval);
             }
 
             @Override
             public void writeTo(StreamOutput out) throws IOException {
                 out.writeBoolean(realTimeConfigured);
                 out.writeBoolean(realTimeRunning);
+                if (out.getVersion().onOrAfter(Version.V_8_1_0)) {
+                    out.writeOptionalWriteable(searchInterval);
+                }
             }
 
             @Override
@@ -110,6 +128,9 @@ public class GetDatafeedRunningStateAction extends ActionType<GetDatafeedRunning
                 builder.startObject();
                 builder.field("real_time_configured", realTimeConfigured);
                 builder.field("real_time_running", realTimeRunning);
+                if (searchInterval != null) {
+                    builder.field("search_interval", searchInterval);
+                }
                 builder.endObject();
                 return builder;
             }

+ 44 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/SearchInterval.java

@@ -0,0 +1,44 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ml.datafeed;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.ToXContentObject;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+
+public record SearchInterval(long startMs, long endMs) implements ToXContentObject, Writeable {
+
+    public static final ParseField START = new ParseField("start");
+    public static final ParseField START_MS = new ParseField("start_ms");
+    public static final ParseField END = new ParseField("end");
+    public static final ParseField END_MS = new ParseField("end_ms");
+
+    public SearchInterval(StreamInput in) throws IOException {
+        this(in.readVLong(), in.readVLong());
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.timeField(START_MS.getPreferredName(), START.getPreferredName(), startMs);
+        builder.timeField(END_MS.getPreferredName(), END.getPreferredName(), endMs);
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeVLong(startMs);
+        out.writeVLong(endMs);
+    }
+}

+ 6 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/DataExtractor.java

@@ -6,12 +6,16 @@
  */
 package org.elasticsearch.xpack.core.ml.datafeed.extractor;
 
+import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Optional;
 
 public interface DataExtractor {
 
+    record Result(SearchInterval searchInterval, Optional<InputStream> data) {}
+
     /**
      * @return {@code true} if the search has not finished yet, or {@code false} otherwise
      */
@@ -20,10 +24,10 @@ public interface DataExtractor {
     /**
      * Returns the next available extracted data. Note that it is possible for the
      * extracted data to be empty the last time this method can be called.
-     * @return an optional input stream with the next available extracted data
+     * @return a result with the search interval and an optional input stream with the next available extracted data
      * @throws IOException if an error occurs while extracting the data
      */
-    Optional<InputStream> next() throws IOException;
+    Result next() throws IOException;
 
     /**
      * @return {@code true} if the extractor has been cancelled, or {@code false} otherwise

+ 2 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateActionResponseTests.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.core.ml.action;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Response;
+import org.elasticsearch.xpack.core.ml.datafeed.SearchIntervalTests;
 
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -17,7 +18,7 @@ import java.util.stream.Stream;
 public class GetDatafeedRunningStateActionResponseTests extends AbstractWireSerializingTestCase<Response> {
 
     static Response.RunningState randomRunningState() {
-        return new Response.RunningState(randomBoolean(), randomBoolean());
+        return new Response.RunningState(randomBoolean(), randomBoolean(), randomBoolean() ? null : SearchIntervalTests.createRandom());
     }
 
     @Override

+ 29 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/SearchIntervalTests.java

@@ -0,0 +1,29 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ml.datafeed;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+
+public class SearchIntervalTests extends AbstractWireSerializingTestCase<SearchInterval> {
+
+    @Override
+    protected Writeable.Reader<SearchInterval> instanceReader() {
+        return SearchInterval::new;
+    }
+
+    @Override
+    protected SearchInterval createTestInstance() {
+        return createRandom();
+    }
+
+    public static SearchInterval createRandom() {
+        long start = randomNonNegativeLong();
+        return new SearchInterval(start, randomLongBetween(start, Long.MAX_VALUE));
+    }
+}

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedRunningStateAction.java

@@ -120,7 +120,7 @@ public class TransportGetDatafeedRunningStateAction extends TransportTasksAction
                             Collectors.toMap(
                                 StartDatafeedAction.DatafeedParams::getDatafeedId,
                                 // If it isn't assigned to a node, assume that look back hasn't completed yet
-                                params -> new Response.RunningState(params.getEndTime() == null, false)
+                                params -> new Response.RunningState(params.getEndTime() == null, false, null)
                             )
                         )
                 )

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java

@@ -181,7 +181,7 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
     /** Visible for testing */
     static void previewDatafeed(DataExtractor dataExtractor, ActionListener<PreviewDatafeedAction.Response> listener) {
         try {
-            Optional<InputStream> inputStream = dataExtractor.next();
+            Optional<InputStream> inputStream = dataExtractor.next().data();
             // DataExtractor returns single-line JSON but without newline characters between objects.
             // Instead, it has a space between objects due to how JSON XContentBuilder works.
             // In order to return a proper JSON array from preview, we surround with square brackets and

+ 6 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

@@ -690,10 +690,14 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
                     // reasonable to say real-time running hasn't started yet. The state will quickly
                     // change once the datafeed runner gets going and determines where the datafeed is up
                     // to.
-                    return new GetDatafeedRunningStateAction.Response.RunningState(endTime == null, false);
+                    return new GetDatafeedRunningStateAction.Response.RunningState(endTime == null, false, null);
                 }
             }
-            return new GetDatafeedRunningStateAction.Response.RunningState(endTime == null, datafeedRunner.finishedLookBack(this));
+            return new GetDatafeedRunningStateAction.Response.RunningState(
+                endTime == null,
+                datafeedRunner.finishedLookBack(this),
+                datafeedRunner.getSearchInterval(this)
+            );
         }
     }
 

+ 11 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

@@ -16,6 +16,7 @@ import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.mapper.DateFieldMapper;
@@ -25,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
 import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
 import org.elasticsearch.xpack.core.ml.action.PostDataAction;
 import org.elasticsearch.xpack.core.ml.annotations.Annotation;
+import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
@@ -77,6 +79,7 @@ class DatafeedJob {
     private volatile boolean isIsolated;
     private volatile boolean haveEverSeenData;
     private volatile long consecutiveDelayedDataBuckets;
+    private volatile SearchInterval searchInterval;
 
     DatafeedJob(
         String jobId,
@@ -138,6 +141,11 @@ class DatafeedJob {
         timingStatsReporter.finishReporting();
     }
 
+    @Nullable
+    public SearchInterval getSearchInterval() {
+        return searchInterval;
+    }
+
     Long runLookBack(long startTime, Long endTime) throws Exception {
         lookbackStartTimeMs = skipToStartTime(startTime);
         Optional<Long> endMs = Optional.ofNullable(endTime);
@@ -358,7 +366,9 @@ class DatafeedJob {
 
             Optional<InputStream> extractedData;
             try {
-                extractedData = dataExtractor.next();
+                DataExtractor.Result result = dataExtractor.next();
+                extractedData = result.data();
+                searchInterval = result.searchInterval();
             } catch (Exception e) {
                 LOGGER.error(new ParameterizedMessage("[{}] error while extracting data", jobId), e);
                 // When extraction problems are encountered, we do not want to advance time.

+ 6 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedRunner.java

@@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
 import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
+import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
@@ -212,6 +213,11 @@ public class DatafeedRunner {
         return holder != null && holder.isLookbackFinished();
     }
 
+    public SearchInterval getSearchInterval(TransportStartDatafeedAction.DatafeedTask task) {
+        Holder holder = runningDatafeedsOnThisNode.get(task.getAllocationId());
+        return holder == null ? null : holder.datafeedJob.getSearchInterval();
+    }
+
     // Important: Holder must be created and assigned to DatafeedTask before setting state to started,
     // otherwise if a stop datafeed call is made immediately after the start datafeed call we could cancel
     // the DatafeedTask without stopping datafeed, which causes the datafeed to keep on running.

+ 10 - 6
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java

@@ -17,6 +17,7 @@ import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
@@ -24,7 +25,6 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
@@ -84,16 +84,17 @@ abstract class AbstractAggregationDataExtractor<T extends ActionRequestBuilder<S
     }
 
     @Override
-    public Optional<InputStream> next() throws IOException {
+    public Result next() throws IOException {
         if (hasNext() == false) {
             throw new NoSuchElementException();
         }
 
+        SearchInterval searchInterval = new SearchInterval(context.start, context.end);
         if (aggregationToJsonProcessor == null) {
             Aggregations aggs = search();
             if (aggs == null) {
                 hasNext = false;
-                return Optional.empty();
+                return new Result(searchInterval, Optional.empty());
             }
             initAggregationProcessor(aggs);
         }
@@ -104,9 +105,12 @@ abstract class AbstractAggregationDataExtractor<T extends ActionRequestBuilder<S
         // We process the whole search. So, if we are chunking or not, we have nothing more to process given the current query
         hasNext = false;
 
-        return aggregationToJsonProcessor.getKeyValueCount() > 0
-            ? Optional.of(new ByteArrayInputStream(outputStream.toByteArray()))
-            : Optional.empty();
+        return new Result(
+            searchInterval,
+            aggregationToJsonProcessor.getKeyValueCount() > 0
+                ? Optional.of(new ByteArrayInputStream(outputStream.toByteArray()))
+                : Optional.empty()
+        );
     }
 
     private Aggregations search() {

+ 5 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractor.java

@@ -18,6 +18,7 @@ import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregati
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
 import org.elasticsearch.xpack.core.ml.utils.Intervals;
@@ -94,19 +95,20 @@ class CompositeAggregationDataExtractor implements DataExtractor {
     }
 
     @Override
-    public Optional<InputStream> next() throws IOException {
+    public Result next() throws IOException {
         if (hasNext() == false) {
             throw new NoSuchElementException();
         }
 
+        SearchInterval searchInterval = new SearchInterval(context.start, context.end);
         Aggregations aggs = search();
         if (aggs == null) {
             LOGGER.trace(() -> new ParameterizedMessage("[{}] extraction finished", context.jobId));
             hasNext = false;
             afterKey = null;
-            return Optional.empty();
+            return new Result(searchInterval, Optional.empty());
         }
-        return Optional.of(processAggs(aggs));
+        return new Result(searchInterval, Optional.of(processAggs(aggs)));
     }
 
     private Aggregations search() {

+ 9 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java

@@ -21,6 +21,7 @@ import org.elasticsearch.search.aggregations.metrics.Min;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
 import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
@@ -29,7 +30,6 @@ import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
 import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.RollupDataExtractorFactory;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Optional;
@@ -106,7 +106,7 @@ public class ChunkedDataExtractor implements DataExtractor {
     }
 
     @Override
-    public Optional<InputStream> next() throws IOException {
+    public Result next() throws IOException {
         if (hasNext() == false) {
             throw new NoSuchElementException();
         }
@@ -144,7 +144,8 @@ public class ChunkedDataExtractor implements DataExtractor {
         return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get);
     }
 
-    private Optional<InputStream> getNextStream() throws IOException {
+    private Result getNextStream() throws IOException {
+        SearchInterval lastSearchInterval = new SearchInterval(context.start, context.end);
         while (hasNext()) {
             boolean isNewSearch = false;
 
@@ -154,9 +155,10 @@ public class ChunkedDataExtractor implements DataExtractor {
                 isNewSearch = true;
             }
 
-            Optional<InputStream> nextStream = currentExtractor.next();
-            if (nextStream.isPresent()) {
-                return nextStream;
+            Result result = currentExtractor.next();
+            lastSearchInterval = result.searchInterval();
+            if (result.data().isPresent()) {
+                return result;
             }
 
             if (isNewSearch && hasNext()) {
@@ -165,7 +167,7 @@ public class ChunkedDataExtractor implements DataExtractor {
                 setUpChunkedSearch();
             }
         }
-        return Optional.empty();
+        return new Result(lastSearchInterval, Optional.empty());
     }
 
     private void advanceTime() {

+ 3 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java

@@ -23,6 +23,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.fetch.StoredFieldsContext;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
@@ -88,7 +89,7 @@ class ScrollDataExtractor implements DataExtractor {
     }
 
     @Override
-    public Optional<InputStream> next() throws IOException {
+    public Result next() throws IOException {
         if (hasNext() == false) {
             throw new NoSuchElementException();
         }
@@ -96,7 +97,7 @@ class ScrollDataExtractor implements DataExtractor {
         if (stream.isPresent() == false) {
             hasNext = false;
         }
-        return stream;
+        return new Result(new SearchInterval(context.start, context.end), stream);
     }
 
     private Optional<InputStream> tryNextStream() throws IOException {

+ 3 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedActionTests.java

@@ -15,6 +15,7 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
 import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.core.ml.datafeed.SearchIntervalTests;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.junit.Before;
 import org.mockito.stubbing.Answer;
@@ -89,7 +90,7 @@ public class TransportPreviewDatafeedActionTests extends ESTestCase {
     }
 
     public void testPreviewDatafeed_GivenEmptyStream() throws IOException {
-        when(dataExtractor.next()).thenReturn(Optional.empty());
+        when(dataExtractor.next()).thenReturn(new DataExtractor.Result(SearchIntervalTests.createRandom(), Optional.empty()));
 
         TransportPreviewDatafeedAction.previewDatafeed(dataExtractor, actionListener);
 
@@ -101,7 +102,7 @@ public class TransportPreviewDatafeedActionTests extends ESTestCase {
     public void testPreviewDatafeed_GivenNonEmptyStream() throws IOException {
         String streamAsString = "{\"a\":1, \"b\":2} {\"c\":3, \"d\":4}\n{\"e\":5, \"f\":6}";
         InputStream stream = new ByteArrayInputStream(streamAsString.getBytes(StandardCharsets.UTF_8));
-        when(dataExtractor.next()).thenReturn(Optional.of(stream));
+        when(dataExtractor.next()).thenReturn(new DataExtractor.Result(SearchIntervalTests.createRandom(), Optional.of(stream)));
 
         TransportPreviewDatafeedAction.previewDatafeed(dataExtractor, actionListener);
 

+ 6 - 5
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java

@@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
 import org.elasticsearch.xpack.core.ml.action.PostDataAction;
 import org.elasticsearch.xpack.core.ml.annotations.Annotation;
 import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
+import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
@@ -133,7 +134,7 @@ public class DatafeedJobTests extends ESTestCase {
         when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
         byte[] contentBytes = "content".getBytes(StandardCharsets.UTF_8);
         InputStream inputStream = new ByteArrayInputStream(contentBytes);
-        when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
+        when(dataExtractor.next()).thenReturn(new DataExtractor.Result(new SearchInterval(1000L, 2000L), Optional.of(inputStream)));
         DataCounts dataCounts = new DataCounts(
             jobId,
             1,
@@ -292,7 +293,7 @@ public class DatafeedJobTests extends ESTestCase {
         byte[] contentBytes = "content".getBytes(StandardCharsets.UTF_8);
         InputStream inputStream = new ByteArrayInputStream(contentBytes);
         when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
-        when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
+        when(dataExtractor.next()).thenReturn(new DataExtractor.Result(new SearchInterval(1000L, 2000L), Optional.of(inputStream)));
         when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
         datafeedJob.runRealtime();
 
@@ -301,7 +302,7 @@ public class DatafeedJobTests extends ESTestCase {
         currentTime = currentTime + DELAYED_DATA_FREQ_HALF;
         inputStream = new ByteArrayInputStream(contentBytes);
         when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
-        when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
+        when(dataExtractor.next()).thenReturn(new DataExtractor.Result(new SearchInterval(1000L, 2000L), Optional.of(inputStream)));
         when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
         datafeedJob.runRealtime();
 
@@ -350,7 +351,7 @@ public class DatafeedJobTests extends ESTestCase {
         currentTime = currentTime + DELAYED_DATA_WINDOW + 1;
         inputStream = new ByteArrayInputStream(contentBytes);
         when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
-        when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
+        when(dataExtractor.next()).thenReturn(new DataExtractor.Result(new SearchInterval(1000L, 2000L), Optional.of(inputStream)));
         when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
         datafeedJob.runRealtime();
 
@@ -392,7 +393,7 @@ public class DatafeedJobTests extends ESTestCase {
         currentTime = currentTime + DELAYED_DATA_WINDOW + 1;
         inputStream = new ByteArrayInputStream(contentBytes);
         when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
-        when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
+        when(dataExtractor.next()).thenReturn(new DataExtractor.Result(new SearchInterval(1000L, 2000L), Optional.of(inputStream)));
         when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
         datafeedJob.runRealtime();
 

+ 9 - 5
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java

@@ -22,6 +22,8 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
+import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
+import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister;
 import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term;
@@ -143,7 +145,9 @@ public class AggregationDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(response);
 
         assertThat(extractor.hasNext(), is(true));
-        Optional<InputStream> stream = extractor.next();
+        DataExtractor.Result result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(1000L, 4000L)));
+        Optional<InputStream> stream = result.data();
         assertThat(stream.isPresent(), is(true));
         String expectedStream = """
             {"time":1999,"airline":"a","responsetime":11.0,"doc_count":1} \
@@ -177,7 +181,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(response);
 
         assertThat(extractor.hasNext(), is(true));
-        assertThat(extractor.next().isPresent(), is(false));
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
 
         assertThat(capturedSearchRequests.size(), equalTo(1));
@@ -190,7 +194,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(response);
 
         assertThat(extractor.hasNext(), is(true));
-        assertThat(extractor.next().isPresent(), is(false));
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
 
         assertThat(capturedSearchRequests.size(), equalTo(1));
@@ -202,7 +206,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(response);
 
         assertThat(extractor.hasNext(), is(true));
-        assertThat(extractor.next().isPresent(), is(false));
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
 
         assertThat(capturedSearchRequests.size(), equalTo(1));
@@ -258,7 +262,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(response);
 
         assertThat(extractor.hasNext(), is(true));
-        assertThat(countMatches('{', asString(extractor.next().get())), equalTo(2400L));
+        assertThat(countMatches('{', asString(extractor.next().data().get())), equalTo(2400L));
         histogramBuckets = new ArrayList<>(buckets);
         for (int i = 0; i < buckets; i++) {
             histogramBuckets.add(

+ 10 - 6
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorTests.java

@@ -29,6 +29,8 @@ import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceB
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
+import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
+import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister;
 import org.junit.Before;
@@ -169,7 +171,9 @@ public class CompositeAggregationDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(response);
 
         assertThat(extractor.hasNext(), is(true));
-        Optional<InputStream> stream = extractor.next();
+        DataExtractor.Result result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(1000L, 4000L)));
+        Optional<InputStream> stream = result.data();
         assertThat(stream.isPresent(), is(true));
         String expectedStream = """
             {"airline":"a","time":1999,"responsetime":11.0,"doc_count":1} \
@@ -202,7 +206,7 @@ public class CompositeAggregationDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(response);
 
         assertThat(extractor.hasNext(), is(true));
-        assertThat(extractor.next().isPresent(), is(false));
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
 
         assertThat(capturedSearchRequests.size(), equalTo(1));
@@ -215,7 +219,7 @@ public class CompositeAggregationDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(response);
 
         assertThat(extractor.hasNext(), is(true));
-        assertThat(extractor.next().isPresent(), is(false));
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
 
         assertThat(capturedSearchRequests.size(), equalTo(1));
@@ -259,7 +263,7 @@ public class CompositeAggregationDataExtractorTests extends ESTestCase {
         // We should have next right now as we have not yet determined if we have handled a page or not
         assertThat(extractor.hasNext(), is(true));
         // Should be empty
-        assertThat(countMatches('{', asString(extractor.next().get())), equalTo(0L));
+        assertThat(countMatches('{', asString(extractor.next().data().get())), equalTo(0L));
         // Determined that we were on the first page and ended
         assertThat(extractor.hasNext(), is(false));
     }
@@ -290,7 +294,7 @@ public class CompositeAggregationDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(response);
 
         assertThat(extractor.hasNext(), is(true));
-        assertThat(countMatches('{', asString(extractor.next().get())), equalTo(10L));
+        assertThat(countMatches('{', asString(extractor.next().data().get())), equalTo(10L));
         buckets = new ArrayList<>(numBuckets);
         for (int i = 0; i < 6; i++) {
             buckets.add(
@@ -325,7 +329,7 @@ public class CompositeAggregationDataExtractorTests extends ESTestCase {
         assertThat(extractor.hasNext(), is(true));
         assertThat(extractor.isCancelled(), is(true));
         // Only the docs in the previous bucket before cancelling
-        assertThat(countMatches('{', asString(extractor.next().get())), equalTo(6L));
+        assertThat(countMatches('{', asString(extractor.next().data().get())), equalTo(6L));
 
         // Once we have handled the 6 remaining in that time bucket, we shouldn't finish the page and the extractor should end
         assertThat(extractor.hasNext(), is(false));

+ 83 - 60
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java

@@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.metrics.Max;
 import org.elasticsearch.search.aggregations.metrics.Min;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
+import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister;
@@ -111,7 +112,7 @@ public class ChunkedDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(createSearchResponse(0L, 0L, 0L));
 
         assertThat(extractor.hasNext(), is(true));
-        assertThat(extractor.next().isPresent(), is(false));
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
         Mockito.verifyNoMoreInteractions(dataExtractorFactory);
     }
@@ -125,20 +126,28 @@ public class ChunkedDataExtractorTests extends ESTestCase {
         InputStream inputStream2 = mock(InputStream.class);
         InputStream inputStream3 = mock(InputStream.class);
 
-        DataExtractor subExtactor1 = new StubSubExtractor(inputStream1, inputStream2);
+        DataExtractor subExtactor1 = new StubSubExtractor(new SearchInterval(1000L, 2000L), inputStream1, inputStream2);
         when(dataExtractorFactory.newExtractor(1000L, 2000L)).thenReturn(subExtactor1);
 
-        DataExtractor subExtactor2 = new StubSubExtractor(inputStream3);
+        DataExtractor subExtactor2 = new StubSubExtractor(new SearchInterval(2000L, 2300L), inputStream3);
         when(dataExtractorFactory.newExtractor(2000L, 2300L)).thenReturn(subExtactor2);
 
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream1, extractor.next().get());
+        DataExtractor.Result result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(1000L, 2000L)));
+        assertEquals(inputStream1, result.data().get());
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream2, extractor.next().get());
+        result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(1000L, 2000L)));
+        assertEquals(inputStream2, result.data().get());
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream3, extractor.next().get());
+        result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(2000L, 2300L)));
+        assertEquals(inputStream3, result.data().get());
         assertThat(extractor.hasNext(), is(true));
-        assertThat(extractor.next().isPresent(), is(false));
+        result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(2000L, 2300L)));
+        assertThat(result.data().isPresent(), is(false));
 
         verify(dataExtractorFactory).newExtractor(1000L, 2000L);
         verify(dataExtractorFactory).newExtractor(2000L, 2300L);
@@ -175,20 +184,28 @@ public class ChunkedDataExtractorTests extends ESTestCase {
         InputStream inputStream2 = mock(InputStream.class);
         InputStream inputStream3 = mock(InputStream.class);
 
-        DataExtractor subExtactor1 = new StubSubExtractor(inputStream1, inputStream2);
+        DataExtractor subExtactor1 = new StubSubExtractor(new SearchInterval(1000L, 2000L), inputStream1, inputStream2);
         when(dataExtractorFactory.newExtractor(1000L, 2000L)).thenReturn(subExtactor1);
 
-        DataExtractor subExtactor2 = new StubSubExtractor(inputStream3);
+        DataExtractor subExtactor2 = new StubSubExtractor(new SearchInterval(2000L, 2300L), inputStream3);
         when(dataExtractorFactory.newExtractor(2000L, 2300L)).thenReturn(subExtactor2);
 
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream1, extractor.next().get());
+        DataExtractor.Result result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(1000L, 2000L)));
+        assertEquals(inputStream1, result.data().get());
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream2, extractor.next().get());
+        result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(1000L, 2000L)));
+        assertEquals(inputStream2, result.data().get());
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream3, extractor.next().get());
+        result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(2000L, 2300L)));
+        assertEquals(inputStream3, result.data().get());
         assertThat(extractor.hasNext(), is(true));
-        assertThat(extractor.next().isPresent(), is(false));
+        result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(2000L, 2300L)));
+        assertThat(result.data().isPresent(), is(false));
 
         verify(dataExtractorFactory).newExtractor(1000L, 2000L);
         verify(dataExtractorFactory).newExtractor(2000L, 2300L);
@@ -226,17 +243,23 @@ public class ChunkedDataExtractorTests extends ESTestCase {
         InputStream inputStream2 = mock(InputStream.class);
 
         // 200 * 1_000 == 200_000
-        DataExtractor subExtactor1 = new StubSubExtractor(inputStream1);
+        DataExtractor subExtactor1 = new StubSubExtractor(new SearchInterval(100_000L, 300_000L), inputStream1);
         when(dataExtractorFactory.newExtractor(100_000L, 300_000L)).thenReturn(subExtactor1);
 
-        DataExtractor subExtactor2 = new StubSubExtractor(inputStream2);
+        DataExtractor subExtactor2 = new StubSubExtractor(new SearchInterval(300_000L, 450_000L), inputStream2);
         when(dataExtractorFactory.newExtractor(300_000L, 450_000L)).thenReturn(subExtactor2);
 
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream1, extractor.next().get());
+        DataExtractor.Result result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(100_000L, 300_000L)));
+        assertEquals(inputStream1, result.data().get());
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream2, extractor.next().get());
-        assertThat(extractor.next().isPresent(), is(false));
+        result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(300_000L, 450_000L)));
+        assertEquals(inputStream2, result.data().get());
+        result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(300_000L, 450_000L)));
+        assertThat(result.data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
 
         verify(dataExtractorFactory).newExtractor(100_000L, 300_000L);
@@ -252,7 +275,7 @@ public class ChunkedDataExtractorTests extends ESTestCase {
 
         extractor.setNextResponse(createNullSearchResponse());
 
-        assertThat(extractor.next().isPresent(), is(false));
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
 
         Mockito.verifyNoMoreInteractions(dataExtractorFactory);
@@ -271,17 +294,17 @@ public class ChunkedDataExtractorTests extends ESTestCase {
         InputStream inputStream1 = mock(InputStream.class);
         InputStream inputStream2 = mock(InputStream.class);
 
-        DataExtractor subExtactor1 = new StubSubExtractor(inputStream1);
-        when(dataExtractorFactory.newExtractor(100000L, 300000L)).thenReturn(subExtactor1);
+        DataExtractor subExtactor1 = new StubSubExtractor(new SearchInterval(100_000L, 300_000L), inputStream1);
+        when(dataExtractorFactory.newExtractor(100_000L, 300_000L)).thenReturn(subExtactor1);
 
-        DataExtractor subExtactor2 = new StubSubExtractor(inputStream2);
-        when(dataExtractorFactory.newExtractor(300000L, 450000L)).thenReturn(subExtactor2);
+        DataExtractor subExtactor2 = new StubSubExtractor(new SearchInterval(300_000L, 450_000L), inputStream2);
+        when(dataExtractorFactory.newExtractor(300_000L, 450_000L)).thenReturn(subExtactor2);
 
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream1, extractor.next().get());
+        assertEquals(inputStream1, extractor.next().data().get());
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream2, extractor.next().get());
-        assertThat(extractor.next().isPresent(), is(false));
+        assertEquals(inputStream2, extractor.next().data().get());
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
 
         verify(dataExtractorFactory).newExtractor(100000L, 300000L);
@@ -302,16 +325,16 @@ public class ChunkedDataExtractorTests extends ESTestCase {
         InputStream inputStream1 = mock(InputStream.class);
         InputStream inputStream2 = mock(InputStream.class);
 
-        DataExtractor subExtactor1 = new StubSubExtractor(inputStream1);
+        DataExtractor subExtactor1 = new StubSubExtractor(new SearchInterval(100_000L, 200_000L), inputStream1);
         when(dataExtractorFactory.newExtractor(100000L, 200000L)).thenReturn(subExtactor1);
 
-        DataExtractor subExtactor2 = new StubSubExtractor(inputStream2);
+        DataExtractor subExtactor2 = new StubSubExtractor(new SearchInterval(200_000L, 300_000L), inputStream2);
         when(dataExtractorFactory.newExtractor(200000L, 300000L)).thenReturn(subExtactor2);
 
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream1, extractor.next().get());
+        assertEquals(inputStream1, extractor.next().data().get());
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream2, extractor.next().get());
+        assertEquals(inputStream2, extractor.next().data().get());
         assertThat(extractor.hasNext(), is(true));
 
         verify(dataExtractorFactory).newExtractor(100000L, 200000L);
@@ -331,16 +354,16 @@ public class ChunkedDataExtractorTests extends ESTestCase {
         InputStream inputStream1 = mock(InputStream.class);
         InputStream inputStream2 = mock(InputStream.class);
 
-        DataExtractor subExtactor1 = new StubSubExtractor(inputStream1);
+        DataExtractor subExtactor1 = new StubSubExtractor(new SearchInterval(100_000L, 160_000L), inputStream1);
         when(dataExtractorFactory.newExtractor(100000L, 160000L)).thenReturn(subExtactor1);
 
-        DataExtractor subExtactor2 = new StubSubExtractor(inputStream2);
+        DataExtractor subExtactor2 = new StubSubExtractor(new SearchInterval(160_000L, 220_000L), inputStream2);
         when(dataExtractorFactory.newExtractor(160000L, 220000L)).thenReturn(subExtactor2);
 
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream1, extractor.next().get());
+        assertEquals(inputStream1, extractor.next().data().get());
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream2, extractor.next().get());
+        assertEquals(inputStream2, extractor.next().data().get());
         assertThat(extractor.hasNext(), is(true));
 
         verify(dataExtractorFactory).newExtractor(100000L, 160000L);
@@ -359,13 +382,13 @@ public class ChunkedDataExtractorTests extends ESTestCase {
 
         InputStream inputStream1 = mock(InputStream.class);
 
-        DataExtractor subExtactor1 = new StubSubExtractor(inputStream1);
+        DataExtractor subExtactor1 = new StubSubExtractor(new SearchInterval(300L, 500L), inputStream1);
         when(dataExtractorFactory.newExtractor(300L, 500L)).thenReturn(subExtactor1);
 
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream1, extractor.next().get());
+        assertEquals(inputStream1, extractor.next().data().get());
         assertThat(extractor.hasNext(), is(true));
-        assertThat(extractor.next().isPresent(), is(false));
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
 
         verify(dataExtractorFactory).newExtractor(300L, 500L);
@@ -384,13 +407,13 @@ public class ChunkedDataExtractorTests extends ESTestCase {
 
         InputStream inputStream1 = mock(InputStream.class);
 
-        DataExtractor subExtactor1 = new StubSubExtractor(inputStream1);
+        DataExtractor subExtactor1 = new StubSubExtractor(new SearchInterval(1L, 10L), inputStream1);
         when(dataExtractorFactory.newExtractor(1L, 101L)).thenReturn(subExtactor1);
 
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream1, extractor.next().get());
+        assertEquals(inputStream1, extractor.next().data().get());
         assertThat(extractor.hasNext(), is(true));
-        assertThat(extractor.next().isPresent(), is(false));
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
 
         verify(dataExtractorFactory).newExtractor(1L, 101L);
@@ -409,15 +432,15 @@ public class ChunkedDataExtractorTests extends ESTestCase {
 
         InputStream inputStream1 = mock(InputStream.class);
 
-        DataExtractor subExtactor1 = new StubSubExtractor(inputStream1);
+        DataExtractor subExtactor1 = new StubSubExtractor(new SearchInterval(100_000L, 200_000L), inputStream1);
         when(dataExtractorFactory.newExtractor(100000L, 200000L)).thenReturn(subExtactor1);
 
         // This one is empty
-        DataExtractor subExtactor2 = new StubSubExtractor();
+        DataExtractor subExtactor2 = new StubSubExtractor(new SearchInterval(200_000L, 300_000L));
         when(dataExtractorFactory.newExtractor(200000, 300000L)).thenReturn(subExtactor2);
 
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream1, extractor.next().get());
+        assertEquals(inputStream1, extractor.next().data().get());
         assertThat(extractor.hasNext(), is(true));
 
         // Now we have: 200K millis * 500 * 10 / 5K docs = 200000
@@ -425,11 +448,11 @@ public class ChunkedDataExtractorTests extends ESTestCase {
 
         // This is the last one
         InputStream inputStream2 = mock(InputStream.class);
-        DataExtractor subExtactor3 = new StubSubExtractor(inputStream2);
+        DataExtractor subExtactor3 = new StubSubExtractor(new SearchInterval(200_000L, 400_000L), inputStream2);
         when(dataExtractorFactory.newExtractor(200000, 400000)).thenReturn(subExtactor3);
 
-        assertEquals(inputStream2, extractor.next().get());
-        assertThat(extractor.next().isPresent(), is(false));
+        assertEquals(inputStream2, extractor.next().data().get());
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
 
         verify(dataExtractorFactory).newExtractor(100000L, 200000L);
@@ -452,7 +475,7 @@ public class ChunkedDataExtractorTests extends ESTestCase {
 
         InputStream inputStream1 = mock(InputStream.class);
 
-        DataExtractor subExtactor1 = new StubSubExtractor(inputStream1);
+        DataExtractor subExtactor1 = new StubSubExtractor(new SearchInterval(1000L, 2000L), inputStream1);
         when(dataExtractorFactory.newExtractor(1000L, 2000L)).thenReturn(subExtactor1);
 
         assertThat(extractor.hasNext(), is(true));
@@ -472,19 +495,19 @@ public class ChunkedDataExtractorTests extends ESTestCase {
         InputStream inputStream1 = mock(InputStream.class);
         InputStream inputStream2 = mock(InputStream.class);
 
-        DataExtractor subExtactor1 = new StubSubExtractor(inputStream1, inputStream2);
+        DataExtractor subExtactor1 = new StubSubExtractor(new SearchInterval(1000L, 2000L), inputStream1, inputStream2);
         when(dataExtractorFactory.newExtractor(1000L, 2000L)).thenReturn(subExtactor1);
 
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream1, extractor.next().get());
+        assertEquals(inputStream1, extractor.next().data().get());
 
         extractor.cancel();
 
         assertThat(extractor.isCancelled(), is(true));
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream2, extractor.next().get());
+        assertEquals(inputStream2, extractor.next().data().get());
         assertThat(extractor.hasNext(), is(true));
-        assertThat(extractor.next().isPresent(), is(false));
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
 
         verify(dataExtractorFactory).newExtractor(1000L, 2000L);
@@ -498,17 +521,17 @@ public class ChunkedDataExtractorTests extends ESTestCase {
 
         InputStream inputStream1 = mock(InputStream.class);
 
-        DataExtractor subExtactor1 = new StubSubExtractor(inputStream1);
+        DataExtractor subExtactor1 = new StubSubExtractor(new SearchInterval(1000L, 3000L), inputStream1);
         when(dataExtractorFactory.newExtractor(1000L, 2000L)).thenReturn(subExtactor1);
 
         assertThat(extractor.hasNext(), is(true));
-        assertEquals(inputStream1, extractor.next().get());
+        assertEquals(inputStream1, extractor.next().data().get());
 
         extractor.cancel();
 
         assertThat(extractor.isCancelled(), is(true));
         assertThat(extractor.hasNext(), is(true));
-        assertThat(extractor.next().isPresent(), is(false));
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
 
         verify(dataExtractorFactory).newExtractor(1000L, 2000L);
@@ -597,12 +620,12 @@ public class ChunkedDataExtractorTests extends ESTestCase {
     }
 
     private static class StubSubExtractor implements DataExtractor {
+        final SearchInterval searchInterval;
         List<InputStream> streams = new ArrayList<>();
         boolean hasNext = true;
 
-        StubSubExtractor() {}
-
-        StubSubExtractor(InputStream... streams) {
+        StubSubExtractor(SearchInterval searchInterval, InputStream... streams) {
+            this.searchInterval = searchInterval;
             Collections.addAll(this.streams, streams);
         }
 
@@ -612,12 +635,12 @@ public class ChunkedDataExtractorTests extends ESTestCase {
         }
 
         @Override
-        public Optional<InputStream> next() {
+        public Result next() {
             if (streams.isEmpty()) {
                 hasNext = false;
-                return Optional.empty();
+                return new Result(searchInterval, Optional.empty());
             }
-            return Optional.of(streams.remove(0));
+            return new Result(searchInterval, Optional.of(streams.remove(0)));
         }
 
         @Override

+ 23 - 15
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java

@@ -35,6 +35,8 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
+import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
+import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister;
 import org.elasticsearch.xpack.ml.extractor.DocValueField;
@@ -182,7 +184,9 @@ public class ScrollDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(response1);
 
         assertThat(extractor.hasNext(), is(true));
-        Optional<InputStream> stream = extractor.next();
+        DataExtractor.Result result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(1000L, 2000L)));
+        Optional<InputStream> stream = result.data();
         assertThat(stream.isPresent(), is(true));
         String expectedStream = "{\"time\":1100,\"field_1\":\"a1\"} {\"time\":1200,\"field_1\":\"a2\"}";
         assertThat(asString(stream.get()), equalTo(expectedStream));
@@ -190,7 +194,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         SearchResponse response2 = createEmptySearchResponse();
         extractor.setNextResponse(response2);
         assertThat(extractor.hasNext(), is(true));
-        assertThat(extractor.next().isPresent(), is(false));
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
         assertThat(capturedSearchRequests.size(), equalTo(1));
 
@@ -222,7 +226,9 @@ public class ScrollDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(response1);
 
         assertThat(extractor.hasNext(), is(true));
-        Optional<InputStream> stream = extractor.next();
+        DataExtractor.Result result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(1000L, 10000L)));
+        Optional<InputStream> stream = result.data();
         assertThat(stream.isPresent(), is(true));
         String expectedStream = """
             {"time":1000,"field_1":"a1"} {"time":2000,"field_1":"a2"}""";
@@ -232,7 +238,9 @@ public class ScrollDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(response2);
 
         assertThat(extractor.hasNext(), is(true));
-        stream = extractor.next();
+        result = extractor.next();
+        assertThat(result.searchInterval(), equalTo(new SearchInterval(1000L, 10000L)));
+        stream = result.data();
         assertThat(stream.isPresent(), is(true));
         expectedStream = """
             {"time":3000,"field_1":"a3"} {"time":4000,"field_1":"a4"}""";
@@ -241,7 +249,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         SearchResponse response3 = createEmptySearchResponse();
         extractor.setNextResponse(response3);
         assertThat(extractor.hasNext(), is(true));
-        assertThat(extractor.next().isPresent(), is(false));
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
         assertThat(capturedSearchRequests.size(), equalTo(1));
 
@@ -273,7 +281,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(response1);
 
         assertThat(extractor.hasNext(), is(true));
-        Optional<InputStream> stream = extractor.next();
+        Optional<InputStream> stream = extractor.next().data();
         assertThat(stream.isPresent(), is(true));
         String expectedStream = """
             {"time":1000,"field_1":"a1"} {"time":2000,"field_1":"a2"}""";
@@ -290,7 +298,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
 
         assertThat(extractor.isCancelled(), is(true));
         assertThat(extractor.hasNext(), is(true));
-        stream = extractor.next();
+        stream = extractor.next().data();
         assertThat(stream.isPresent(), is(true));
         expectedStream = """
             {"time":2000,"field_1":"a3"} {"time":2000,"field_1":"a4"}""";
@@ -318,7 +326,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(response1);
 
         assertThat(extractor.hasNext(), is(true));
-        Optional<InputStream> stream = extractor.next();
+        Optional<InputStream> stream = extractor.next().data();
         assertThat(stream.isPresent(), is(true));
 
         extractor.setNextResponseToError(new SearchPhaseExecutionException("search phase 1", "boom", ShardSearchFailure.EMPTY_ARRAY));
@@ -353,11 +361,11 @@ public class ScrollDataExtractorTests extends ESTestCase {
 
         // first response is good
         assertThat(extractor.hasNext(), is(true));
-        Optional<InputStream> output = extractor.next();
+        Optional<InputStream> output = extractor.next().data();
         assertThat(output.isPresent(), is(true));
         // this should recover from the first shard failure and try again
         assertThat(extractor.hasNext(), is(true));
-        output = extractor.next();
+        output = extractor.next().data();
         assertThat(output.isPresent(), is(true));
         // A second failure is not tolerated
         assertThat(extractor.hasNext(), is(true));
@@ -379,7 +387,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         extractor.setNextResponseToError(new ElasticsearchException("something not search phase exception"));
         extractor.setNextResponseToError(new ElasticsearchException("something not search phase exception"));
 
-        Optional<InputStream> output = extractor.next();
+        Optional<InputStream> output = extractor.next().data();
         assertThat(output.isPresent(), is(true));
         assertEquals(1000L, extractor.getInitScrollStartTime());
 
@@ -409,11 +417,11 @@ public class ScrollDataExtractorTests extends ESTestCase {
 
         // first response is good
         assertThat(extractor.hasNext(), is(true));
-        Optional<InputStream> output = extractor.next();
+        Optional<InputStream> output = extractor.next().data();
         assertThat(output.isPresent(), is(true));
         // this should recover from the SearchPhaseExecutionException and try again
         assertThat(extractor.hasNext(), is(true));
-        output = extractor.next();
+        output = extractor.next().data();
         assertThat(output.isPresent(), is(true));
         assertEquals(Long.valueOf(1400L), extractor.getLastTimestamp());
         // A second failure is not tolerated
@@ -464,7 +472,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(response1);
 
         assertThat(extractor.hasNext(), is(true));
-        Optional<InputStream> stream = extractor.next();
+        Optional<InputStream> stream = extractor.next().data();
         assertThat(stream.isPresent(), is(true));
         String expectedStream = """
             {"time":1100,"field_1":"a1"} {"time":1200,"field_1":"a2"}""";
@@ -473,7 +481,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         SearchResponse response2 = createEmptySearchResponse();
         extractor.setNextResponse(response2);
         assertThat(extractor.hasNext(), is(true));
-        assertThat(extractor.next().isPresent(), is(false));
+        assertThat(extractor.next().data().isPresent(), is(false));
         assertThat(extractor.hasNext(), is(false));
         assertThat(capturedSearchRequests.size(), equalTo(1));