Browse Source

Retry point in time on other copy when possible (#66713)

Relates #61062
Nhat Nguyen 4 years ago
parent
commit
59082c0d3a
20 changed files with 445 additions and 81 deletions
  1. 2 2
      docs/reference/search/point-in-time-api.asciidoc
  2. 6 6
      docs/reference/search/search-your-data/paginate-search-results.asciidoc
  3. 27 16
      server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
  4. 1 1
      server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java
  5. 1 1
      server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java
  6. 10 2
      server/src/main/java/org/elasticsearch/action/search/SearchContextId.java
  7. 7 0
      server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java
  8. 5 0
      server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
  9. 21 4
      server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
  10. 1 1
      server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java
  11. 65 28
      server/src/main/java/org/elasticsearch/search/SearchService.java
  12. 28 15
      server/src/main/java/org/elasticsearch/search/builder/PointInTimeBuilder.java
  13. 1 0
      server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java
  14. 22 3
      server/src/main/java/org/elasticsearch/search/internal/ShardSearchContextId.java
  15. 5 0
      server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java
  16. 63 0
      server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java
  17. 1 2
      server/src/test/java/org/elasticsearch/search/SearchServiceTests.java
  18. 66 0
      x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java
  19. 24 0
      x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java
  20. 89 0
      x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java

+ 2 - 2
docs/reference/search/point-in-time-api.asciidoc

@@ -38,7 +38,7 @@ POST /_search <1>
         }
     },
     "pit": {
-	    "id":  "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <2>
+	    "id":  "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <2>
 	    "keep_alive": "1m"  <3>
     }
 }
@@ -99,7 +99,7 @@ as soon as they are no longer used in search requests.
 ---------------------------------------
 DELETE /_pit
 {
-    "id" : "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWIBBXV1aWQyAAA="
+    "id" : "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
 }
 ---------------------------------------
 // TEST[catch:missing]

+ 6 - 6
docs/reference/search/search-your-data/paginate-search-results.asciidoc

@@ -62,10 +62,10 @@ The API returns a PIT ID.
 [source,console-result]
 ----
 {
-  "id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
+  "id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
 }
 ----
-// TESTRESPONSE[s/"id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="/"id": $body.id/]
+// TESTRESPONSE[s/"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="/"id": $body.id/]
 
 To get the first page of results, submit a search request with a `sort`
 argument. If using a PIT, specify the PIT ID in the `pit.id` parameter and omit
@@ -86,7 +86,7 @@ GET /_search
     }
   },
   "pit": {
-	    "id":  "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
+	    "id":  "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
 	    "keep_alive": "1m"
   },
   "sort": [ <2>
@@ -106,7 +106,7 @@ a PIT, the response's `pit_id` parameter contains an updated PIT ID.
 [source,console-result]
 ----
 {
-  "pit_id" : "46ToAwEPbXktaW5kZXgtMDAwMDAxFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAFldicVdzOFFtVHZTZDFoWWowTGkwS0EAAAAAAAAAAAQURzZzcUszUUJ5U1NMX3Jyak5ET0wBFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAAA==", <1>
+  "pit_id" : "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
   "took" : 17,
   "timed_out" : false,
   "_shards" : ...,
@@ -150,7 +150,7 @@ GET /_search
     }
   },
   "pit": {
-	    "id":  "46ToAwEPbXktaW5kZXgtMDAwMDAxFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAFldicVdzOFFtVHZTZDFoWWowTGkwS0EAAAAAAAAAAAQURzZzcUszUUJ5U1NMX3Jyak5ET0wBFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAAA==", <1>
+	    "id":  "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
 	    "keep_alive": "1m"
   },
   "sort": [
@@ -178,7 +178,7 @@ When you're finished, you should delete your PIT.
 ----
 DELETE /_pit
 {
-    "id" : "46ToAwEPbXktaW5kZXgtMDAwMDAxFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAFldicVdzOFFtVHZTZDFoWWowTGkwS0EAAAAAAAAAAAQURzZzcUszUUJ5U1NMX3Jyak5ET0wBFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAAA=="
+    "id" : "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
 }
 ----
 // TEST[catch:missing]

+ 27 - 16
server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

@@ -38,11 +38,14 @@ import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.search.SearchContextMissingException;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
+import org.elasticsearch.search.builder.PointInTimeBuilder;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.search.internal.InternalSearchResponse;
 import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.internal.ShardSearchContextId;
 import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.transport.Transport;
@@ -478,7 +481,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
             } else {
                 // the failure is already present, try and not override it with an exception that is less meaningless
                 // for example, getting illegal shard state
-                if (TransportActions.isReadOverrideException(e)) {
+                if (TransportActions.isReadOverrideException(e) && (e instanceof SearchContextMissingException == false)) {
                     shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
                 }
             }
@@ -567,6 +570,16 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
         return request;
     }
 
+    @Override
+    public boolean isPartOfPointInTime(ShardSearchContextId contextId) {
+        final PointInTimeBuilder pointInTimeBuilder = request.pointInTimeBuilder();
+        if (pointInTimeBuilder != null) {
+            return request.pointInTimeBuilder().getSearchContextId(searchTransportService.getNamedWriteableRegistry()).contains(contextId);
+        } else {
+            return false;
+        }
+    }
+
     protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, ShardSearchFailure[] failures,
                                                        String scrollId, String searchContextId) {
         int numSuccess = successfulOps.get();
@@ -598,7 +611,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
                 searchContextId = SearchContextId.encode(queryResults.asList(), aliasFilter, minNodeVersion);
             } else {
                 if (request.source() != null && request.source().pointInTimeBuilder() != null) {
-                    searchContextId = request.source().pointInTimeBuilder().getId();
+                    searchContextId = request.source().pointInTimeBuilder().getEncodedId();
                 } else {
                     searchContextId = null;
                 }
@@ -619,21 +632,19 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
      * @param exception the exception explaining or causing the phase failure
      */
     private void raisePhaseFailure(SearchPhaseExecutionException exception) {
-        // we don't release persistent readers (point in time).
-        if (request.pointInTimeBuilder() == null) {
-            results.getSuccessfulResults().forEach((entry) -> {
-                if (entry.getContextId() != null) {
-                    try {
-                        SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
-                        Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
-                        sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices());
-                    } catch (Exception inner) {
-                        inner.addSuppressed(exception);
-                        logger.trace("failed to release context", inner);
-                    }
+        results.getSuccessfulResults().forEach((entry) -> {
+            // Do not release search contexts that are part of the point in time
+            if (entry.getContextId() != null && isPartOfPointInTime(entry.getContextId()) == false) {
+                try {
+                    SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
+                    Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
+                    sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices());
+                } catch (Exception inner) {
+                    inner.addSuppressed(exception);
+                    logger.trace("failed to release context", inner);
                 }
-            });
-        }
+            }
+        });
         Releasables.close(releasables);
         listener.onFailure(exception);
     }

+ 1 - 1
server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java

@@ -100,7 +100,7 @@ final class DfsQueryPhase extends SearchPhase {
                             progressListener.notifyQueryFailure(shardIndex, searchShardTarget, exception);
                             counter.onFailure(shardIndex, searchShardTarget, exception);
                         } finally {
-                            if (context.getRequest().pointInTimeBuilder() == null) {
+                            if (context.isPartOfPointInTime(querySearchRequest.contextId()) == false) {
                                 // the query might not have been executed at all (for example because thread pool rejected
                                 // execution) and the search context that was created in dfs phase might not be released.
                                 // release it again to be in the safe side

+ 1 - 1
server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java

@@ -210,7 +210,7 @@ final class FetchSearchPhase extends SearchPhase {
         // or using a PIT and if it has at least one hit that didn't make it to the global topDocs
         if (queryResult.hasSearchContext()
                 && context.getRequest().scroll() == null
-                && context.getRequest().pointInTimeBuilder() == null) {
+                && (context.isPartOfPointInTime(queryResult.getContextId()) == false)) {
             try {
                 SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
                 Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());

+ 10 - 2
server/src/main/java/org/elasticsearch/action/search/SearchContextId.java

@@ -32,6 +32,7 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.internal.AliasFilter;
+import org.elasticsearch.search.internal.ShardSearchContextId;
 import org.elasticsearch.transport.RemoteClusterAware;
 
 import java.io.IOException;
@@ -43,14 +44,17 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
-public class SearchContextId {
+public final class SearchContextId {
     private final Map<ShardId, SearchContextIdForNode> shards;
     private final Map<String, AliasFilter> aliasFilter;
+    private transient Set<ShardSearchContextId> contextIds;
 
-    private SearchContextId(Map<ShardId, SearchContextIdForNode> shards, Map<String, AliasFilter> aliasFilter) {
+    SearchContextId(Map<ShardId, SearchContextIdForNode> shards, Map<String, AliasFilter> aliasFilter) {
         this.shards = shards;
         this.aliasFilter = aliasFilter;
+        this.contextIds = shards.values().stream().map(SearchContextIdForNode::getSearchContextId).collect(Collectors.toSet());
     }
 
     public Map<ShardId, SearchContextIdForNode> shards() {
@@ -61,6 +65,10 @@ public class SearchContextId {
         return aliasFilter;
     }
 
+    public boolean contains(ShardSearchContextId contextId) {
+        return contextIds.contains(contextId);
+    }
+
     public static String encode(List<SearchPhaseResult> searchPhaseResults, Map<String, AliasFilter> aliasFilter, Version version) {
         final Map<ShardId, SearchContextIdForNode> shards = new HashMap<>();
         for (SearchPhaseResult searchPhaseResult : searchPhaseResults) {

+ 7 - 0
server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java

@@ -58,6 +58,12 @@ interface SearchPhaseContext extends Executor {
      */
     SearchRequest getRequest();
 
+    /**
+     * Checks if the given context id is part of the point in time of this search (if exists).
+     * We should not release search contexts that belong to the point in time during or after searches.
+     */
+    boolean isPartOfPointInTime(ShardSearchContextId contextId);
+
     /**
      * Builds and sends the final search response back to the user.
      *
@@ -108,6 +114,7 @@ interface SearchPhaseContext extends Executor {
     default void sendReleaseSearchContext(ShardSearchContextId contextId,
                                           Transport.Connection connection,
                                           OriginalIndices originalIndices) {
+        assert isPartOfPointInTime(contextId) == false : "Must not release point in time context [" + contextId + "]";
         if (connection != null) {
             getSearchTransport().sendFreeContext(connection, contextId, originalIndices);
         }

+ 5 - 0
server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

@@ -32,6 +32,7 @@ import org.elasticsearch.client.OriginSettingClient;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -438,4 +439,8 @@ public class SearchTransportService {
         // force the origin to execute the cancellation as a system user
         new OriginSettingClient(client, GetTaskAction.TASKS_ORIGIN).admin().cluster().cancelTasks(req, ActionListener.wrap(() -> {}));
     }
+
+    public NamedWriteableRegistry getNamedWriteableRegistry() {
+        return client.getNamedWriteableRegistry();
+    }
 }

+ 21 - 4
server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

@@ -38,6 +38,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.cluster.routing.OperationRouting;
 import org.elasticsearch.cluster.routing.ShardIterator;
+import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
@@ -283,7 +284,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
             final SearchContextId searchContext;
             final Map<String, OriginalIndices> remoteClusterIndices;
             if (searchRequest.pointInTimeBuilder() != null) {
-                searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId());
+                searchContext = searchRequest.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);
                 remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
             } else {
                 searchContext = null;
@@ -580,7 +581,15 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                 final String clusterAlias = entry.getKey();
                 final SearchContextIdForNode perNode = searchContextId.shards().get(shardId);
                 assert clusterAlias.equals(perNode.getClusterAlias()) : clusterAlias + " != " + perNode.getClusterAlias();
-                final List<String> targetNodes = List.of(perNode.getNode());
+                final List<String> targetNodes = new ArrayList<>(group.getShards().length);
+                targetNodes.add(perNode.getNode());
+                if (perNode.getSearchContextId().getSearcherId() != null) {
+                    for (ShardRouting shard : group.getShards()) {
+                        if (shard.currentNodeId().equals(perNode.getNode()) == false) {
+                            targetNodes.add(shard.currentNodeId());
+                        }
+                    }
+                }
                 SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, shardId, targetNodes,
                     remoteClusterIndices.get(clusterAlias), perNode.getSearchContextId(), searchContextKeepAlive);
                 remoteShardIterators.add(shardIterator);
@@ -914,8 +923,16 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
             final SearchContextIdForNode perNode = entry.getValue();
             if (Strings.isEmpty(perNode.getClusterAlias())) {
                 final ShardId shardId = entry.getKey();
-                OperationRouting.getShards(clusterState, shardId);
-                final List<String> targetNodes = List.of(perNode.getNode());
+                final ShardIterator shards = OperationRouting.getShards(clusterState, shardId);
+                final List<String> targetNodes = new ArrayList<>(shards.size());
+                targetNodes.add(perNode.getNode());
+                if (perNode.getSearchContextId().getSearcherId() != null) {
+                    for (ShardRouting shard : shards) {
+                        if (shard.currentNodeId().equals(perNode.getNode()) == false) {
+                            targetNodes.add(shard.currentNodeId());
+                        }
+                    }
+                }
                 iterators.add(new SearchShardIterator(localClusterAlias, shardId, targetNodes, originalIndices,
                     perNode.getSearchContextId(), keepAlive));
             }

+ 1 - 1
server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java

@@ -321,7 +321,7 @@ public class RestSearchAction extends BaseRestHandler {
             indicesOptions.ignoreUnavailable(), indicesOptions.allowNoIndices(), false, false, false,
             true, true, indicesOptions.ignoreThrottled());
         request.indicesOptions(stricterIndicesOptions);
-        final SearchContextId searchContextId = SearchContextId.decode(namedWriteableRegistry, request.pointInTimeBuilder().getId());
+        final SearchContextId searchContextId = request.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);
         request.indices(searchContextId.getActualIndices());
     }
 

+ 65 - 28
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -414,11 +414,16 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     }
 
     private IndexShard getShard(ShardSearchRequest request) {
-        if (request.readerId() != null) {
-            return findReaderContext(request.readerId(), request).indexShard();
-        } else {
-            return indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
+        final ShardSearchContextId contextId = request.readerId();
+        if (contextId != null) {
+            if (sessionId.equals(contextId.getSessionId())) {
+                final ReaderContext readerContext = activeReaders.get(contextId.getId());
+                if (readerContext != null) {
+                    return readerContext.indexShard();
+                }
+            }
         }
+        return indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
     }
 
     private <T> void runAsync(Executor executor, CheckedSupplier<T, Exception> executable, ActionListener<T> listener) {
@@ -606,18 +611,14 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         }, wrapFailureListener(listener, readerContext, markAsUsed));
     }
 
-    private ReaderContext getReaderContext(ShardSearchContextId id) {
+    private ReaderContext findReaderContext(ShardSearchContextId id, TransportRequest request) throws SearchContextMissingException {
         if (id.getSessionId().isEmpty()) {
             throw new IllegalArgumentException("Session id must be specified");
         }
         if (sessionId.equals(id.getSessionId()) == false) {
             throw new SearchContextMissingException(id);
         }
-        return activeReaders.get(id.getId());
-    }
-
-    private ReaderContext findReaderContext(ShardSearchContextId id, TransportRequest request) throws SearchContextMissingException {
-        final ReaderContext reader = getReaderContext(id);
+        final ReaderContext reader = activeReaders.get(id.getId());
         if (reader == null) {
             throw new SearchContextMissingException(id);
         }
@@ -633,18 +634,32 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     final ReaderContext createOrGetReaderContext(ShardSearchRequest request, boolean keepStatesInContext) {
         if (request.readerId() != null) {
             assert keepStatesInContext == false;
-            return findReaderContext(request.readerId(), request);
+            try {
+                return findReaderContext(request.readerId(), request);
+            } catch (SearchContextMissingException e) {
+                final String searcherId = request.readerId().getSearcherId();
+                if (searcherId == null) {
+                    throw e;
+                }
+                final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
+                final IndexShard shard = indexService.getShard(request.shardId().id());
+                final Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier();
+                if (searcherId.equals(searcherSupplier.getSearcherId()) == false) {
+                    searcherSupplier.close();
+                    throw e;
+                }
+                return createAndPutReaderContext(request, indexService, shard, searcherSupplier, false);
+            }
+        } else {
+            final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
+            final IndexShard shard = indexService.getShard(request.shardId().id());
+            final Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier();
+            return createAndPutReaderContext(request, indexService, shard, searcherSupplier, keepStatesInContext);
         }
-        IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
-        IndexShard shard = indexService.getShard(request.shardId().id());
-        Engine.SearcherSupplier reader = shard.acquireSearcherSupplier();
-        return createAndPutReaderContext(request, indexService, shard, reader, keepStatesInContext);
     }
 
     final ReaderContext createAndPutReaderContext(ShardSearchRequest request, IndexService indexService, IndexShard shard,
                                                   Engine.SearcherSupplier reader, boolean keepStatesInContext) {
-        assert request.readerId() == null;
-        assert request.keepAlive() == null;
         ReaderContext readerContext = null;
         Releasable decreaseScrollContexts = null;
         try {
@@ -706,7 +721,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
             ReaderContext readerContext = null;
             try {
                 searcherSupplier = shard.acquireSearcherSupplier();
-                final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet());
+                final ShardSearchContextId id =
+                    new ShardSearchContextId(sessionId, idGenerator.incrementAndGet(), searcherSupplier.getSearcherId());
                 readerContext = new ReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false);
                 final ReaderContext finalReaderContext = readerContext;
                 searcherSupplier = null; // transfer ownership to reader context
@@ -801,7 +817,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     }
 
     public boolean freeReaderContext(ShardSearchContextId contextId) {
-        if (getReaderContext(contextId) != null) {
+        if (sessionId.equals(contextId.getSessionId())) {
             try (ReaderContext context = removeReaderContext(contextId.getId())) {
                 return context != null;
             }
@@ -1176,23 +1192,42 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
 
     private CanMatchResponse canMatch(ShardSearchRequest request, boolean checkRefreshPending) throws IOException {
         assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType();
-        final ReaderContext readerContext = request.readerId() != null ? findReaderContext(request.readerId(), request) : null;
-        final Releasable markAsUsed = readerContext != null ? readerContext.markAsUsed(getKeepAlive(request)) : null;
-        try (markAsUsed) {
-            final IndexService indexService;
-            final Engine.Searcher canMatchSearcher;
+        Releasable releasable = null;
+        try {
+            IndexService indexService;
             final boolean hasRefreshPending;
-            if (readerContext != null) {
-                indexService = readerContext.indexService();
-                canMatchSearcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
+            final Engine.Searcher canMatchSearcher;
+            if (request.readerId() != null) {
                 hasRefreshPending = false;
+                ReaderContext readerContext;
+                Engine.Searcher searcher;
+                try {
+                    readerContext = findReaderContext(request.readerId(), request);
+                    releasable = readerContext.markAsUsed(getKeepAlive(request));
+                    indexService = readerContext.indexService();
+                    searcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
+                } catch (SearchContextMissingException e) {
+                    final String searcherId = request.readerId().getSearcherId();
+                    if (searcherId == null) {
+                        throw e;
+                    }
+                    indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
+                    IndexShard indexShard = indexService.getShard(request.shardId().getId());
+                    final Engine.SearcherSupplier searcherSupplier = indexShard.acquireSearcherSupplier();
+                    if (searcherId.equals(searcherSupplier.getSearcherId()) == false) {
+                        searcherSupplier.close();
+                        throw e;
+                    }
+                    releasable = searcherSupplier;
+                    searcher = searcherSupplier.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
+                }
+                canMatchSearcher = searcher;
             } else {
                 indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
                 IndexShard indexShard = indexService.getShard(request.shardId().getId());
                 hasRefreshPending = indexShard.hasRefreshPending() && checkRefreshPending;
                 canMatchSearcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
             }
-
             try (canMatchSearcher) {
                 QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), 0,
                     canMatchSearcher, request::nowInMillis, request.getClusterAlias(), request.getRuntimeMappings());
@@ -1206,6 +1241,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
                 }
                 return new CanMatchResponse(canMatch || hasRefreshPending, minMax);
             }
+        } finally {
+            Releasables.close(releasable);
         }
     }
 

+ 28 - 15
server/src/main/java/org/elasticsearch/search/builder/PointInTimeBuilder.java

@@ -19,8 +19,10 @@
 
 package org.elasticsearch.search.builder;
 
+import org.elasticsearch.action.search.SearchContextId;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -44,39 +46,40 @@ public final class PointInTimeBuilder implements Writeable, ToXContentObject {
 
     static {
         PARSER = new ObjectParser<>(SearchSourceBuilder.POINT_IN_TIME.getPreferredName(), XContentParams::new);
-        PARSER.declareString((params, id) -> params.id = id, ID_FIELD);
+        PARSER.declareString((params, id) -> params.encodedId = id, ID_FIELD);
         PARSER.declareField((params, keepAlive) -> params.keepAlive = keepAlive,
             (p, c) -> TimeValue.parseTimeValue(p.text(), KEEP_ALIVE_FIELD.getPreferredName()),
             KEEP_ALIVE_FIELD, ObjectParser.ValueType.STRING);
     }
 
     private static final class XContentParams {
-        private String id;
+        private String encodedId;
         private TimeValue keepAlive;
     }
 
-    private final String id;
+    private final String encodedId;
+    private transient SearchContextId searchContextId; // lazily decoded from the encodedId
     private TimeValue keepAlive;
 
-    public PointInTimeBuilder(String id) {
-        this.id = Objects.requireNonNull(id);
+    public PointInTimeBuilder(String encodedId) {
+        this.encodedId = Objects.requireNonNull(encodedId);
     }
 
     public PointInTimeBuilder(StreamInput in) throws IOException {
-        id = in.readString();
+        encodedId = in.readString();
         keepAlive = in.readOptionalTimeValue();
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        out.writeString(id);
+        out.writeString(encodedId);
         out.writeOptionalTimeValue(keepAlive);
     }
 
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject(SearchSourceBuilder.POINT_IN_TIME.getPreferredName());
-        builder.field(ID_FIELD.getPreferredName(), id);
+        builder.field(ID_FIELD.getPreferredName(), encodedId);
         if (keepAlive != null) {
             builder.field(KEEP_ALIVE_FIELD.getPreferredName(), keepAlive);
         }
@@ -86,17 +89,27 @@ public final class PointInTimeBuilder implements Writeable, ToXContentObject {
 
     public static PointInTimeBuilder fromXContent(XContentParser parser) throws IOException {
         final XContentParams params = PARSER.parse(parser, null);
-        if (params.id == null) {
+        if (params.encodedId == null) {
             throw new IllegalArgumentException("point int time id is not provided");
         }
-        return new PointInTimeBuilder(params.id).setKeepAlive(params.keepAlive);
+        return new PointInTimeBuilder(params.encodedId).setKeepAlive(params.keepAlive);
     }
 
     /**
-     * Returns the id of this point in time
+     * Returns the encoded id of this point in time
      */
-    public String getId() {
-        return id;
+    public String getEncodedId() {
+        return encodedId;
+    }
+
+    /**
+     * Returns the search context of this point in time from its encoded id.
+     */
+    public SearchContextId getSearchContextId(NamedWriteableRegistry namedWriteableRegistry) {
+        if (searchContextId == null) {
+            searchContextId = SearchContextId.decode(namedWriteableRegistry, encodedId);
+        }
+        return searchContextId;
     }
 
     /**
@@ -118,11 +131,11 @@ public final class PointInTimeBuilder implements Writeable, ToXContentObject {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         final PointInTimeBuilder that = (PointInTimeBuilder) o;
-        return Objects.equals(id, that.id) && Objects.equals(keepAlive, that.keepAlive);
+        return Objects.equals(encodedId, that.encodedId) && Objects.equals(keepAlive, that.keepAlive);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, keepAlive);
+        return Objects.hash(encodedId, keepAlive);
     }
 }

+ 1 - 0
server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java

@@ -40,6 +40,7 @@ public class LegacyReaderContext extends ReaderContext {
         super(id, indexService, indexShard, reader, keepAliveInMillis, false);
         assert shardSearchRequest.readerId() == null;
         assert shardSearchRequest.keepAlive() == null;
+        assert id.getSearcherId() == null : "Legacy reader context must not have searcher id";
         this.shardSearchRequest = Objects.requireNonNull(shardSearchRequest);
         if (shardSearchRequest.scroll() != null) {
             // Search scroll requests are special, they don't hold indices names so we have

+ 22 - 3
server/src/main/java/org/elasticsearch/search/internal/ShardSearchContextId.java

@@ -30,10 +30,17 @@ import java.util.Objects;
 public final class ShardSearchContextId implements Writeable {
     private final String sessionId;
     private final long id;
+    private final String searcherId;
 
+    // TODO: Remove this constructor
     public ShardSearchContextId(String sessionId, long id) {
+        this(sessionId, id, null);
+    }
+
+    public ShardSearchContextId(String sessionId, long id, String searcherId) {
         this.sessionId = Objects.requireNonNull(sessionId);
         this.id = id;
+        this.searcherId = searcherId;
     }
 
     public ShardSearchContextId(StreamInput in) throws IOException {
@@ -43,6 +50,11 @@ public final class ShardSearchContextId implements Writeable {
         } else {
             this.sessionId = "";
         }
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            this.searcherId = in.readOptionalString();
+        } else {
+            this.searcherId = null;
+        }
     }
 
     @Override
@@ -51,6 +63,9 @@ public final class ShardSearchContextId implements Writeable {
         if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
             out.writeString(sessionId);
         }
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeOptionalString(searcherId);
+        }
     }
 
     public String getSessionId() {
@@ -61,21 +76,25 @@ public final class ShardSearchContextId implements Writeable {
         return id;
     }
 
+    public String getSearcherId() {
+        return searcherId;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         ShardSearchContextId other = (ShardSearchContextId) o;
-        return id == other.id && sessionId.equals(other.sessionId);
+        return id == other.id && sessionId.equals(other.sessionId) && Objects.equals(searcherId, other.searcherId);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(sessionId, id);
+        return Objects.hash(sessionId, id, searcherId);
     }
 
     @Override
     public String toString() {
-        return "[" + sessionId + "][" + id + "]";
+        return "[" + sessionId + "][" + id + "] searcherId [" + searcherId + "]";
     }
 }

+ 5 - 0
server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java

@@ -150,4 +150,9 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
     public void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection, OriginalIndices originalIndices) {
         releasedSearchContexts.add(contextId);
     }
+
+    @Override
+    public boolean isPartOfPointInTime(ShardSearchContextId contextId) {
+        return false;
+    }
 }

+ 63 - 0
server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

@@ -29,6 +29,7 @@ import org.elasticsearch.action.OriginalIndicesTests;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -36,13 +37,16 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.cluster.routing.GroupShardsIteratorTests;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.query.InnerHitBuilder;
 import org.elasticsearch.index.query.MatchAllQueryBuilder;
@@ -61,6 +65,7 @@ import org.elasticsearch.search.collapse.CollapseBuilder;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.search.internal.InternalSearchResponse;
 import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.internal.ShardSearchContextId;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.test.ESTestCase;
@@ -82,6 +87,7 @@ import org.elasticsearch.transport.TransportService;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -94,12 +100,16 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.test.InternalAggregationTestCase.emptyReduceContextBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch;
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
 
 public class TransportSearchActionTests extends ESTestCase {
 
@@ -936,4 +946,57 @@ public class TransportSearchActionTests extends ESTestCase {
                 indices, randomIntBetween(127, 10000)));
         }
     }
+
+    public void testLocalShardIteratorFromPointInTime() {
+        final int numberOfShards = randomIntBetween(1, 5);
+        final int numberOfReplicas = randomIntBetween(0, 2);
+        final String[] indices = {"test-1", "test-2"};
+        final ClusterState clusterState =
+            ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(indices, numberOfShards, numberOfReplicas);
+        final IndexMetadata indexMetadata = clusterState.metadata().index("test-1");
+        Map<ShardId, SearchContextIdForNode> contexts = new HashMap<>();
+        Set<ShardId> relocatedContexts = new HashSet<>();
+        for (int shardId = 0; shardId < numberOfShards; shardId++) {
+            final String targetNode;
+            if (randomBoolean()) {
+                final IndexRoutingTable routingTable = clusterState.routingTable().index(indexMetadata.getIndex());
+                targetNode = randomFrom(routingTable.shard(shardId).assignedShards()).currentNodeId();
+            } else {
+                // relocated or no longer assigned
+                relocatedContexts.add(new ShardId(indexMetadata.getIndex(), shardId));
+                targetNode = UUIDs.randomBase64UUID();
+            }
+            contexts.put(new ShardId(indexMetadata.getIndex(), shardId),
+                new SearchContextIdForNode(null, targetNode,
+                    new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong(), null)));
+        }
+        TimeValue keepAlive = randomBoolean() ? null : TimeValue.timeValueSeconds(between(30, 3600));
+        final List<SearchShardIterator> shardIterators = TransportSearchAction.getLocalLocalShardsIteratorFromPointInTime(
+            clusterState,
+            OriginalIndices.NONE,
+            null,
+            new SearchContextId(contexts, Map.of()),
+            keepAlive
+        );
+        shardIterators.sort(Comparator.comparing(SearchShardIterator::shardId));
+        assertThat(shardIterators, hasSize(numberOfShards));
+        for (int id = 0; id < numberOfShards; id++) {
+            final ShardId shardId = new ShardId(indexMetadata.getIndex(), id);
+            final SearchShardIterator shardIterator = shardIterators.get(id);
+            final SearchContextIdForNode context = contexts.get(shardId);
+            if (context.getSearchContextId().getSearcherId() == null) {
+                assertThat(shardIterator.getTargetNodeIds(), hasSize(1));
+            } else {
+                final List<String> targetNodes = clusterState.routingTable().index(indexMetadata.getIndex()).shard(id).assignedShards()
+                    .stream().map(ShardRouting::currentNodeId).collect(Collectors.toList());
+                if (relocatedContexts.contains(shardId)) {
+                    targetNodes.add(context.getNode());
+                }
+                assertThat(shardIterator.getTargetNodeIds(), containsInAnyOrder(targetNodes.toArray(new String[0])));
+            }
+            assertThat(shardIterator.getTargetNodeIds().get(0), equalTo(context.getNode()));
+            assertThat(shardIterator.getSearchContextId(), equalTo(context.getSearchContextId()));
+            assertThat(shardIterator.getSearchContextKeepAlive(), equalTo(keepAlive));
+        }
+    }
 }

+ 1 - 2
server/src/test/java/org/elasticsearch/search/SearchServiceTests.java

@@ -1126,8 +1126,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
                 assertThat(searchService.getActiveContexts(), equalTo(contextIds.size()));
                 while (contextIds.isEmpty() == false) {
                     final ShardSearchContextId contextId = randomFrom(contextIds);
-                    expectThrows(SearchContextMissingException.class,
-                        () -> searchService.freeReaderContext(new ShardSearchContextId(UUIDs.randomBase64UUID(), contextId.getId())));
+                    assertFalse(searchService.freeReaderContext(new ShardSearchContextId(UUIDs.randomBase64UUID(), contextId.getId())));
                     assertThat(searchService.getActiveContexts(), equalTo(contextIds.size()));
                     if (randomBoolean()) {
                         assertTrue(searchService.freeReaderContext(contextId));

+ 66 - 0
x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java

@@ -7,34 +7,48 @@
 package org.elasticsearch.index.engine;
 
 import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.index.shard.IndexLongFieldRange;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.protocol.xpack.frozen.FreezeRequest;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.builder.PointInTimeBuilder;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
 import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
+import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
+import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
+import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction;
+import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest;
 import org.elasticsearch.xpack.frozen.FrozenIndices;
 import org.joda.time.Instant;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.sameInstance;
@@ -170,4 +184,56 @@ public class FrozenIndexIT extends ESIntegTestCase {
         }
     }
 
+    public void testRetryPointInTime() throws Exception {
+        internalCluster().ensureAtLeastNumDataNodes(1);
+        final List<String> dataNodes =
+            StreamSupport.stream(internalCluster().clusterService().state().nodes().getDataNodes().spliterator(), false)
+                .map(e -> e.value.getName())
+                .collect(Collectors.toList());
+        final String assignedNode = randomFrom(dataNodes);
+        final String indexName = "test";
+        assertAcked(
+            client().admin().indices()
+                .prepareCreate(indexName)
+                .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
+                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                    .put("index.routing.allocation.require._name", assignedNode)
+                    .build())
+                .setMapping("{\"properties\":{\"created_date\":{\"type\": \"date\", \"format\": \"yyyy-MM-dd\"}}}"));
+        int numDocs = randomIntBetween(1, 100);
+        for (int i = 0; i < numDocs; i++) {
+            client().prepareIndex(indexName).setSource("created_date", "2011-02-02").get();
+        }
+        assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest(indexName)).actionGet());
+        final String pitId = client().execute(OpenPointInTimeAction.INSTANCE,
+            new OpenPointInTimeRequest(new String[]{indexName}, IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED,
+                TimeValue.timeValueMinutes(2), null, null)).actionGet().getSearchContextId();
+        try {
+            SearchResponse resp = client().prepareSearch()
+                .setIndices(indexName)
+                .setPreference(null)
+                .setPointInTime(new PointInTimeBuilder(pitId))
+                .get();
+            assertNoFailures(resp);
+            assertThat(resp.pointInTimeId(), equalTo(pitId));
+            assertHitCount(resp, numDocs);
+            internalCluster().restartNode(assignedNode);
+            ensureGreen(indexName);
+            resp = client().prepareSearch()
+                .setIndices(indexName)
+                .setQuery(new RangeQueryBuilder("created_date").gte("2011-01-01").lte("2011-12-12"))
+                .setSearchType(SearchType.QUERY_THEN_FETCH)
+                .setPreference(null)
+                .setPreFilterShardSize(between(1, 10))
+                .setAllowPartialSearchResults(true)
+                .setPointInTime(new PointInTimeBuilder(pitId))
+                .get();
+            assertNoFailures(resp);
+            assertThat(resp.pointInTimeId(), equalTo(pitId));
+            assertHitCount(resp, numDocs);
+        } finally {
+            assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest(indexName).setFreeze(false)).actionGet());
+            client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet();
+        }
+    }
 }

+ 24 - 0
x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java

@@ -16,6 +16,7 @@ import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -39,10 +40,12 @@ import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.protocol.xpack.frozen.FreezeRequest;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchContextMissingException;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.builder.PointInTimeBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.internal.AliasFilter;
+import org.elasticsearch.search.internal.ShardSearchContextId;
 import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
@@ -342,6 +345,27 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
 
             IndicesStatsResponse response = client().admin().indices().prepareStats("index").clear().setRefresh(true).get();
             assertEquals(0, response.getTotal().refresh.getTotal());
+
+            // Retry with point in time
+            PlainActionFuture<ShardSearchContextId> openContextFuture = new PlainActionFuture<>();
+            searchService.openReaderContext(shard.shardId(), TimeValue.timeValueSeconds(60), openContextFuture);
+            final ShardSearchContextId contextId = openContextFuture.actionGet(TimeValue.timeValueSeconds(60));
+            assertNotNull(contextId.getSearcherId());
+            sourceBuilder.query(QueryBuilders.rangeQuery("field").gt("2010-01-06T02:00").lt("2010-01-07T02:00"));
+            assertFalse(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1,
+                new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, contextId, null)).canMatch());
+
+            assertTrue(searchService.freeReaderContext(contextId));
+            sourceBuilder.query(QueryBuilders.rangeQuery("field").gt("2010-01-06T02:00").lt("2010-01-07T02:00"));
+            assertFalse(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1,
+                new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, contextId, null)).canMatch());
+
+            expectThrows(SearchContextMissingException.class, () -> {
+                ShardSearchContextId withoutCommitId = new ShardSearchContextId(contextId.getSessionId(), contextId.getId(), null);
+                sourceBuilder.query(QueryBuilders.rangeQuery("field").gt("2010-01-06T02:00").lt("2010-01-07T02:00"));
+                assertFalse(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1,
+                    new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, withoutCommitId, null)).canMatch());
+            });
         }
     }
 

+ 89 - 0
x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java

@@ -6,13 +6,23 @@
 package org.elasticsearch.xpack.searchablesnapshots;
 
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.search.builder.PointInTimeBuilder;
 import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
+import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
+import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction;
+import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -20,6 +30,8 @@ import java.util.Locale;
 import java.util.Set;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
 
 public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase {
@@ -92,4 +104,81 @@ public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase
             }
         }
     }
+
+    public void testRetryPointInTime() throws Exception {
+        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareCreate(indexName)
+                .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)).build())
+                .setMapping("{\"properties\":{\"created_date\":{\"type\": \"date\", \"format\": \"yyyy-MM-dd\"}}}")
+        );
+        final List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
+        final int docCount = between(0, 100);
+        for (int i = 0; i < docCount; i++) {
+            indexRequestBuilders.add(client().prepareIndex(indexName).setSource("created_date", "2011-02-02"));
+        }
+        indexRandom(true, false, indexRequestBuilders);
+        assertThat(
+            client().admin().indices().prepareForceMerge(indexName).setOnlyExpungeDeletes(true).setFlush(true).get().getFailedShards(),
+            equalTo(0)
+        );
+        refresh(indexName);
+        forceMerge();
+
+        final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        createRepository(repositoryName, "fs");
+
+        final SnapshotId snapshotOne = createSnapshot(repositoryName, "snapshot-1", List.of(indexName)).snapshotId();
+        assertAcked(client().admin().indices().prepareDelete(indexName));
+
+        final int numberOfReplicas = between(0, 2);
+        final Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build();
+        internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + 1);
+
+        mountSnapshot(repositoryName, snapshotOne.getName(), indexName, indexName, indexSettings);
+        ensureGreen(indexName);
+
+        final String pitId = client().execute(
+            OpenPointInTimeAction.INSTANCE,
+            new OpenPointInTimeRequest(
+                new String[] { indexName },
+                IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED,
+                TimeValue.timeValueMinutes(2),
+                null,
+                null
+            )
+        ).actionGet().getSearchContextId();
+        try {
+            SearchResponse resp = client().prepareSearch()
+                .setIndices(indexName)
+                .setPreference(null)
+                .setPointInTime(new PointInTimeBuilder(pitId))
+                .get();
+            assertNoFailures(resp);
+            assertThat(resp.pointInTimeId(), equalTo(pitId));
+            assertHitCount(resp, docCount);
+
+            final Set<String> allocatedNodes = internalCluster().nodesInclude(indexName);
+            for (String allocatedNode : allocatedNodes) {
+                internalCluster().restartNode(allocatedNode);
+            }
+            ensureGreen(indexName);
+            resp = client().prepareSearch()
+                .setIndices(indexName)
+                .setQuery(new RangeQueryBuilder("created_date").gte("2011-01-01").lte("2011-12-12"))
+                .setSearchType(SearchType.QUERY_THEN_FETCH)
+                .setPreference(null)
+                .setPreFilterShardSize(between(1, 10))
+                .setAllowPartialSearchResults(true)
+                .setPointInTime(new PointInTimeBuilder(pitId))
+                .get();
+            assertNoFailures(resp);
+            assertThat(resp.pointInTimeId(), equalTo(pitId));
+            assertHitCount(resp, docCount);
+        } finally {
+            client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet();
+        }
+    }
 }