Browse Source

Add fleet search api to wait on refreshes (#73134)

This is related to #71449. This commit adds a specialized search API
which allows users to pass wait on refresh checkpoints. When users pass
these checkpoints to the API, the search will only be executed after the
checkpoints are visible after a refresh.
Tim Brooks 4 years ago
parent
commit
7ad7d7eccd
28 changed files with 1451 additions and 123 deletions
  1. 45 0
      docs/reference/fleet/fleet-multi-search.asciidoc
  2. 67 0
      docs/reference/fleet/fleet-search.asciidoc
  3. 2 0
      docs/reference/fleet/index.asciidoc
  4. 45 0
      rest-api-spec/src/main/resources/rest-api-spec/api/fleet.msearch.json
  5. 50 0
      rest-api-spec/src/main/resources/rest-api-spec/api/fleet.search.json
  6. 19 0
      server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java
  7. 16 1
      server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
  8. 23 2
      server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java
  9. 44 3
      server/src/main/java/org/elasticsearch/action/search/SearchRequest.java
  10. 9 0
      server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java
  11. 28 0
      server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
  12. 10 0
      server/src/main/java/org/elasticsearch/index/engine/Engine.java
  13. 6 0
      server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
  14. 10 0
      server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
  15. 28 0
      server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  16. 222 52
      server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java
  17. 26 3
      server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java
  18. 17 2
      server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java
  19. 114 9
      server/src/main/java/org/elasticsearch/search/SearchService.java
  20. 68 3
      server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java
  21. 24 3
      server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
  22. 166 43
      server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
  23. 88 0
      server/src/test/java/org/elasticsearch/search/SearchServiceTests.java
  24. 0 0
      x-pack/plugin/fleet/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/fleet/10_global_checkpoints.yml
  25. 110 0
      x-pack/plugin/fleet/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/fleet/20_wait_for_checkpoints.yml
  26. 3 2
      x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java
  27. 112 0
      x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/rest/RestFleetMultiSearchAction.java
  28. 99 0
      x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/rest/RestFleetSearchAction.java

+ 45 - 0
docs/reference/fleet/fleet-multi-search.asciidoc

@@ -0,0 +1,45 @@
+[role="xpack"]
+[[fleet-multi-search]]
+=== Fleet multi search API
+++++
+<titleabbrev>Fleet multi search</titleabbrev>
+++++
+
+Executes several <<fleet-search,fleet searches>> with a single API request.
+
+The API follows the same structure as the <<search-multi-search, multi search>> API. However,
+similar to the fleet search API, it supports the `wait_for_checkpoints` parameter.
+
+NOTE: The fleet multi search API is designed for indirect use through fleet server. Direct use is
+not supported. Elastic reserves the right to change or remove this feature in future releases
+without prior notice.
+
+[[fleet-multi-search-api-request]]
+==== {api-request-title}
+
+`GET /_fleet/_msearch`
+
+`GET /<index>/_fleet/_msearch`
+
+[[fleet-multi-search-api-path-params]]
+==== {api-path-parms-title}
+
+`<index>`::
+(Optional, string)
+A single index. Index aliases are not supported.
+
+[role="child_attributes"]
+[[fleet-multi-search-api-query-parms]]
+==== {api-query-parms-title}
+
+`wait_for_checkpoints`::
+(Optional, list) A comma separated list of checkpoints. When configured, the search API will
+only be executed on a shard after the relevant checkpoint has become visible for search.
+Defaults to an empty list which will cause Elasticsearch to immediately execute the search.
+
+`allow_partial_search_results`::
+(Optional, Boolean)
+If `true`, returns partial results if there are shard request timeouts or
+<<shard-failures,shard failures>>. If `false`, returns an error with
+no partial results. Defaults to the configured cluster setting `search.default_allow_partial_results` which
+is `true` by default.

+ 67 - 0
docs/reference/fleet/fleet-search.asciidoc

@@ -0,0 +1,67 @@
+[role="xpack"]
+[[fleet-search]]
+=== Fleet search API
+++++
+<titleabbrev>Fleet search</titleabbrev>
+++++
+
+The purpose of the fleet search API is to provide a search API where the search
+will only be executed after the provided checkpoint has been processed and is visible
+for searches inside of Elasticsearch.
+
+NOTE: The fleet search API is designed for indirect use through fleet server. Direct use is
+not supported. Elastic reserves the right to change or remove this feature in future releases
+without prior notice.
+
+[discrete]
+[[wait-for-checkpoint-functionality]]
+== Wait for checkpoint functionality
+
+The fleet search API supports the optional parameter `wait_for_checkpoints`. This parameter
+is a list of sequence number checkpoints. When this parameter is present, the search will
+only be executed on local shards after all operations up to and including the provided
+sequence number checkpoint are visible for search. Indexed operations become visible after a
+refresh. The checkpoints are indexed by shard.
+
+If a timeout occurs before the checkpoint has been refreshed into Elasticsearch,
+the search request will time out.
+
+The fleet search API only supports searches against a single index.
+
+[discrete]
+[[fleet-search-partial-responses]]
+== Allow partial results
+
+By default, the Elasticsearch search API will allow <<search-partial-responses,partial search results>>.
+With this fleet API, it is common to set `allow_partial_search_results` to `false` or to check the response
+to ensure each shard search was successful. If these precautions are not taken, it is
+possible for search results to be successfully returned even if one or more shards
+timed out.
+
+[[fleet-search-api-request]]
+==== {api-request-title}
+
+`GET /<index>/_fleet/_search`
+
+[[fleet-search-api-path-params]]
+==== {api-path-parms-title}
+
+`<index>`::
+(Required, string)
+A single index. Index aliases are not supported.
+
+[role="child_attributes"]
+[[fleet-search-api-query-parms]]
+==== {api-query-parms-title}
+
+`wait_for_checkpoints`::
+(Optional, list) A comma separated list of checkpoints. When configured, the search API will
+only be executed on a shard after the relevant checkpoint has become visible for search.
+Defaults to an empty list which will cause Elasticsearch to immediately execute the search.
+
+`allow_partial_search_results`::
+(Optional, Boolean)
+If `true`, returns partial results if there are shard request timeouts or
+<<shard-failures,shard failures>>. If `false`, returns an error with
+no partial results. Defaults to the configured cluster setting `search.default_allow_partial_results` which
+is `true` by default.

+ 2 - 0
docs/reference/fleet/index.asciidoc

@@ -10,6 +10,8 @@ agent and action data. These APIs are experimental and for internal use by
 {fleet} only.
 
 * <<get-global-checkpoints,Get global checkpoints>>
+* <<fleet-search,Fleet search>>
 
 // top-level
 include::get-global-checkpoints.asciidoc[]
+include::fleet-search.asciidoc[]

+ 45 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/fleet.msearch.json

@@ -0,0 +1,45 @@
+{
+  "fleet.msearch":{
+    "documentation":{
+      "url": null,
+      "description": "Multi Search API where the search will only be executed after specified checkpoints are available due to a refresh. This API is designed for internal use by the fleet server project."
+    },
+    "stability":"experimental",
+    "visibility":"public",
+    "headers":{
+      "accept": [ "application/json"],
+      "content_type": ["application/x-ndjson"]
+    },
+    "url":{
+      "paths":[
+        {
+          "path":"/_fleet/_msearch",
+          "methods":[
+            "GET",
+            "POST"
+          ]
+        },
+        {
+          "path":"/{index}/_fleet/_msearch",
+          "methods":[
+            "GET",
+            "POST"
+          ],
+          "parts":{
+            "index":{
+              "type":"list",
+              "description":"A comma-separated list of index names to use as default"
+            }
+          }
+        }
+      ]
+    },
+    "params":{
+    },
+    "body":{
+      "description":"The request definitions (metadata-fleet search request definition pairs), separated by newlines",
+      "required":true,
+      "serialize":"bulk"
+    }
+  }
+}

+ 50 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/fleet.search.json

@@ -0,0 +1,50 @@
+{
+  "fleet.search":{
+    "documentation":{
+      "url": null,
+      "description": "Search API where the search will only be executed after specified checkpoints are available due to a refresh. This API is designed for internal use by the fleet server project."
+    },
+    "stability":"experimental",
+    "visibility":"public",
+    "headers":{
+      "accept": [ "application/json"],
+      "content_type": ["application/json"]
+    },
+    "url":{
+      "paths":[
+        {
+          "path":"/{index}/_fleet/_search",
+          "methods":[
+            "GET",
+            "POST"
+          ],
+          "parts":{
+            "index":{
+              "type":"string",
+              "description":"The name of the index."
+            }
+          }
+        }
+      ]
+    },
+    "params":{
+      "wait_for_checkpoints":{
+        "type":"list",
+        "description":"Comma separated list of checkpoints, one per shard",
+        "default":""
+      },
+      "wait_for_checkpoints_timeout":{
+        "type":"time",
+        "description":"Explicit wait_for_checkpoints timeout"
+      },
+      "allow_partial_search_results":{
+        "type":"boolean",
+        "default":true,
+        "description":"Indicate if an error should be returned if there is a partial search failure or timeout"
+      }
+    },
+    "body":{
+      "description":"The search definition using the Query DSL"
+    }
+  }
+}

+ 19 - 0
server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java

@@ -254,6 +254,25 @@ public class TransportSearchIT extends ESIntegTestCase {
         }
     }
 
+    public void testWaitForRefreshIndexValidation() throws Exception {
+        int numberOfShards = randomIntBetween(3, 10);
+        assertAcked(prepareCreate("test1").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)));
+        client().admin().indices().prepareAliases().addAlias("test1", "testAlias").get();
+
+        // no exception
+        client().prepareSearch("testAlias").get();
+
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+            () -> client().prepareSearch("testAlias").setWaitForCheckpoints(Collections.singletonMap("testAlias", new long[0])).get());
+        assertThat(e.getMessage(), containsString("Index configured with wait_for_checkpoints must be a concrete index resolved in this " +
+            "search. Index [testAlias] is not a concrete index resolved in this search."));
+
+        IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class,
+            () -> client().prepareSearch("test1").setWaitForCheckpoints(Collections.singletonMap("test1", new long[2])).get());
+        assertThat(e2.getMessage(), containsString("Index configured with wait_for_checkpoints must search the same number of shards as " +
+            "checkpoints provided. [2] checkpoints provided. Index [test1] has [" + numberOfShards + "] shards."));
+    }
+
     public void testShardCountLimit() throws Exception {
         try {
             final int numPrimaries1 = randomIntBetween(2, 10);

+ 16 - 1
server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

@@ -27,6 +27,8 @@ import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.search.SearchContextMissingException;
 import org.elasticsearch.search.SearchPhaseResult;
@@ -63,6 +65,7 @@ import java.util.stream.Collectors;
  */
 abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends SearchPhase implements SearchPhaseContext {
     private static final float DEFAULT_INDEX_BOOST = 1.0f;
+    private static final long[] EMPTY_LONG_ARRAY = new long[0];
     private final Logger logger;
     private final SearchTransportService searchTransportService;
     private final Executor executor;
@@ -733,9 +736,21 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
         AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
         assert filter != null;
         float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
+        final Map<String, long[]> indexToWaitForCheckpoints = request.getWaitForCheckpoints();
+        final TimeValue waitForCheckpointsTimeout = request.getWaitForCheckpointsTimeout();
+        final long[] waitForCheckpoints = indexToWaitForCheckpoints.getOrDefault(shardIt.shardId().getIndex().getName(), EMPTY_LONG_ARRAY);
+
+        long waitForCheckpoint;
+        if (waitForCheckpoints.length == 0) {
+            waitForCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
+        } else {
+            assert waitForCheckpoints.length > shardIndex;
+            waitForCheckpoint = waitForCheckpoints[shardIndex];
+        }
         ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request,
             shardIt.shardId(), shardIndex, getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(),
-            shardIt.getClusterAlias(), shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive());
+            shardIt.getClusterAlias(), shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive(), waitForCheckpoint,
+            waitForCheckpointsTimeout);
         // if we already received a search result we can inform the shard that it
         // can return a null response if the request rewrites to match none rather
         // than creating an empty response in the search thread pool.

+ 23 - 2
server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java

@@ -14,12 +14,13 @@ import org.elasticsearch.action.CompositeIndicesRequest;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.IndicesOptions.WildcardStates;
 import org.elasticsearch.common.CheckedBiConsumer;
-import org.elasticsearch.core.RestApiVersion;
+import org.elasticsearch.common.TriFunction;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
+import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContent;
@@ -179,7 +180,25 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
                                            String searchType,
                                            Boolean ccsMinimizeRoundtrips,
                                            NamedXContentRegistry registry,
-                                           boolean allowExplicitIndex, RestApiVersion restApiVersion) throws IOException {
+                                           boolean allowExplicitIndex,
+                                           RestApiVersion restApiVersion) throws IOException {
+        readMultiLineFormat(data, xContent, consumer, indices, indicesOptions, routing, searchType, ccsMinimizeRoundtrips, registry,
+            allowExplicitIndex, restApiVersion, (s, o, r) -> false);
+
+    }
+
+    public static void readMultiLineFormat(BytesReference data,
+                                           XContent xContent,
+                                           CheckedBiConsumer<SearchRequest, XContentParser, IOException> consumer,
+                                           String[] indices,
+                                           IndicesOptions indicesOptions,
+                                           String routing,
+                                           String searchType,
+                                           Boolean ccsMinimizeRoundtrips,
+                                           NamedXContentRegistry registry,
+                                           boolean allowExplicitIndex,
+                                           RestApiVersion restApiVersion,
+                                           TriFunction<String, Object, SearchRequest, Boolean> extraParamParser) throws IOException {
         int from = 0;
         byte marker = xContent.streamSeparator();
         while (true) {
@@ -251,6 +270,8 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
                         } else if(restApiVersion == RestApiVersion.V_7 &&
                             ("type".equals(entry.getKey()) || "types".equals(entry.getKey()))) {
                             deprecationLogger.compatibleCritical("msearch_with_types", RestMultiSearchAction.TYPES_DEPRECATION_MESSAGE);
+                        } else if (extraParamParser.apply(entry.getKey(), value, searchRequest)) {
+                            // Skip, the parser handled the key/value
                         } else {
                             throw new IllegalArgumentException("key [" + entry.getKey() + "] is not supported in the metadata section");
                         }

+ 44 - 3
server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

@@ -13,12 +13,11 @@ import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.index.query.QueryRewriteContext;
 import org.elasticsearch.index.query.Rewriteable;
 import org.elasticsearch.search.Scroll;
@@ -26,10 +25,11 @@ import org.elasticsearch.search.builder.PointInTimeBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.sort.FieldSortBuilder;
-import org.elasticsearch.search.sort.SortBuilder;
 import org.elasticsearch.search.sort.ShardDocSortField;
+import org.elasticsearch.search.sort.SortBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.xcontent.ToXContent;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -99,6 +99,10 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
 
     private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
 
+    private Map<String, long[]> waitForCheckpoints = Collections.emptyMap();
+
+    private TimeValue waitForCheckpointsTimeout = TimeValue.timeValueSeconds(30);
+
     public SearchRequest() {
         this((Version) null);
     }
@@ -191,6 +195,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
         this.absoluteStartMillis = absoluteStartMillis;
         this.finalReduce = finalReduce;
         this.minCompatibleShardNode = searchRequest.minCompatibleShardNode;
+        this.waitForCheckpoints = searchRequest.waitForCheckpoints;
+        this.waitForCheckpointsTimeout = searchRequest.waitForCheckpointsTimeout;
     }
 
     /**
@@ -235,6 +241,11 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
         } else {
             minCompatibleShardNode = null;
         }
+        // TODO: Change after backport
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            waitForCheckpoints = in.readMap(StreamInput::readString, StreamInput::readLongArray);
+            waitForCheckpointsTimeout = in.readTimeValue();
+        }
     }
 
     @Override
@@ -268,6 +279,15 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
                 Version.writeVersion(minCompatibleShardNode, out);
             }
         }
+        // TODO: Change after backport
+        // Single source of truth for the minimum version: used by both the wire check and the error message below.
+        Version waitForCheckpointsVersion = Version.V_8_0_0;
+        if (out.getVersion().onOrAfter(waitForCheckpointsVersion)) {
+            out.writeMap(waitForCheckpoints, StreamOutput::writeString, StreamOutput::writeLongArray);
+            out.writeTimeValue(waitForCheckpointsTimeout);
+        } else if (waitForCheckpoints.isEmpty() == false) {
+            // The receiving node cannot read the checkpoints; fail rather than silently drop them from the request.
+            throw new IllegalArgumentException("Remote node version [" + out.getVersion() + "] incompatible with " +
+                "wait_for_checkpoints. All nodes must be version [" + waitForCheckpointsVersion + "] or greater.");
+        }
     }
 
     @Override
@@ -321,6 +341,10 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
                     + "shard version", validationException);
             }
         }
+        if (pointInTimeBuilder() != null && waitForCheckpoints.isEmpty() == false) {
+            validationException = addValidationError("using [point in time] is not allowed with wait_for_checkpoints", validationException);
+
+        }
         return validationException;
     }
 
@@ -607,6 +631,23 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
         }
         this.maxConcurrentShardRequests = maxConcurrentShardRequests;
     }
+
+    public Map<String, long[]> getWaitForCheckpoints() {
+        return waitForCheckpoints;
+    }
+
+    public void setWaitForCheckpoints(Map<String, long[]> afterCheckpointsRefreshed) {
+        this.waitForCheckpoints = afterCheckpointsRefreshed;
+    }
+
+    public TimeValue getWaitForCheckpointsTimeout() {
+        return waitForCheckpointsTimeout;
+    }
+
+    public void setWaitForCheckpointsTimeout(final TimeValue waitForCheckpointsTimeout) {
+        this.waitForCheckpointsTimeout = waitForCheckpointsTimeout;
+    }
+
     /**
      * Sets a threshold that enforces a pre-filter roundtrip to pre-filter search shards based on query rewriting if the number of shards
      * the search request expands to exceeds the threshold. This filter roundtrip can limit the number of shards significantly if for

+ 9 - 0
server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java

@@ -134,6 +134,15 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
         return this;
     }
 
+    /**
+     * Wait for checkpoints configured for a concrete index, will require that the search request only
+     * be executed after the checkpoints are available for search due to a refresh.
+     */
+    public SearchRequestBuilder setWaitForCheckpoints(Map<String, long[]> waitForCheckpoints) {
+        request.setWaitForCheckpoints(waitForCheckpoints);
+        return this;
+    }
+
     /**
      * Specifies what type of requested indices to ignore and wildcard indices expressions.
      * <p>

+ 28 - 0
server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

@@ -707,6 +707,14 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
 
         failIfOverShardCountLimit(clusterService, shardIterators.size());
 
+        if (searchRequest.getWaitForCheckpoints().isEmpty() == false) {
+            if (remoteShardIterators.isEmpty() == false) {
+                throw new IllegalArgumentException("Cannot use wait_for_checkpoints parameter with cross-cluster searches.");
+            } else {
+                validateWaitForCheckpoint(clusterState, searchRequest, concreteLocalIndices);
+            }
+        }
+
         Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);
 
         // optimize search type for cases where there is only one shard group to search on
@@ -881,6 +889,26 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         }
     }
 
+    private static void validateWaitForCheckpoint(ClusterState clusterState, SearchRequest searchRequest, String[] concreteLocalIndices) {
+        HashSet<String> searchedIndices = new HashSet<>(Arrays.asList(concreteLocalIndices));
+        for (Map.Entry<String, long[]> waitForCheckpointIndex : searchRequest.getWaitForCheckpoints().entrySet()) {
+            int checkpointsProvided = waitForCheckpointIndex.getValue().length;
+            String index = waitForCheckpointIndex.getKey();
+            IndexMetadata indexMetadata = clusterState.metadata().index(index);
+            if (searchedIndices.contains(index) == false) {
+                throw new IllegalArgumentException("Index configured with wait_for_checkpoints must be a concrete index resolved in " +
+                    "this search. Index [" + index + "] is not a concrete index resolved in this search.");
+            } else if (indexMetadata == null) {
+                throw new IllegalArgumentException("Cannot find index configured for wait_for_checkpoints parameter [" + index + "].");
+            } else if (indexMetadata.getNumberOfShards() != checkpointsProvided) {
+                throw new IllegalArgumentException("Index configured with wait_for_checkpoints must search the same number of shards as " +
+                    "checkpoints provided. [" + checkpointsProvided + "] checkpoints provided. Index [" + index + "] has " +
+                    "["  + indexMetadata.getNumberOfShards() + "] shards.");
+
+            }
+        }
+    }
+
     private static void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) {
         final long shardCountLimit = clusterService.getClusterSettings().get(SHARD_COUNT_LIMIT_SETTING);
         if (shardCount > shardCountLimit) {

+ 10 - 0
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -745,6 +745,16 @@ public abstract class Engine implements Closeable {
         return new CommitStats(getLastCommittedSegmentInfos());
     }
 
+    /**
+     * @return the max issued or seen seqNo for this Engine
+     */
+    public abstract long getMaxSeqNo();
+
+    /**
+     * @return the processed local checkpoint for this Engine
+     */
+    public abstract long getProcessedLocalCheckpoint();
+
     /**
      * @return the persisted local checkpoint for this Engine
      */

+ 6 - 0
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -2438,6 +2438,12 @@ public class InternalEngine extends Engine {
         return getTranslog().getLastSyncedGlobalCheckpoint();
     }
 
+    @Override
+    public long getMaxSeqNo() {
+        return localCheckpointTracker.getMaxSeqNo();
+    }
+
+    @Override
     public long getProcessedLocalCheckpoint() {
         return localCheckpointTracker.getProcessedCheckpoint();
     }

+ 10 - 0
server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

@@ -357,6 +357,16 @@ public class ReadOnlyEngine extends Engine {
         return new Translog.Location(0,0,0);
     }
 
+    @Override
+    public long getMaxSeqNo() {
+        return seqNoStats.getMaxSeqNo();
+    }
+
+    @Override
+    public long getProcessedLocalCheckpoint() {
+        return seqNoStats.getLocalCheckpoint();
+    }
+
     @Override
     public long getPersistedLocalCheckpoint() {
         return seqNoStats.getLocalCheckpoint();

+ 28 - 0
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1741,6 +1741,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     private void onNewEngine(Engine newEngine) {
         assert Thread.holdsLock(engineMutex);
         refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
+        refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint);
+        refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo);
     }
 
     /**
@@ -3550,6 +3552,32 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
     }
 
+    /**
+     * Add a listener for refreshes.
+     * <p>
+     * When reads are allowed on this shard the listener is handed to {@code refreshListeners.addOrNotify}.
+     * When reads are not allowed the listener is not registered; it is failed immediately with an
+     * {@link IllegalIndexShardStateException}.
+     *
+     * @param checkpoint the seqNo checkpoint to listen for
+     * @param listener for the refresh.
+     */
+    public void addRefreshListener(long checkpoint, ActionListener<Void> listener) {
+        final boolean readAllowed;
+        if (isReadAllowed()) {
+            readAllowed = true;
+        } else {
+            // check again under postRecoveryMutex. this is important to create a happens before relationship
+            // between the switch to POST_RECOVERY + associated refresh. Otherwise we may respond
+            // to a listener before a refresh actually happened that contained that operation.
+            synchronized (postRecoveryMutex) {
+                readAllowed = isReadAllowed();
+            }
+        }
+        if (readAllowed) {
+            refreshListeners.addOrNotify(checkpoint, listener);
+        } else {
+            // we're not yet ready for reads, fail to notify client
+            listener.onFailure(new IllegalIndexShardStateException(shardId, state, "Read not allowed on IndexShard"));
+        }
+    }
+
     private static class RefreshMetricUpdater implements ReferenceManager.RefreshListener {
 
         private final MeanMetric refreshMetric;

+ 222 - 52
server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java

@@ -10,11 +10,15 @@ package org.elasticsearch.index.shard;
 
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.common.metrics.MeanMetric;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.translog.Translog;
 
 import java.io.Closeable;
@@ -23,6 +27,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Consumer;
 import java.util.function.IntSupplier;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
 import static java.util.Objects.requireNonNull;
@@ -63,12 +68,16 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
      *
      * We never set this to non-null while closed it {@code true}.
      */
-    private volatile List<Tuple<Translog.Location, Consumer<Boolean>>> refreshListeners = null;
+    private volatile List<Tuple<Translog.Location, Consumer<Boolean>>> locationRefreshListeners = null;
+    private volatile List<Tuple<Long, ActionListener<Void>>> checkpointRefreshListeners = null;
+
     /**
      * The translog location that was last made visible by a refresh.
      */
     private volatile Translog.Location lastRefreshedLocation;
 
+    private volatile long lastRefreshedCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
+
     public RefreshListeners(
         final IntSupplier getMaxRefreshListeners,
         final Runnable forceRefresh,
@@ -105,7 +114,8 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
                 throw e;
             }
         }
-        assert refreshListeners == null;
+        assert locationRefreshListeners == null;
+        assert checkpointRefreshListeners == null;
         return releaseOnce;
     }
 
@@ -131,9 +141,9 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
             if (closed) {
                 throw new IllegalStateException("can't wait for refresh on a closed index");
             }
-            List<Tuple<Translog.Location, Consumer<Boolean>>> listeners = refreshListeners;
+            List<Tuple<Translog.Location, Consumer<Boolean>>> listeners = locationRefreshListeners;
             final int maxRefreshes = getMaxRefreshListeners.getAsInt();
-            if (refreshForcers == 0 && maxRefreshes > 0 && (listeners == null || listeners.size() < maxRefreshes)) {
+            if (refreshForcers == 0 && roomForListener(maxRefreshes, listeners, checkpointRefreshListeners)) {
                 ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true);
                 Consumer<Boolean> contextPreservingListener = forced -> {
                     try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
@@ -146,7 +156,7 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
                 }
                 // We have a free slot so register the listener
                 listeners.add(new Tuple<>(location, contextPreservingListener));
-                refreshListeners = listeners;
+                locationRefreshListeners = listeners;
                 return false;
             }
         }
@@ -156,16 +166,74 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
         return true;
     }
 
+    /**
+     * Add a listener for refreshes, calling it immediately if the checkpoint is already visible. If this runs out of listener slots
+     * then it fails the listener immediately. The checkpoint cannot be greater than the max issued sequence number; otherwise the
+     * listener is failed. This method does not respect the forceRefreshes state. It will NEVER force a refresh on the calling thread.
+     * Instead, it will simply add listeners or reject them if too many listeners are already waiting.
+     *
+     * @param checkpoint the seqNo checkpoint to listen for
+     * @param listener for the refresh.
+     * @return did we call the listener (true) or register the listener to call later (false)?
+     */
+    public boolean addOrNotify(long checkpoint, ActionListener<Void> listener) {
+        assert checkpoint >= SequenceNumbers.NO_OPS_PERFORMED;
+        if (checkpoint <= lastRefreshedCheckpoint) {
+            listener.onResponse(null);
+            return true;
+        }
+        long maxIssuedSequenceNumber = maxIssuedSeqNoSupplier.getAsLong();
+        if (checkpoint > maxIssuedSequenceNumber) {
+            IllegalArgumentException e = new IllegalArgumentException("Cannot wait for unissued seqNo checkpoint [wait_for_checkpoint="
+                + checkpoint + ", max_issued_seqNo=" + maxIssuedSequenceNumber + "]");
+            listener.onFailure(e);
+            return true;
+        }
+
+        synchronized (this) {
+            if (closed) {
+                listener.onFailure(new IllegalStateException("can't wait for refresh on a closed index"));
+                return true;
+            }
+            List<Tuple<Long, ActionListener<Void>>> listeners = checkpointRefreshListeners;
+            final int maxRefreshes = getMaxRefreshListeners.getAsInt();
+            if (roomForListener(maxRefreshes, locationRefreshListeners, listeners)) {
+                addCheckpointListener(checkpoint, listener, listeners);
+                return false;
+            }
+        }
+        // No free slot so fail the listener
+        listener.onFailure(new IllegalStateException("Too many listeners waiting on refresh, wait listener rejected."));
+        return true;
+    }
+
+    private void addCheckpointListener(long checkpoint, ActionListener<Void> listener, List<Tuple<Long, ActionListener<Void>>> listeners) {
+        assert Thread.holdsLock(this);
+        ActionListener<Void> contextPreservingListener =
+            ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
+
+        if (listeners == null) {
+            listeners = new ArrayList<>();
+        }
+        // We have a free slot so register the listener
+        listeners.add(new Tuple<>(checkpoint, contextPreservingListener));
+        checkpointRefreshListeners = listeners;
+    }
+
     @Override
     public void close() throws IOException {
-        List<Tuple<Translog.Location, Consumer<Boolean>>> oldListeners;
+        List<Tuple<Translog.Location, Consumer<Boolean>>> oldLocationListeners;
+        List<Tuple<Long, ActionListener<Void>>> oldCheckpointListeners;
         synchronized (this) {
-            oldListeners = refreshListeners;
-            refreshListeners = null;
+            oldLocationListeners = locationRefreshListeners;
+            locationRefreshListeners = null;
+            oldCheckpointListeners = checkpointRefreshListeners;
+            checkpointRefreshListeners = null;
             closed = true;
         }
         // Fire any listeners we might have had
-        fireListeners(oldListeners);
+        fireListeners(oldLocationListeners);
+        failCheckpointListeners(oldCheckpointListeners, new AlreadyClosedException("shard is closed"));
     }
 
     /**
@@ -173,17 +241,17 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
      */
     public boolean refreshNeeded() {
         // A null list doesn't need a refresh. If we're closed we don't need a refresh either.
-        return refreshListeners != null && false == closed;
+        return (locationRefreshListeners != null || checkpointRefreshListeners != null) && false == closed;
     }
 
     /**
-     * The number of pending listeners.
+     * The total number of pending listeners.
      */
-    public int pendingCount() {
-        // No need to synchronize here because we're doing a single volatile read
-        List<Tuple<Translog.Location, Consumer<Boolean>>> listeners = refreshListeners;
-        // A null list means we haven't accumulated any listeners. Otherwise we need the size.
-        return listeners == null ? 0 : listeners.size();
+    public synchronized int pendingCount() {
+        List<Tuple<Translog.Location, Consumer<Boolean>>> locationListeners = locationRefreshListeners;
+        List<Tuple<Long, ActionListener<Void>>> checkpointListeners = checkpointRefreshListeners;
+        // A null list means we haven't accumulated any listeners. Otherwise, we need the size.
+        return (locationListeners == null ? 0 : locationListeners.size()) + (checkpointListeners == null ? 0 : checkpointListeners.size());
     }
 
     /**
@@ -193,6 +261,20 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
         this.currentRefreshLocationSupplier = currentRefreshLocationSupplier;
     }
 
+    /**
+     * Set up the supplier used to find the last processed sequence number checkpoint.
+     */
+    public void setCurrentProcessedCheckpointSupplier(LongSupplier processedCheckpointSupplier) {
+        this.processedCheckpointSupplier = processedCheckpointSupplier;
+    }
+
+    /**
+     * Set up the supplier used to find the max issued seqNo.
+     */
+    public void setMaxIssuedSeqNoSupplier(LongSupplier maxIssuedSeqNoSupplier) {
+        this.maxIssuedSeqNoSupplier = maxIssuedSeqNoSupplier;
+    }
+
     /**
      * Snapshot of the translog location before the current refresh if there is a refresh going on or null. Doesn't have to be volatile
      * because when it is used by the refreshing thread.
@@ -200,9 +282,18 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
     private Translog.Location currentRefreshLocation;
     private Supplier<Translog.Location> currentRefreshLocationSupplier;
 
+    /**
+     * Snapshot of the local processed checkpoint before the current refresh if there is a refresh going on. Doesn't have to be
+     * volatile because it is only used by the refreshing thread.
+     */
+    private long currentRefreshCheckpoint;
+    private LongSupplier processedCheckpointSupplier;
+    private LongSupplier maxIssuedSeqNoSupplier;
+
     @Override
     public void beforeRefresh() throws IOException {
         currentRefreshLocation = currentRefreshLocationSupplier.get();
+        currentRefreshCheckpoint = processedCheckpointSupplier.getAsLong();
         currentRefreshStartTime = System.nanoTime();
     }
 
@@ -211,14 +302,6 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
         // Increment refresh metric before communicating to listeners.
         refreshMetric.inc(System.nanoTime() - currentRefreshStartTime);
 
-        /* We intentionally ignore didRefresh here because our timing is a little off. It'd be a useful flag if we knew everything that made
-         * it into the refresh, but the way we snapshot the translog position before the refresh, things can sneak into the refresh that we
-         * don't know about. */
-        if (null == currentRefreshLocation) {
-            /* The translog had an empty last write location at the start of the refresh so we can't alert anyone to anything. This
-             * usually happens during recovery. The next refresh cycle out to pick up this refresh. */
-            return;
-        }
         /* Set the lastRefreshedLocation so listeners that come in for locations before that will just execute inline without messing
          * around with refreshListeners or synchronizing at all. Note that it is not safe for us to abort early if we haven't advanced the
          * position here because we set and read lastRefreshedLocation outside of a synchronized block. We do that so that waiting for a
@@ -227,58 +310,105 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
          * assignment into the synchronized block below and double checking lastRefreshedLocation in addOrNotify's synchronized block but
          * that doesn't seem worth it given that we already skip this process early if there aren't any listeners to iterate. */
         lastRefreshedLocation = currentRefreshLocation;
+        lastRefreshedCheckpoint = currentRefreshCheckpoint;
         /* Grab the current refresh listeners and replace them with null while synchronized. Any listeners that come in after this won't be
          * in the list we iterate over and very likely won't be candidates for refresh anyway because we've already moved the
          * lastRefreshedLocation. */
-        List<Tuple<Translog.Location, Consumer<Boolean>>> candidates;
+        List<Tuple<Translog.Location, Consumer<Boolean>>> locationCandidates;
+        List<Tuple<Long, ActionListener<Void>>> checkpointCandidates;
         synchronized (this) {
-            candidates = refreshListeners;
+            locationCandidates = locationRefreshListeners;
+            checkpointCandidates = checkpointRefreshListeners;
             // No listeners to check so just bail early
-            if (candidates == null) {
+            if (locationCandidates == null && checkpointCandidates == null) {
                 return;
             }
-            refreshListeners = null;
+            locationRefreshListeners = null;
+            checkpointRefreshListeners = null;
         }
-        // Iterate the list of listeners, copying the listeners to fire to one list and those to preserve to another list.
-        List<Tuple<Translog.Location, Consumer<Boolean>>> listenersToFire = null;
-        List<Tuple<Translog.Location, Consumer<Boolean>>> preservedListeners = null;
-        for (Tuple<Translog.Location, Consumer<Boolean>> tuple : candidates) {
-            Translog.Location location = tuple.v1();
-            if (location.compareTo(currentRefreshLocation) <= 0) {
-                if (listenersToFire == null) {
-                    listenersToFire = new ArrayList<>();
+        // Iterate the list of location listeners, copying the listeners to fire to one list and those to preserve to another list.
+        List<Tuple<Translog.Location, Consumer<Boolean>>> locationListenersToFire = null;
+        List<Tuple<Translog.Location, Consumer<Boolean>>> preservedLocationListeners = null;
+        if (locationCandidates != null) {
+            for (Tuple<Translog.Location, Consumer<Boolean>> tuple : locationCandidates) {
+                Translog.Location location = tuple.v1();
+                if (location.compareTo(currentRefreshLocation) <= 0) {
+                    if (locationListenersToFire == null) {
+                        locationListenersToFire = new ArrayList<>();
+                    }
+                    locationListenersToFire.add(tuple);
+                } else {
+                    if (preservedLocationListeners == null) {
+                        preservedLocationListeners = new ArrayList<>();
+                    }
+                    preservedLocationListeners.add(tuple);
                 }
-                listenersToFire.add(tuple);
-            } else {
-                if (preservedListeners == null) {
-                    preservedListeners = new ArrayList<>();
+            }
+        }
+
+        // Iterate the list of checkpoint listeners, copying the listeners to fire to one list and those to preserve to another list.
+        List<Tuple<Long, ActionListener<Void>>> checkpointListenersToFire = null;
+        List<Tuple<Long, ActionListener<Void>>> preservedCheckpointListeners = null;
+        if (checkpointCandidates != null) {
+            for (Tuple<Long, ActionListener<Void>> tuple : checkpointCandidates) {
+                long checkpoint = tuple.v1();
+                if (checkpoint <= currentRefreshCheckpoint) {
+                    if (checkpointListenersToFire == null) {
+                        checkpointListenersToFire = new ArrayList<>();
+                    }
+                    checkpointListenersToFire.add(tuple);
+                } else {
+                    if (preservedCheckpointListeners == null) {
+                        preservedCheckpointListeners = new ArrayList<>();
+                    }
+                    preservedCheckpointListeners.add(tuple);
                 }
-                preservedListeners.add(tuple);
             }
         }
+
         /* Now deal with the listeners that it isn't time yet to fire. We need to do this under lock so we don't miss a concurrent close or
          * newly registered listener. If we're not closed we just add the listeners to the list of listeners we check next time. If we are
          * closed we fire the listeners even though it isn't time for them. */
-        if (preservedListeners != null) {
+        List<Tuple<Long, ActionListener<Void>>> checkpointListenersToFail = null;
+        if (preservedLocationListeners != null || preservedCheckpointListeners != null) {
             synchronized (this) {
-                if (refreshListeners == null) {
-                    if (closed) {
-                        listenersToFire.addAll(preservedListeners);
+                if (preservedLocationListeners != null) {
+                    if (locationRefreshListeners == null) {
+                        if (closed) {
+                            if (locationListenersToFire == null) {
+                                locationListenersToFire = new ArrayList<>();
+                            }
+                            locationListenersToFire.addAll(preservedLocationListeners);
+                        } else {
+                            locationRefreshListeners = preservedLocationListeners;
+                        }
                     } else {
-                        refreshListeners = preservedListeners;
+                        assert closed == false : "Can't be closed and have non-null refreshListeners";
+                        locationRefreshListeners.addAll(preservedLocationListeners);
+                    }
+                }
+                if (preservedCheckpointListeners != null) {
+                    if (checkpointRefreshListeners == null) {
+                        if (closed) {
+                            checkpointListenersToFail = new ArrayList<>(preservedCheckpointListeners);
+                        } else {
+                            checkpointRefreshListeners = preservedCheckpointListeners;
+                        }
+                    } else {
+                        assert closed == false : "Can't be closed and have non-null refreshListeners";
+                        checkpointRefreshListeners.addAll(preservedCheckpointListeners);
                     }
-                } else {
-                    assert closed == false : "Can't be closed and have non-null refreshListeners";
-                    refreshListeners.addAll(preservedListeners);
                 }
             }
         }
         // Lastly, fire the listeners that are ready
-        fireListeners(listenersToFire);
+        fireListeners(locationListenersToFire);
+        fireCheckpointListeners(checkpointListenersToFire);
+        failCheckpointListeners(checkpointListenersToFail, new AlreadyClosedException("shard is closed"));
     }
 
     /**
-     * Fire some listeners. Does nothing if the list of listeners is null.
+     * Fire location listeners. Does nothing if the list of listeners is null.
      */
     private void fireListeners(final List<Tuple<Translog.Location, Consumer<Boolean>>> listenersToFire) {
         if (listenersToFire != null) {
@@ -286,7 +416,47 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
                 try {
                     listener.v2().accept(false);
                 } catch (final Exception e) {
-                    logger.warn("error firing refresh listener", e);
+                    logger.warn("error firing location refresh listener", e);
+                }
+            }
+        }
+    }
+
+    private static boolean roomForListener(final int maxRefreshes,
+                                           final List<Tuple<Translog.Location, Consumer<Boolean>>> locationListeners,
+                                           final List<Tuple<Long, ActionListener<Void>>> checkpointListeners) {
+        final int locationListenerCount = locationListeners == null ? 0 : locationListeners.size();
+        final int checkpointListenerCount = checkpointListeners == null ? 0 : checkpointListeners.size();
+        return (locationListenerCount + checkpointListenerCount) < maxRefreshes;
+    }
+
+    /**
+     * Fire checkpoint listeners. Does nothing if the list of listeners is null.
+     */
+    private void fireCheckpointListeners(final List<Tuple<Long, ActionListener<Void>>> listenersToFire) {
+        if (listenersToFire != null) {
+            for (final Tuple<Long, ActionListener<Void>> listener : listenersToFire) {
+                try {
+                    listener.v2().onResponse(null);
+                } catch (final Exception e) {
+                    logger.warn("error firing checkpoint refresh listener", e);
+                    assert false;
+                }
+            }
+        }
+    }
+
+    /**
+     * Fail checkpoint listeners. Does nothing if the list of listeners is null.
+     */
+    private void failCheckpointListeners(final List<Tuple<Long, ActionListener<Void>>> listenersToFire, Exception exception) {
+        if (listenersToFire != null) {
+            for (final Tuple<Long, ActionListener<Void>> listener : listenersToFire) {
+                try {
+                    listener.v2().onFailure(exception);
+                } catch (final Exception e) {
+                    logger.warn("error firing checkpoint refresh listener", e);
+                    assert false;
                 }
             }
         }

+ 26 - 3
server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java

@@ -14,6 +14,7 @@ import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.CheckedBiConsumer;
+import org.elasticsearch.common.TriFunction;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -96,6 +97,17 @@ public class RestMultiSearchAction extends BaseRestHandler {
     public static MultiSearchRequest parseRequest(RestRequest restRequest,
                                                   NamedWriteableRegistry namedWriteableRegistry,
                                                   boolean allowExplicitIndex) throws IOException {
+        return parseRequest(restRequest, namedWriteableRegistry, allowExplicitIndex, (k, v, r) -> false);
+    }
+
+    /**
+     * Parses a {@link RestRequest} body and returns a {@link MultiSearchRequest}. This variation accepts an extra parameter parser
+     * from the caller (e.g. to support wait_for_checkpoints).
+     */
+    public static MultiSearchRequest parseRequest(RestRequest restRequest,
+                                                  NamedWriteableRegistry namedWriteableRegistry,
+                                                  boolean allowExplicitIndex,
+                                                  TriFunction<String, Object, SearchRequest, Boolean> extraParamParser) throws IOException {
         if(restRequest.getRestApiVersion() == RestApiVersion.V_7 && restRequest.hasParam("type")) {
             restRequest.param("type");
         }
@@ -132,7 +144,7 @@ public class RestMultiSearchAction extends BaseRestHandler {
                 );
             }
             multiRequest.add(searchRequest);
-        });
+        }, extraParamParser);
         List<SearchRequest> requests = multiRequest.requests();
         for (SearchRequest request : requests) {
             // preserve if it's set on the request
@@ -150,7 +162,17 @@ public class RestMultiSearchAction extends BaseRestHandler {
      * Parses a multi-line {@link RestRequest} body, instantiating a {@link SearchRequest} for each line and applying the given consumer.
      */
     public static void parseMultiLineRequest(RestRequest request, IndicesOptions indicesOptions, boolean allowExplicitIndex,
-            CheckedBiConsumer<SearchRequest, XContentParser, IOException> consumer) throws IOException {
+                                             CheckedBiConsumer<SearchRequest, XContentParser, IOException> consumer) throws IOException {
+        parseMultiLineRequest(request, indicesOptions, allowExplicitIndex, consumer, (k, v, r) -> false);
+    }
+
+    /**
+     * Parses a multi-line {@link RestRequest} body, instantiating a {@link SearchRequest} for each line and applying the given consumer.
+     * This variation allows the caller to provide a param parser.
+     */
+    public static void parseMultiLineRequest(RestRequest request, IndicesOptions indicesOptions, boolean allowExplicitIndex,
+            CheckedBiConsumer<SearchRequest, XContentParser, IOException> consumer,
+            TriFunction<String, Object, SearchRequest, Boolean> extraParamParser) throws IOException {
 
         String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
         String searchType = request.param("search_type");
@@ -161,7 +183,8 @@ public class RestMultiSearchAction extends BaseRestHandler {
         final XContent xContent = sourceTuple.v1().xContent();
         final BytesReference data = sourceTuple.v2();
         MultiSearchRequest.readMultiLineFormat(data, xContent, consumer, indices, indicesOptions, routing,
-                searchType, ccsMinimizeRoundtrips, request.getXContentRegistry(), allowExplicitIndex, request.getRestApiVersion());
+                searchType, ccsMinimizeRoundtrips, request.getXContentRegistry(), allowExplicitIndex, request.getRestApiVersion(),
+            extraParamParser);
     }
 
     @Override

+ 17 - 2
server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java

@@ -19,7 +19,6 @@ import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.logging.DeprecationLogger;
-import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.core.Booleans;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.index.query.QueryBuilder;
@@ -37,6 +36,7 @@ import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.search.suggest.SuggestBuilder;
 import org.elasticsearch.search.suggest.term.TermSuggestionBuilder.SuggestMode;
+import org.elasticsearch.xcontent.XContentParser;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -44,6 +44,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.function.BiConsumer;
 import java.util.function.IntConsumer;
 
 import static org.elasticsearch.action.ValidateActions.addValidationError;
@@ -64,7 +65,7 @@ public class RestSearchAction extends BaseRestHandler {
      */
     public static final String TOTAL_HITS_AS_INT_PARAM = "rest_total_hits_as_int";
     public static final String TYPED_KEYS_PARAM = "typed_keys";
-    private static final Set<String> RESPONSE_PARAMS;
+    public static final Set<String> RESPONSE_PARAMS;
 
     static {
         final Set<String> responseParams = new HashSet<>(Arrays.asList(TYPED_KEYS_PARAM, TOTAL_HITS_AS_INT_PARAM));
@@ -129,6 +130,18 @@ public class RestSearchAction extends BaseRestHandler {
                                           XContentParser requestContentParser,
                                           NamedWriteableRegistry namedWriteableRegistry,
                                           IntConsumer setSize) throws IOException {
+        parseSearchRequest(searchRequest, request, requestContentParser, namedWriteableRegistry, setSize, (r, sr) -> {});
+    }
+
+
+    /**
+     * Parses the rest request on top of the SearchRequest, preserving values that are not overridden by the rest request. This variation
+     * accepts an extra parameter parser from the caller (e.g. to support wait_for_checkpoints).
+     */
+    public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
+                                          XContentParser requestContentParser,
+                                          NamedWriteableRegistry namedWriteableRegistry,
+                                          IntConsumer setSize, BiConsumer<RestRequest, SearchRequest> extraParamParser) throws IOException {
         if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
             request.param("type");
             deprecationLogger.compatibleCritical("search_with_types", TYPES_DEPRECATION_MESSAGE);
@@ -181,6 +194,8 @@ public class RestSearchAction extends BaseRestHandler {
             searchRequest.setCcsMinimizeRoundtrips(
                 request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));
         }
+
+        extraParamParser.accept(request, searchRequest);
     }
 
     /**

+ 114 - 9
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -15,6 +15,7 @@ import org.apache.lucene.search.FieldDoc;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.TopDocs;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
@@ -31,18 +32,18 @@ import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.core.Releasable;
-import org.elasticsearch.core.Releasables;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -57,6 +58,7 @@ import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryRewriteContext;
 import org.elasticsearch.index.query.Rewriteable;
 import org.elasticsearch.index.query.SearchExecutionContext;
+import org.elasticsearch.index.shard.GlobalCheckpointListeners;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.SearchOperationListener;
@@ -113,6 +115,7 @@ import org.elasticsearch.search.sort.SortBuilder;
 import org.elasticsearch.search.suggest.Suggest;
 import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
 import org.elasticsearch.tasks.TaskCancelledException;
+import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.Scheduler.Cancellable;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool.Names;
@@ -127,14 +130,18 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
 import static org.elasticsearch.core.TimeValue.timeValueHours;
 import static org.elasticsearch.core.TimeValue.timeValueMillis;
 import static org.elasticsearch.core.TimeValue.timeValueMinutes;
+import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 
 public class SearchService extends AbstractLifecycleComponent implements IndexEventListener {
     private static final Logger logger = LogManager.getLogger(SearchService.class);
@@ -369,7 +376,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         final IndexShard shard = getShard(request);
         rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, rewritten) -> {
             // fork the execution in the search thread pool
-            runAsync(getExecutor(shard), () -> executeDfsPhase(request, task), l);
+            ensureAfterSeqNoRefreshed(shard, request, () -> executeDfsPhase(request, task), l);
         }));
     }
 
@@ -420,11 +427,105 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
                     return;
                 }
             }
-            // fork the execution in the search thread pool
-            runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task), l);
+            ensureAfterSeqNoRefreshed(shard, orig, () -> executeQueryPhase(orig, task), l);
         }));
     }
 
+    /**
+     * Runs {@code executable} on the shard's search executor, optionally waiting first until the checkpoint requested by
+     * {@code request} has been refreshed and is covered by the shard's last known global checkpoint.
+     * <p>
+     * When the request carries no checkpoint (its value is not greater than {@code UNASSIGNED_SEQ_NO}) the executable is
+     * forked immediately. Otherwise a refresh listener is registered for the checkpoint and, once it fires, a global
+     * checkpoint listener is added if the global checkpoint is still behind. If the request has a timeout other than
+     * {@code NO_TIMEOUT}, a scheduled task fails the listener with an {@link ElasticsearchTimeoutException} when it elapses.
+     *
+     * @param shard      the shard the search is executed against
+     * @param request    the shard level request providing the checkpoint and the timeout to wait for
+     * @param executable the search phase to run once the checkpoint is visible
+     * @param listener   notified with the result of {@code executable}, or with the failure that prevented it from running
+     */
+    private <T> void ensureAfterSeqNoRefreshed(IndexShard shard, ShardSearchRequest request, CheckedSupplier<T, Exception> executable,
+                                               ActionListener<T> listener) {
+        final ActionRunnable<T> runnable = new ActionRunnable<>(listener) {
+            final Executor executor = getExecutor(shard);
+            @Override
+            protected void doRun() {
+                final TimeValue timeout = request.getWaitForCheckpointsTimeout();
+                final long waitForCheckpoint = request.waitForCheckpoint();
+                if (waitForCheckpoint > UNASSIGNED_SEQ_NO) {
+                    // Without periodic refreshes nothing would ever make the checkpoint visible, so reject the request up front.
+                    if (shard.indexSettings().getRefreshInterval().getMillis() <= 0) {
+                        listener.onFailure(
+                            new IllegalArgumentException("Cannot use wait_for_checkpoints with [index.refresh_interval=-1]")
+                        );
+                        return;
+                    }
+
+                    // Guards against completing the outer listener more than once (success, failure and timeout race each other).
+                    final AtomicBoolean isDone = new AtomicBoolean(false);
+                    final AtomicReference<Scheduler.ScheduledCancellable> timeoutTask = new AtomicReference<>();
+                    final ActionListener<Void> readyListener = new ActionListener<>() {
+                        @Override
+                        public void onResponse(Void unused) {
+                            // We must check that the sequence number is smaller than or equal to the global checkpoint. If it is not,
+                            // it is possible that a stale shard could return uncommitted documents.
+                            if (shard.getLastKnownGlobalCheckpoint() < waitForCheckpoint) {
+                                // Only forward a real timeout; the NO_TIMEOUT sentinel must become null so that the listener is
+                                // not handed the sentinel as if it were a duration.
+                                // NOTE(review): assumes a null timeout means "wait indefinitely" - confirm in GlobalCheckpointListeners.
+                                TimeValue gclTimeout = NO_TIMEOUT.equals(timeout) ? null : timeout;
+                                shard.addGlobalCheckpointListener(
+                                    waitForCheckpoint,
+                                    new GlobalCheckpointListeners.GlobalCheckpointListener() {
+                                        @Override
+                                        public Executor executor() {
+                                            return threadPool.executor(Names.SAME);
+                                        }
+
+                                        @Override
+                                        public void accept(long g, Exception e) {
+                                            if (g != UNASSIGNED_SEQ_NO) {
+                                                assert waitForCheckpoint <= g :
+                                                    shard.shardId() + " only advanced to [" + g + "] while waiting for ["
+                                                        + waitForCheckpoint + "]";
+                                                searchReady();
+                                            } else {
+                                                assert e != null;
+                                                // Ignore TimeoutException, our scheduled timeout task will handle this
+                                                if (e instanceof TimeoutException == false) {
+                                                    onFailure(e);
+                                                }
+                                            }
+                                        }
+                                    },
+                                    gclTimeout);
+                            } else {
+                                searchReady();
+                            }
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            if (isDone.compareAndSet(false, true)) {
+                                Scheduler.ScheduledCancellable localTimeoutTask = timeoutTask.get();
+                                if (localTimeoutTask != null) {
+                                    localTimeoutTask.cancel();
+                                }
+                                listener.onFailure(e);
+                            }
+                        }
+
+                        // Cancels the pending timeout (if any) and forks the actual search phase exactly once.
+                        private void searchReady() {
+                            if (isDone.compareAndSet(false, true)) {
+                                Scheduler.ScheduledCancellable localTimeoutTask = timeoutTask.get();
+                                if (localTimeoutTask != null) {
+                                    localTimeoutTask.cancel();
+                                }
+                                runAsync(executor, executable, listener);
+                            }
+                        }
+                    };
+                    // The timeout task is scheduled before the refresh listener is registered, so it covers the whole wait.
+                    if (NO_TIMEOUT.equals(timeout) == false && isDone.get() == false) {
+                        Scheduler.ScheduledCancellable scheduled = threadPool.schedule(() ->
+                            readyListener.onFailure(
+                                new ElasticsearchTimeoutException(
+                                    "Wait for seq_no [{}] refreshed timed out [{}]",
+                                    waitForCheckpoint,
+                                    timeout)
+                            ), timeout, Names.SAME);
+                        timeoutTask.set(scheduled);
+                    }
+                    shard.addRefreshListener(waitForCheckpoint, readyListener);
+                } else {
+                    // No checkpoint requested: fork the execution in the search thread pool right away.
+                    runAsync(executor, executable, listener);
+                }
+            }
+        };
+        runnable.run();
+    }
+
     private IndexShard getShard(ShardSearchRequest request) {
         final ShardSearchContextId contextId = request.readerId();
         if (contextId != null) {
@@ -1266,7 +1367,11 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
             } else {
                 indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
                 IndexShard indexShard = indexService.getShard(request.shardId().getId());
-                hasRefreshPending = indexShard.hasRefreshPending() && checkRefreshPending;
+                boolean needsWaitForRefresh = request.waitForCheckpoint() != UNASSIGNED_SEQ_NO;
+                // If this request has wait_for_refresh behavior, it is safest to assume a refresh is pending. Theoretically,
+                // this can be improved in the future by manually checking that the requested checkpoint has already been refreshed.
+                // However, this will require modifying the engine to surface that information.
+                hasRefreshPending = needsWaitForRefresh || (indexShard.hasRefreshPending() && checkRefreshPending);
                 canMatchSearcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
             }
             try (canMatchSearcher) {

+ 68 - 3
server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java

@@ -36,10 +36,12 @@ import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryRewriteContext;
 import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.index.query.Rewriteable;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.AliasFilterParsingException;
 import org.elasticsearch.indices.InvalidAliasNameException;
 import org.elasticsearch.search.Scroll;
+import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.SearchSortValuesAndFormats;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.query.QuerySearchResult;
@@ -66,6 +68,8 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
     private final ShardId shardId;
     private final int shardRequestIndex;
     private final int numberOfShards;
+    private final long waitForCheckpoint;
+    private final TimeValue waitForCheckpointsTimeout;
     private final SearchType searchType;
     private final Scroll scroll;
     private final float indexBoost;
@@ -109,6 +113,34 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
                               @Nullable String clusterAlias,
                               ShardSearchContextId readerId,
                               TimeValue keepAlive) {
+        this(originalIndices,
+            searchRequest,
+            shardId,
+            shardRequestIndex,
+            numberOfShards,
+            aliasFilter,
+            indexBoost,
+            nowInMillis,
+            clusterAlias,
+            readerId,
+            keepAlive,
+            SequenceNumbers.UNASSIGNED_SEQ_NO,
+            SearchService.NO_TIMEOUT);
+    }
+
+    public ShardSearchRequest(OriginalIndices originalIndices,
+                              SearchRequest searchRequest,
+                              ShardId shardId,
+                              int shardRequestIndex,
+                              int numberOfShards,
+                              AliasFilter aliasFilter,
+                              float indexBoost,
+                              long nowInMillis,
+                              @Nullable String clusterAlias,
+                              ShardSearchContextId readerId,
+                              TimeValue keepAlive,
+                              long waitForCheckpoint,
+                              TimeValue waitForCheckpointsTimeout) {
         this(originalIndices,
             shardId,
             shardRequestIndex,
@@ -123,7 +155,9 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
             nowInMillis,
             clusterAlias,
             readerId,
-            keepAlive);
+            keepAlive,
+            waitForCheckpoint,
+            waitForCheckpointsTimeout);
         // If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted
         // at this stage. Any NPEs in the above are therefore an error in request preparation logic.
         assert searchRequest.allowPartialSearchResults() != null;
@@ -133,7 +167,7 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
                               long nowInMillis,
                               AliasFilter aliasFilter) {
         this(OriginalIndices.NONE, shardId, -1, -1, SearchType.QUERY_THEN_FETCH, null, null,
-            aliasFilter, 1.0f, true, null, nowInMillis, null, null, null);
+            aliasFilter, 1.0f, true, null, nowInMillis, null, null, null, SequenceNumbers.UNASSIGNED_SEQ_NO, SearchService.NO_TIMEOUT);
     }
 
     private ShardSearchRequest(OriginalIndices originalIndices,
@@ -150,7 +184,9 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
                                long nowInMillis,
                                @Nullable String clusterAlias,
                                ShardSearchContextId readerId,
-                               TimeValue keepAlive) {
+                               TimeValue keepAlive,
+                               long waitForCheckpoint,
+                               TimeValue waitForCheckpointsTimeout) {
         this.shardId = shardId;
         this.shardRequestIndex = shardRequestIndex;
         this.numberOfShards = numberOfShards;
@@ -168,6 +204,8 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
         this.keepAlive = keepAlive;
         assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
         this.channelVersion = Version.CURRENT;
+        this.waitForCheckpoint = waitForCheckpoint;
+        this.waitForCheckpointsTimeout = waitForCheckpointsTimeout;
     }
 
     public ShardSearchRequest(StreamInput in) throws IOException {
@@ -209,6 +247,14 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
         }
         assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
         channelVersion = Version.min(Version.readVersion(in), in.getVersion());
+        // TODO: Update after backport
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            waitForCheckpoint = in.readLong();
+            waitForCheckpointsTimeout = in.readTimeValue();
+        } else {
+            waitForCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
+            waitForCheckpointsTimeout = SearchService.NO_TIMEOUT;
+        }
         originalIndices = OriginalIndices.readOriginalIndices(in);
     }
 
@@ -231,6 +277,8 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
         this.readerId = clone.readerId;
         this.keepAlive = clone.keepAlive;
         this.channelVersion = clone.channelVersion;
+        this.waitForCheckpoint = clone.waitForCheckpoint;
+        this.waitForCheckpointsTimeout = clone.waitForCheckpointsTimeout;
     }
 
     @Override
@@ -274,6 +322,15 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
             out.writeOptionalTimeValue(keepAlive);
         }
         Version.writeVersion(channelVersion, out);
+        // TODO: Update after backport
+        Version waitForCheckpointsVersion = Version.V_8_0_0;
+        if (out.getVersion().onOrAfter(waitForCheckpointsVersion)) {
+            out.writeLong(waitForCheckpoint);
+            out.writeTimeValue(waitForCheckpointsTimeout);
+        } else if (waitForCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+            // The fields are not written to streams older than waitForCheckpointsVersion, so a request that actually
+            // asks to wait for a checkpoint is rejected here instead of silently losing that requirement.
+            throw new IllegalArgumentException("Remote node version [" + out.getVersion() + "] incompatible with " +
+                "wait_for_checkpoints. All nodes must be version [" + waitForCheckpointsVersion + "] or greater.");
+        }
     }
 
     @Override
@@ -396,6 +453,14 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
         return keepAlive;
     }
 
+    /**
+     * Returns the checkpoint this shard request was created with. Requests built through the constructors that do not
+     * take a checkpoint carry {@link SequenceNumbers#UNASSIGNED_SEQ_NO}.
+     */
+    public long waitForCheckpoint() {
+        return waitForCheckpoint;
+    }
+
+    /**
+     * Returns the timeout this shard request was created with for waiting on its checkpoint. Requests built through the
+     * constructors that do not take a timeout carry {@link SearchService#NO_TIMEOUT}.
+     */
+    public TimeValue getWaitForCheckpointsTimeout() {
+        return waitForCheckpointsTimeout;
+    }
+
     /**
      * Returns the cache key for this shard search request, based on its content
      */

+ 24 - 3
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -2692,7 +2692,10 @@ public class IndexShardTests extends IndexShardTestCase {
                 assertFalse(b);
                 called.set(true);
             });
-            assertTrue(called.get());
+
+            PlainActionFuture<Void> listener = PlainActionFuture.newFuture();
+            shard.addRefreshListener(10, listener);
+            expectThrows(IllegalIndexShardStateException.class, listener::actionGet);
         };
         IndexShard replica = newShard(primary.shardId(), false, "n2", metadata, null);
         DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
@@ -3443,7 +3446,21 @@ public class IndexShardTests extends IndexShardTestCase {
         assertTrue(primary.scheduledRefresh());
         Engine.IndexResult doc = indexDoc(primary, "_doc", "1", "{\"foo\" : \"bar\"}");
         CountDownLatch latch = new CountDownLatch(1);
-        primary.addRefreshListener(doc.getTranslogLocation(), r -> latch.countDown());
+        if (randomBoolean()) {
+            primary.addRefreshListener(doc.getTranslogLocation(), r -> latch.countDown());
+        } else {
+            primary.addRefreshListener(doc.getSeqNo(), new ActionListener<Void>() {
+                @Override
+                public void onResponse(Void unused) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    throw new AssertionError(e);
+                }
+            });
+        }
         assertEquals(1, latch.getCount());
         assertTrue(primary.getEngine().refreshNeeded());
         assertTrue(primary.scheduledRefresh());
@@ -3455,7 +3472,11 @@ public class IndexShardTests extends IndexShardTestCase {
 
         doc = indexDoc(primary, "_doc", "2", "{\"foo\" : \"bar\"}");
         CountDownLatch latch1 = new CountDownLatch(1);
-        primary.addRefreshListener(doc.getTranslogLocation(), r -> latch1.countDown());
+        if (randomBoolean()) {
+            primary.addRefreshListener(doc.getTranslogLocation(), r -> latch1.countDown());
+        } else {
+            primary.addRefreshListener(doc.getSeqNo(), ActionListener.wrap(latch1::countDown));
+        }
         assertEquals(1, latch1.getCount());
         assertTrue(primary.getEngine().refreshNeeded());
         assertTrue(primary.scheduledRefresh());

+ 166 - 43
server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

@@ -14,7 +14,9 @@ import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -68,10 +70,13 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 
 import static org.elasticsearch.core.TimeValue.timeValueMillis;
 import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 
 /**
  * Tests how {@linkplain RefreshListeners} interacts with {@linkplain InternalEngine}.
@@ -92,11 +97,11 @@ public class RefreshListenersTests extends ESTestCase {
         threadPool = new TestThreadPool(getTestName());
         refreshMetric = new MeanMetric();
         listeners = new RefreshListeners(
-                () -> maxListeners,
-                () -> engine.refresh("too-many-listeners"),
-                logger,
-                threadPool.getThreadContext(),
-                refreshMetric);
+            () -> maxListeners,
+            () -> engine.refresh("too-many-listeners"),
+            logger,
+            threadPool.getThreadContext(),
+            refreshMetric);
 
         IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);
         ShardId shardId = new ShardId(new Index("index", "_na_"), 1);
@@ -144,6 +149,8 @@ public class RefreshListenersTests extends ESTestCase {
         engine = new InternalEngine(config);
         engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);
         listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);
+        listeners.setCurrentProcessedCheckpointSupplier(engine::getProcessedLocalCheckpoint);
+        listeners.setMaxIssuedSeqNoSupplier(engine::getMaxSeqNo);
     }
 
     @After
@@ -155,12 +162,17 @@ public class RefreshListenersTests extends ESTestCase {
     public void testBeforeRefresh() throws Exception {
         assertEquals(0, listeners.pendingCount());
         Engine.IndexResult index = index("1");
-        DummyRefreshListener listener = new DummyRefreshListener();
+        TestLocationListener listener = new TestLocationListener();
         assertFalse(listeners.addOrNotify(index.getTranslogLocation(), listener));
         assertNull(listener.forcedRefresh.get());
         assertEquals(1, listeners.pendingCount());
+
+        TestSeqNoListener seqNoListener = new TestSeqNoListener();
+        assertFalse(listeners.addOrNotify(index.getSeqNo(), seqNoListener));
+        assertEquals(2, listeners.pendingCount());
         engine.refresh("I said so");
         assertFalse(listener.forcedRefresh.get());
+        assertTrue(seqNoListener.isDone.get());
         listener.assertNoError();
         assertEquals(0, listeners.pendingCount());
     }
@@ -175,9 +187,12 @@ public class RefreshListenersTests extends ESTestCase {
                 engine.refresh("I said so");
             }
         }
-        DummyRefreshListener listener = new DummyRefreshListener();
+        TestLocationListener listener = new TestLocationListener();
         assertTrue(listeners.addOrNotify(index.getTranslogLocation(), listener));
         assertFalse(listener.forcedRefresh.get());
+        TestSeqNoListener seqNoListener = new TestSeqNoListener();
+        assertTrue(listeners.addOrNotify(index.getSeqNo(), seqNoListener));
+        assertTrue(seqNoListener.isDone.get());
         listener.assertNoError();
         assertEquals(0, listeners.pendingCount());
     }
@@ -185,16 +200,28 @@ public class RefreshListenersTests extends ESTestCase {
     public void testContextIsPreserved() throws IOException, InterruptedException {
         assertEquals(0, listeners.pendingCount());
         Engine.IndexResult index = index("1");
-        CountDownLatch latch = new CountDownLatch(1);
+        CountDownLatch latch = new CountDownLatch(2);
         try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
             threadPool.getThreadContext().putHeader("test", "foobar");
             assertFalse(listeners.addOrNotify(index.getTranslogLocation(), forced -> {
                 assertEquals("foobar", threadPool.getThreadContext().getHeader("test"));
                 latch.countDown();
             }));
+            assertFalse(listeners.addOrNotify(index.getSeqNo(), new ActionListener<>() {
+                @Override
+                public void onResponse(Void unused) {
+                    assertEquals("foobar", threadPool.getThreadContext().getHeader("test"));
+                    latch.countDown();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    throw new AssertionError(e);
+                }
+            }));
         }
         assertNull(threadPool.getThreadContext().getHeader("test"));
-        assertEquals(1, latch.getCount());
+        assertEquals(2, latch.getCount());
         engine.refresh("I said so");
         latch.await();
     }
@@ -205,31 +232,50 @@ public class RefreshListenersTests extends ESTestCase {
         Engine.IndexResult index = index("1");
 
         // Fill the listener slots
-        List<DummyRefreshListener> nonForcedListeners = new ArrayList<>(maxListeners);
+        List<TestLocationListener> nonForcedLocationListeners = new ArrayList<>(maxListeners);
+        List<TestSeqNoListener> nonSeqNoLocationListeners = new ArrayList<>(maxListeners);
         for (int i = 0; i < maxListeners; i++) {
-            DummyRefreshListener listener = new DummyRefreshListener();
-            nonForcedListeners.add(listener);
-            listeners.addOrNotify(index.getTranslogLocation(), listener);
+            if (randomBoolean()) {
+                TestLocationListener listener = new TestLocationListener();
+                nonForcedLocationListeners.add(listener);
+                listeners.addOrNotify(index.getTranslogLocation(), listener);
+            } else {
+                TestSeqNoListener listener = new TestSeqNoListener();
+                nonSeqNoLocationListeners.add(listener);
+                listeners.addOrNotify(index.getSeqNo(), listener);
+            }
             assertTrue(listeners.refreshNeeded());
         }
 
         // We shouldn't have called any of them
-        for (DummyRefreshListener listener : nonForcedListeners) {
+        for (TestLocationListener listener : nonForcedLocationListeners) {
             assertNull("Called listener too early!", listener.forcedRefresh.get());
         }
+        for (TestSeqNoListener listener : nonSeqNoLocationListeners) {
+            assertFalse("Called listener too early!", listener.isDone.get());
+        }
         assertEquals(maxListeners, listeners.pendingCount());
 
+        // Checkpoint version produces error if too many listeners registered
+        TestSeqNoListener rejectedListener = new TestSeqNoListener();
+        listeners.addOrNotify(index.getSeqNo(), rejectedListener);
+        Exception error = rejectedListener.error;
+        assertThat(error, instanceOf(IllegalStateException.class));
+
         // Add one more listener which should cause a refresh.
-        DummyRefreshListener forcingListener = new DummyRefreshListener();
+        TestLocationListener forcingListener = new TestLocationListener();
         listeners.addOrNotify(index.getTranslogLocation(), forcingListener);
         assertTrue("Forced listener wasn't forced?", forcingListener.forcedRefresh.get());
         forcingListener.assertNoError();
 
         // That forces all the listeners through. It would be on the listener ThreadPool but we've made all of those execute immediately.
-        for (DummyRefreshListener listener : nonForcedListeners) {
+        for (TestLocationListener listener : nonForcedLocationListeners) {
             assertEquals("Expected listener called with unforced refresh!", Boolean.FALSE, listener.forcedRefresh.get());
             listener.assertNoError();
         }
+        for (TestSeqNoListener listener : nonSeqNoLocationListeners) {
+            assertEquals("Expected listener called to be called by refresh!", Boolean.TRUE, listener.isDone.get());
+        }
         assertFalse(listeners.refreshNeeded());
         assertEquals(0, listeners.pendingCount());
     }
@@ -242,31 +288,48 @@ public class RefreshListenersTests extends ESTestCase {
         {
             /* Closing flushed pending listeners as though they were refreshed. Since this can only happen when the index is closed and no
              * longer useful there doesn't seem much point in sending the listener some kind of "I'm closed now, go away" enum value. */
-            DummyRefreshListener listener = new DummyRefreshListener();
+            TestLocationListener listener = new TestLocationListener();
             assertFalse(listeners.addOrNotify(unrefreshedOperation.getTranslogLocation(), listener));
             assertNull(listener.forcedRefresh.get());
+
+            TestSeqNoListener seqNoListener = new TestSeqNoListener();
+            assertFalse(listeners.addOrNotify(unrefreshedOperation.getSeqNo(), seqNoListener));
+            assertFalse(seqNoListener.isDone.get());
+
             listeners.close();
             assertFalse(listener.forcedRefresh.get());
             listener.assertNoError();
+            assertNotNull(seqNoListener.error);
+            assertThat(seqNoListener.error, instanceOf(AlreadyClosedException.class));
             assertFalse(listeners.refreshNeeded());
             assertEquals(0, listeners.pendingCount());
         }
         {
             // If you add a listener for an already refreshed location then it'll just fire even if closed
-            DummyRefreshListener listener = new DummyRefreshListener();
+            TestLocationListener listener = new TestLocationListener();
             assertTrue(listeners.addOrNotify(refreshedOperation.getTranslogLocation(), listener));
             assertFalse(listener.forcedRefresh.get());
             listener.assertNoError();
+            TestSeqNoListener seqNoListener = new TestSeqNoListener();
+            assertTrue(listeners.addOrNotify(refreshedOperation.getSeqNo(), seqNoListener));
+            assertTrue(seqNoListener.isDone.get());
+
             assertFalse(listeners.refreshNeeded());
             assertEquals(0, listeners.pendingCount());
         }
         {
-            // But adding a listener to a non-refreshed location will fail
-            DummyRefreshListener listener = new DummyRefreshListener();
+            // But adding a location listener to a non-refreshed location will fail
+            TestLocationListener listener = new TestLocationListener();
             Exception e = expectThrows(IllegalStateException.class, () ->
                 listeners.addOrNotify(unrefreshedOperation.getTranslogLocation(), listener));
             assertEquals("can't wait for refresh on a closed index", e.getMessage());
             assertNull(listener.forcedRefresh.get());
+
+            // Adding a seqNo listener for a non-refreshed seqNo does not throw; the failure is delivered to the listener instead
+            TestSeqNoListener seqNoListener = new TestSeqNoListener();
+            listeners.addOrNotify(unrefreshedOperation.getSeqNo(), seqNoListener);
+            assertEquals("can't wait for refresh on a closed index", seqNoListener.error.getMessage());
+
             assertFalse(listeners.refreshNeeded());
             assertEquals(0, listeners.pendingCount());
         }
@@ -288,15 +351,26 @@ public class RefreshListenersTests extends ESTestCase {
         try {
             for (int i = 0; i < 1000; i++) {
                 Engine.IndexResult index = index("1");
-                DummyRefreshListener listener = new DummyRefreshListener();
-                boolean immediate = listeners.addOrNotify(index.getTranslogLocation(), listener);
-                if (immediate) {
-                    assertNotNull(listener.forcedRefresh.get());
+
+                boolean immediate;
+                BooleanSupplier doneSupplier;
+                if (randomBoolean()) {
+                    TestLocationListener listener = new TestLocationListener();
+                    immediate = listeners.addOrNotify(index.getTranslogLocation(), listener);
+                    doneSupplier = () -> listener.forcedRefresh.get() != null;
+                    if (immediate) {
+                        assertNotNull(listener.forcedRefresh.get());
+                        assertTrue(listener.forcedRefresh.get());
+                        listener.assertNoError();
+                    }
                 } else {
-                    assertBusy(() -> assertNotNull(listener.forcedRefresh.get()), 1, TimeUnit.MINUTES);
+                    TestSeqNoListener seqNoListener = new TestSeqNoListener();
+                    immediate = listeners.addOrNotify(index.getSeqNo(), seqNoListener);
+                    doneSupplier = seqNoListener.isDone::get;
+                }
+                if (immediate == false) {
+                    // A bare method reference would discard the boolean result, so nothing would be asserted;
+                    // assert it explicitly so the wait only succeeds once the listener is actually done.
+                    assertBusy(() -> assertTrue(doneSupplier.getAsBoolean()), 1, TimeUnit.MINUTES);
                 }
-                assertFalse(listener.forcedRefresh.get());
-                listener.assertNoError();
             }
         } finally {
             run.set(false);
@@ -326,10 +400,16 @@ public class RefreshListenersTests extends ESTestCase {
                         Engine.IndexResult index = index(threadId, testFieldValue);
                         assertEquals(iteration, index.getVersion());
 
-                        DummyRefreshListener listener = new DummyRefreshListener();
+                        TestLocationListener listener = new TestLocationListener();
                         listeners.addOrNotify(index.getTranslogLocation(), listener);
-                        assertBusy(() -> assertNotNull("listener never called", listener.forcedRefresh.get()), 1, TimeUnit.MINUTES);
-                        if (threadCount < maxListeners) {
+                        TestSeqNoListener seqNoListener = new TestSeqNoListener();
+                        long processedLocalCheckpoint = engine.getProcessedLocalCheckpoint();
+                        listeners.addOrNotify(processedLocalCheckpoint, seqNoListener);
+                        assertBusy(() -> {
+                            assertNotNull("location listener never called", listener.forcedRefresh.get());
+                            assertTrue("seqNo listener never called", seqNoListener.isDone.get() || seqNoListener.error != null);
+                        }, 1, TimeUnit.MINUTES);
+                        if ((threadCount * 2) < maxListeners) {
                             assertFalse(listener.forcedRefresh.get());
                         }
                         listener.assertNoError();
@@ -341,7 +421,7 @@ public class RefreshListenersTests extends ESTestCase {
                             assertTrue("document not found", getResult.exists());
                             assertEquals(iteration, getResult.version());
                             org.apache.lucene.document.Document document =
-                                    getResult.docIdAndVersion().reader.document(getResult.docIdAndVersion().docId);
+                                getResult.docIdAndVersion().reader.document(getResult.docIdAndVersion().docId);
                             assertThat(document.getValues("test"), arrayContaining(testFieldValue));
                         }
                     } catch (Exception t) {
@@ -352,7 +432,7 @@ public class RefreshListenersTests extends ESTestCase {
             indexers[thread].start();
         }
 
-        for (Thread indexer: indexers) {
+        for (Thread indexer : indexers) {
             indexer.join();
         }
         refresher.cancel();
@@ -360,36 +440,63 @@ public class RefreshListenersTests extends ESTestCase {
 
     public void testDisallowAddListeners() throws Exception {
         assertEquals(0, listeners.pendingCount());
-        DummyRefreshListener listener = new DummyRefreshListener();
+        TestLocationListener listener = new TestLocationListener();
         assertFalse(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
+        TestSeqNoListener seqNoListener = new TestSeqNoListener();
+        assertFalse(listeners.addOrNotify(index("1").getSeqNo(), seqNoListener));
         engine.refresh("I said so");
         assertFalse(listener.forcedRefresh.get());
         listener.assertNoError();
+        assertTrue(seqNoListener.isDone.get());
 
         try (Releasable releaseable1 = listeners.forceRefreshes()) {
-            listener = new DummyRefreshListener();
+            listener = new TestLocationListener();
             assertTrue(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
             assertTrue(listener.forcedRefresh.get());
             listener.assertNoError();
-            assertEquals(0, listeners.pendingCount());
+            seqNoListener = new TestSeqNoListener();
+            // SeqNo listeners are not forced
+            assertFalse(listeners.addOrNotify(index("1").getSeqNo(), seqNoListener));
+            assertFalse(seqNoListener.isDone.get());
+            assertEquals(1, listeners.pendingCount());
 
             try (Releasable releaseable2 = listeners.forceRefreshes()) {
-                listener = new DummyRefreshListener();
+                listener = new TestLocationListener();
                 assertTrue(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
                 assertTrue(listener.forcedRefresh.get());
                 listener.assertNoError();
-                assertEquals(0, listeners.pendingCount());
+                seqNoListener = new TestSeqNoListener();
+                // SeqNo listeners are not forced
+                assertFalse(listeners.addOrNotify(index("1").getSeqNo(), seqNoListener));
+                assertFalse(seqNoListener.isDone.get());
+                assertEquals(1, listeners.pendingCount());
             }
 
-            listener = new DummyRefreshListener();
+            listener = new TestLocationListener();
             assertTrue(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
             assertTrue(listener.forcedRefresh.get());
             listener.assertNoError();
-            assertEquals(0, listeners.pendingCount());
+            seqNoListener = new TestSeqNoListener();
+            // SeqNo listeners are not forced
+            assertFalse(listeners.addOrNotify(index("1").getSeqNo(), seqNoListener));
+            assertFalse(seqNoListener.isDone.get());
+            assertEquals(1, listeners.pendingCount());
         }
 
-        assertFalse(listeners.addOrNotify(index("1").getTranslogLocation(), new DummyRefreshListener()));
-        assertEquals(1, listeners.pendingCount());
+        assertFalse(listeners.addOrNotify(index("1").getTranslogLocation(), new TestLocationListener()));
+        assertFalse(listeners.addOrNotify(index("1").getSeqNo(), new TestSeqNoListener()));
+        assertEquals(3, listeners.pendingCount());
+    }
+
+    public void testSequenceNumberMustBeIssued() throws Exception {
+        assertEquals(0, listeners.pendingCount());
+        TestSeqNoListener seqNoListener = new TestSeqNoListener();
+        long issued = index("1").getSeqNo();
+        assertTrue(listeners.addOrNotify(issued + 1, seqNoListener));
+        assertThat(seqNoListener.error, instanceOf(IllegalArgumentException.class));
+        String message = "Cannot wait for unissued seqNo checkpoint [wait_for_checkpoint="
+            + (issued + 1) + ", max_issued_seqNo=" + issued + "]";
+        assertThat(seqNoListener.error.getMessage(), equalTo(message));
     }
 
     private Engine.IndexResult index(String id) throws IOException {
@@ -408,13 +515,13 @@ public class RefreshListenersTests extends ESTestCase {
         document.add(seqID.seqNo);
         document.add(seqID.seqNoDocValue);
         document.add(seqID.primaryTerm);
-        BytesReference source = new BytesArray(new byte[] { 1 });
+        BytesReference source = new BytesArray(new byte[]{1});
         ParsedDocument doc = new ParsedDocument(versionField, seqID, id, null, Arrays.asList(document), source, XContentType.JSON, null);
         Engine.Index index = new Engine.Index(uid, engine.config().getPrimaryTermSupplier().getAsLong(), doc);
         return engine.index(index);
     }
 
-    private static class DummyRefreshListener implements Consumer<Boolean> {
+    private static class TestLocationListener implements Consumer<Boolean> {
         /**
          * When the listener is called this captures it's only argument.
          */
@@ -437,4 +544,20 @@ public class RefreshListenersTests extends ESTestCase {
             }
         }
     }
+
+    /**
+     * Test listener for sequence-number based waits. It records whether it was completed successfully
+     * ({@link #isDone}) or with a failure ({@link #error}).
+     */
+    private static class TestSeqNoListener implements ActionListener<Void> {
+
+        /** Starts as {@code false}; set to {@code true} when {@link #onResponse} is invoked. */
+        final AtomicReference<Boolean> isDone = new AtomicReference<>(false);
+        /** The exception passed to {@link #onFailure}; stays {@code null} until a failure is reported. */
+        private volatile Exception error;
+
+        @Override
+        public void onResponse(Void ignored) {
+            this.isDone.set(true);
+        }
+
+        @Override
+        public void onFailure(Exception failure) {
+            this.error = failure;
+        }
+    }
 }

+ 88 - 0
server/src/test/java/org/elasticsearch/search/SearchServiceTests.java

@@ -16,6 +16,7 @@ import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.index.IndexResponse;
@@ -118,6 +119,7 @@ import static org.elasticsearch.indices.cluster.IndicesClusterStateService.Alloc
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
@@ -1342,6 +1344,92 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
         client().clearScroll(clearScrollRequest);
     }
 
+    /**
+     * Indexes one document and runs the query phase with a wait-for-checkpoint of 0 and a 30 second timeout;
+     * the search must complete and see the document.
+     */
+    public void testWaitOnRefresh() throws Exception {
+        createIndex("index");
+        final SearchService searchService = getInstanceFromNode(SearchService.class);
+        final IndexService indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(resolveIndex("index"));
+        final IndexShard shard = indexService.getShard(0);
+        final SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
+
+        final IndexResponse indexResponse = client().prepareIndex("index").setSource("id", "1").get();
+        assertEquals(RestStatus.CREATED, indexResponse.status());
+
+        final SearchShardTask shardTask = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap());
+        final PlainActionFuture<SearchPhaseResult> resultFuture = PlainActionFuture.newFuture();
+        // The last two arguments are the checkpoint to wait for (seqNo 0) and how long to wait for it.
+        final ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1,
+            new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null, null, 0, TimeValue.timeValueSeconds(30));
+        searchService.executeQueryPhase(shardRequest, shardTask, resultFuture);
+        final SearchPhaseResult phaseResult = resultFuture.actionGet();
+        assertEquals(1, phaseResult.queryResult().getTotalHits().value);
+    }
+
+    /**
+     * With {@code index.refresh_interval} set to {@code -1}, a shard request that waits on a checkpoint must be
+     * rejected with an {@link IllegalArgumentException}.
+     */
+    public void testWaitOnRefreshFailsWithRefreshesDisabled() throws Exception {
+        final Settings refreshDisabled = Settings.builder().put("index.refresh_interval", "-1").build();
+        createIndex("index", refreshDisabled);
+        final SearchService searchService = getInstanceFromNode(SearchService.class);
+        final IndexService indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(resolveIndex("index"));
+        final IndexShard shard = indexService.getShard(0);
+        final SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
+
+        final IndexResponse indexResponse = client().prepareIndex("index").setSource("id", "1").get();
+        assertEquals(RestStatus.CREATED, indexResponse.status());
+
+        final SearchShardTask shardTask = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap());
+        final PlainActionFuture<SearchPhaseResult> resultFuture = PlainActionFuture.newFuture();
+        // The last two arguments are the checkpoint to wait for (seqNo 0) and how long to wait for it.
+        final ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1,
+            new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null, null, 0, TimeValue.timeValueSeconds(30));
+        searchService.executeQueryPhase(shardRequest, shardTask, resultFuture);
+        final IllegalArgumentException failure = expectThrows(IllegalArgumentException.class, resultFuture::actionGet);
+        assertThat(failure.getMessage(),
+            containsString("Cannot use wait_for_checkpoints with [index.refresh_interval=-1]"));
+    }
+
+    /**
+     * Only one document is indexed, so waiting on checkpoint 1 asks for a sequence number that has not been
+     * issued yet; the query phase must fail with an {@link IllegalArgumentException} saying so.
+     */
+    public void testWaitOnRefreshFailsIfCheckpointNotIndexed() throws Exception {
+        createIndex("index");
+        final SearchService searchService = getInstanceFromNode(SearchService.class);
+        final IndexService indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(resolveIndex("index"));
+        final IndexShard shard = indexService.getShard(0);
+        final SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
+
+        final IndexResponse indexResponse = client().prepareIndex("index").setSource("id", "1").get();
+        assertEquals(RestStatus.CREATED, indexResponse.status());
+
+        final SearchShardTask shardTask = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap());
+        final PlainActionFuture<SearchPhaseResult> resultFuture = PlainActionFuture.newFuture();
+        final TimeValue shortTimeout = TimeValue.timeValueMillis(randomIntBetween(10, 100));
+        // The last two arguments are the checkpoint to wait for (seqNo 1, not yet issued) and the wait timeout.
+        final ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1,
+            new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null, null, 1, shortTimeout);
+        searchService.executeQueryPhase(shardRequest, shardTask, resultFuture);
+
+        final IllegalArgumentException failure = expectThrows(IllegalArgumentException.class, resultFuture::actionGet);
+        assertThat(failure.getMessage(),
+            containsString("Cannot wait for unissued seqNo checkpoint [wait_for_checkpoint=1, max_issued_seqNo=0]"));
+    }
+
+    /**
+     * The index refreshes only every 60 seconds while the request waits at most 10-100 milliseconds for
+     * checkpoint 0, so the query phase is expected to fail with an {@link ElasticsearchTimeoutException}.
+     */
+    public void testWaitOnRefreshTimeout() throws Exception {
+        final Settings slowRefresh = Settings.builder().put("index.refresh_interval", "60s").build();
+        createIndex("index", slowRefresh);
+        final SearchService searchService = getInstanceFromNode(SearchService.class);
+        final IndexService indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(resolveIndex("index"));
+        final IndexShard shard = indexService.getShard(0);
+        final SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
+
+        final IndexResponse indexResponse = client().prepareIndex("index").setSource("id", "1").get();
+        assertEquals(RestStatus.CREATED, indexResponse.status());
+
+        final SearchShardTask shardTask = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap());
+        final PlainActionFuture<SearchPhaseResult> resultFuture = PlainActionFuture.newFuture();
+        final TimeValue shortTimeout = TimeValue.timeValueMillis(randomIntBetween(10, 100));
+        // The last two arguments are the checkpoint to wait for (seqNo 0) and the deliberately short wait timeout.
+        final ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1,
+            new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null, null, 0, shortTimeout);
+        searchService.executeQueryPhase(shardRequest, shardTask, resultFuture);
+
+        final ElasticsearchTimeoutException timeout = expectThrows(ElasticsearchTimeoutException.class, resultFuture::actionGet);
+        assertThat(timeout.getMessage(), containsString("Wait for seq_no [0] refreshed timed out ["));
+    }
+
     private ReaderContext createReaderContext(IndexService indexService, IndexShard indexShard) {
         return new ReaderContext(new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong()),
             indexService, indexShard, indexShard.acquireSearcherSupplier(), randomNonNegativeLong(), false);

+ 0 - 0
x-pack/plugin/fleet/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/fleet/global_checkpoints.yml → x-pack/plugin/fleet/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/fleet/10_global_checkpoints.yml


+ 110 - 0
x-pack/plugin/fleet/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/fleet/20_wait_for_checkpoints.yml

@@ -0,0 +1,110 @@
+setup:
+  - do:
+      indices.create:
+        index: test-refresh-disabled
+        body:
+          settings:
+            number_of_shards: "1"
+            refresh_interval: "-1"
+
+  - do:
+      indices.create:
+        index: test-after-refresh
+        body:
+          settings:
+            number_of_shards: "1"
+
+  - do:
+      indices.put_alias:
+        index: test-after-refresh
+        name: test-alias
+
+  - do:
+      index:
+        index: test-after-refresh
+        body: { }
+
+  - do:
+      index:
+        index: test-after-refresh
+        body: { }
+
+  - do:
+      indices.refresh: {}
+
+---
+"Execute successful after refresh search":
+  - do:
+      fleet.search:
+        index: "test-after-refresh"
+        allow_partial_search_results: false
+        wait_for_checkpoints: 1
+        body: { query: { match_all: {} } }
+
+  - match: { _shards.successful: 1 }
+  - match: { hits.total.value: 2 }
+
+---
+"Cannot use alias":
+  - do:
+      catch: bad_request
+      fleet.search:
+        index: "test-alias"
+        allow_partial_search_results: false
+        wait_for_checkpoints: 1
+        body: { query: { match_all: {} } }
+
+---
+"Must provide correct number of checkpoints":
+  - do:
+      catch: bad_request
+      fleet.search:
+        index: "test-after-refresh"
+        allow_partial_search_results: false
+        wait_for_checkpoints: [1, 1]
+        body: { query: { match_all: {} } }
+
+---
+"Cannot wait_for_checkpoints with refresh disabled":
+  - do:
+      catch: bad_request
+      fleet.search:
+        index: "test-refresh-disabled"
+        wait_for_checkpoints: 0
+        body: { query: { match_all: { } } }
+
+---
+"Wait for unprocessed checkpoint fails":
+  - do:
+      catch: bad_request
+      fleet.search:
+        index: "test-after-refresh"
+        allow_partial_search_results: false
+        wait_for_checkpoints: 2
+        body: { query: { match_all: {} } }
+
+---
+"Test msearch":
+  - do:
+      fleet.msearch:
+        index: "test-after-refresh"
+        body:
+          - { "allow_partial_search_results": false, wait_for_checkpoints: 1 }
+          - { query: { match_all: { } } }
+          - { "allow_partial_search_results": false, wait_for_checkpoints: 2 }
+          - { query: { match_all: { } } }
+
+  - match: { responses.0._shards.successful: 1 }
+  - match: { responses.0.hits.total.value: 2 }
+  - match: { responses.1.error.caused_by.type: "illegal_argument_exception" }
+  - do:
+      fleet.msearch:
+        body:
+          - {"index": "test-after-refresh", "allow_partial_search_results" : false, wait_for_checkpoints: 1}
+          - {query: { match_all: {} } }
+          - {"index": "test-refresh-disabled", "allow_partial_search_results":  false, wait_for_checkpoints: 2}
+          - {query: { match_all: {} } }
+
+  - match: { responses.0._shards.successful: 1 }
+  - match: { responses.0.hits.total.value: 2 }
+  - match: { responses.1.error.caused_by.type: "illegal_argument_exception" }

+ 3 - 2
x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java

@@ -51,13 +51,14 @@ import org.elasticsearch.xpack.core.action.DeleteDataStreamAction.Request;
 import org.elasticsearch.xpack.core.template.TemplateUtils;
 import org.elasticsearch.xpack.fleet.action.GetGlobalCheckpointsAction;
 import org.elasticsearch.xpack.fleet.action.GetGlobalCheckpointsShardAction;
+import org.elasticsearch.xpack.fleet.rest.RestFleetMultiSearchAction;
+import org.elasticsearch.xpack.fleet.rest.RestFleetSearchAction;
 import org.elasticsearch.xpack.fleet.rest.RestGetGlobalCheckpointsAction;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -341,6 +342,6 @@ public class Fleet extends Plugin implements SystemIndexPlugin {
         IndexNameExpressionResolver indexNameExpressionResolver,
         Supplier<DiscoveryNodes> nodesInCluster
     ) {
-        return Collections.singletonList(new RestGetGlobalCheckpointsAction());
+        return Arrays.asList(new RestGetGlobalCheckpointsAction(), new RestFleetSearchAction(), new RestFleetMultiSearchAction(settings));
     }
 }

+ 112 - 0
x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/rest/RestFleetMultiSearchAction.java

@@ -0,0 +1,112 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.fleet.rest;
+
+import org.elasticsearch.action.search.MultiSearchAction;
+import org.elasticsearch.action.search.MultiSearchRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.rest.action.search.RestMultiSearchAction;
+import org.elasticsearch.rest.action.search.RestSearchAction;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeStringArrayValue;
+import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeTimeValue;
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+import static org.elasticsearch.rest.RestRequest.Method.POST;
+
+/**
+ * REST handler for the fleet multi-search endpoints ({@code /_fleet/_msearch} and {@code /{index}/_fleet/_msearch}).
+ * <p>
+ * It delegates parsing to {@link RestMultiSearchAction#parseRequest} and additionally understands the
+ * {@code wait_for_checkpoints} and {@code wait_for_checkpoints_timeout} keys of each sub-search header.
+ */
+public class RestFleetMultiSearchAction extends BaseRestHandler {
+
+    /**
+     * Placeholder key under which checkpoints are stored while a header line is being parsed. It is replaced
+     * by the real index name once the whole request has been parsed.
+     */
+    private static final String CHECKPOINTS_PLACEHOLDER_KEY = "*";
+
+    private final boolean allowExplicitIndex;
+
+    /**
+     * @param settings node settings from which {@code MULTI_ALLOW_EXPLICIT_INDEX} is read
+     */
+    public RestFleetMultiSearchAction(Settings settings) {
+        this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
+    }
+
+    @Override
+    public String getName() {
+        return "fleet_msearch_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(
+            new Route(GET, "/_fleet/_msearch"),
+            new Route(POST, "/_fleet/_msearch"),
+            new Route(GET, "/{index}/_fleet/_msearch"),
+            new Route(POST, "/{index}/_fleet/_msearch")
+        );
+    }
+
+    /**
+     * Parses the multi-search body, attaches the fleet-specific checkpoint options to each sub-search and
+     * returns a consumer that executes the request through a cancellable client.
+     *
+     * @throws IllegalArgumentException if a sub-search uses {@code wait_for_checkpoints} without targeting
+     *                                  exactly one index
+     */
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
+        final MultiSearchRequest multiSearchRequest = RestMultiSearchAction.parseRequest(
+            request,
+            client.getNamedWriteableRegistry(),
+            allowExplicitIndex,
+            (key, value, searchRequest) -> {
+                if ("wait_for_checkpoints".equals(key)) {
+                    String[] stringWaitForCheckpoints = nodeStringArrayValue(value);
+                    final long[] waitForCheckpoints = new long[stringWaitForCheckpoints.length];
+                    for (int i = 0; i < stringWaitForCheckpoints.length; ++i) {
+                        waitForCheckpoints[i] = Long.parseLong(stringWaitForCheckpoints[i]);
+                    }
+                    // The target index is validated after parsing, so park the checkpoints under a placeholder key.
+                    searchRequest.setWaitForCheckpoints(Collections.singletonMap(CHECKPOINTS_PLACEHOLDER_KEY, waitForCheckpoints));
+                    return true;
+                } else if ("wait_for_checkpoints_timeout".equals(key)) {
+                    final TimeValue waitForCheckpointsTimeout = nodeTimeValue(value, TimeValue.timeValueSeconds(30));
+                    searchRequest.setWaitForCheckpointsTimeout(waitForCheckpointsTimeout);
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        );
+
+        for (SearchRequest searchRequest : multiSearchRequest.requests()) {
+            rekeyWaitForCheckpoints(searchRequest);
+        }
+
+        return channel -> {
+            final RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel());
+            cancellableClient.execute(MultiSearchAction.INSTANCE, multiSearchRequest, new RestToXContentListener<>(channel));
+        };
+    }
+
+    /**
+     * Moves checkpoints parsed under the placeholder key to the single index the sub-search targets.
+     * Sub-searches that did not specify {@code wait_for_checkpoints} are left untouched; previously they hit
+     * {@code indices[0]} unconditionally and had a {@code null} checkpoint array installed.
+     */
+    private static void rekeyWaitForCheckpoints(SearchRequest searchRequest) {
+        final Map<String, long[]> waitForCheckpoints = searchRequest.getWaitForCheckpoints();
+        if (waitForCheckpoints.isEmpty()) {
+            return;
+        }
+        final String[] indices = searchRequest.indices();
+        if (indices.length == 0) {
+            throw new IllegalArgumentException(
+                "Fleet search API param wait_for_checkpoints is only supported with an index to search specified. "
+                    + "No index specified."
+            );
+        } else if (indices.length > 1) {
+            throw new IllegalArgumentException(
+                "Fleet search API only supports searching a single index. Found: [" + Arrays.toString(indices) + "]."
+            );
+        }
+        final long[] checkpoints = waitForCheckpoints.get(CHECKPOINTS_PLACEHOLDER_KEY);
+        if (checkpoints != null) {
+            searchRequest.setWaitForCheckpoints(Collections.singletonMap(indices[0], checkpoints));
+        }
+    }
+
+    @Override
+    protected Set<String> responseParams() {
+        return RestSearchAction.RESPONSE_PARAMS;
+    }
+}

+ 99 - 0
x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/rest/RestFleetSearchAction.java

@@ -0,0 +1,99 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.fleet.rest;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
+import org.elasticsearch.rest.action.RestStatusToXContentListener;
+import org.elasticsearch.rest.action.search.RestSearchAction;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.function.IntConsumer;
+
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+import static org.elasticsearch.rest.RestRequest.Method.POST;
+
+public class RestFleetSearchAction extends BaseRestHandler {
+
+    @Override
+    public String getName() {
+        return "fleet_search_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(GET, "/{index}/_fleet/_search"), new Route(POST, "/{index}/_fleet/_search"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
+        SearchRequest searchRequest;
+        if (request.hasParam("min_compatible_shard_node")) {
+            searchRequest = new SearchRequest(Version.fromString(request.param("min_compatible_shard_node")));
+        } else {
+            searchRequest = new SearchRequest();
+        }
+        String[] indices = searchRequest.indices();
+        if (indices.length > 1) {
+            throw new IllegalArgumentException(
+                "Fleet search API only supports searching a single index. Found: [" + Arrays.toString(indices) + "]."
+            );
+        }
+
+        IntConsumer setSize = size -> searchRequest.source().size(size);
+        request.withContentOrSourceParamParserOrNull(
+            parser -> RestSearchAction.parseSearchRequest(
+                searchRequest,
+                request,
+                parser,
+                client.getNamedWriteableRegistry(),
+                setSize,
+                (restRequest, sr) -> {
+                    String[] stringWaitForCheckpoints = request.paramAsStringArray("wait_for_checkpoints", Strings.EMPTY_ARRAY);
+                    final long[] waitForCheckpoints = new long[stringWaitForCheckpoints.length];
+                    for (int i = 0; i < stringWaitForCheckpoints.length; ++i) {
+                        waitForCheckpoints[i] = Long.parseLong(stringWaitForCheckpoints[i]);
+                    }
+                    String[] indices1 = Strings.splitStringByCommaToArray(request.param("index"));
+                    if (indices1.length > 1) {
+                        throw new IllegalArgumentException(
+                            "Fleet search API only supports searching a single index. Found: [" + Arrays.toString(indices1) + "]."
+                        );
+                    }
+                    sr.setWaitForCheckpoints(Collections.singletonMap(indices1[0], waitForCheckpoints));
+                    final TimeValue waitForCheckpointsTimeout = request.paramAsTime(
+                        "wait_for_checkpoints_timeout",
+                        TimeValue.timeValueSeconds(30)
+                    );
+                    sr.setWaitForCheckpointsTimeout(waitForCheckpointsTimeout);
+                }
+            )
+        );
+
+        return channel -> {
+            RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
+            cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));
+        };
+    }
+
+    @Override
+    protected Set<String> responseParams() {
+        return RestSearchAction.RESPONSE_PARAMS;
+    }
+}