Browse Source

Support point in time in async_search (#61560)

This commit integrates point in time into async search and 
ensures that it works correctly with security enabled.

Relates #61062
Nhat Nguyen 5 years ago
parent
commit
71afd226af
17 changed files with 343 additions and 69 deletions
  1. 2 1
      modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/RestSearchTemplateAction.java
  2. 3 2
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java
  3. 3 2
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java
  4. 3 2
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java
  5. 2 1
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java
  6. 3 2
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java
  7. 6 3
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java
  8. 0 6
      server/src/main/java/org/elasticsearch/action/search/SearchRequest.java
  9. 25 6
      server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java
  10. 0 22
      server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java
  11. 174 8
      x-pack/plugin/async-search/qa/security/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java
  12. 10 9
      x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java
  13. 1 1
      x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java
  14. 28 2
      x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java
  15. 3 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/RestOpenPointInTimeAction.java
  16. 2 1
      x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestRollupSearchAction.java
  17. 78 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/20-with-poin-in-time.yml

+ 2 - 1
modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/RestSearchTemplateAction.java

@@ -64,7 +64,8 @@ public class RestSearchTemplateAction extends BaseRestHandler {
     public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
         // Creates the search request with all required params
         SearchRequest searchRequest = new SearchRequest();
-        RestSearchAction.parseSearchRequest(searchRequest, request, null, size -> searchRequest.source().size(size));
+        RestSearchAction.parseSearchRequest(
+            searchRequest, request, null, client.getNamedWriteableRegistry(), size -> searchRequest.source().size(size));
 
         // Creates the search template request
         SearchTemplateRequest searchTemplateRequest;

+ 3 - 2
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java

@@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.BytesRestResponse;
@@ -49,7 +50,7 @@ public abstract class AbstractBaseReindexRestHandler<
     protected RestChannelConsumer doPrepareRequest(RestRequest request, NodeClient client,
                                                    boolean includeCreated, boolean includeUpdated) throws IOException {
         // Build the internal request
-        Request internal = setCommonOptions(request, buildRequest(request));
+        Request internal = setCommonOptions(request, buildRequest(request, client.getNamedWriteableRegistry()));
 
         // Executes the request and waits for completion
         if (request.paramAsBoolean("wait_for_completion", true)) {
@@ -77,7 +78,7 @@ public abstract class AbstractBaseReindexRestHandler<
     /**
      * Build the Request based on the RestRequest.
      */
-    protected abstract Request buildRequest(RestRequest request) throws IOException;
+    protected abstract Request buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException;
 
     /**
      * Sets common options of {@link AbstractBulkByScrollRequest} requests.

+ 3 - 2
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java

@@ -22,6 +22,7 @@ package org.elasticsearch.index.reindex;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
@@ -43,7 +44,7 @@ public abstract class AbstractBulkByQueryRestHandler<
         super(action);
     }
 
-    protected void parseInternalRequest(Request internal, RestRequest restRequest,
+    protected void parseInternalRequest(Request internal, RestRequest restRequest, NamedWriteableRegistry namedWriteableRegistry,
                                         Map<String, Consumer<Object>> bodyConsumers) throws IOException {
         assert internal != null : "Request should not be null";
         assert restRequest != null : "RestRequest should not be null";
@@ -51,7 +52,7 @@ public abstract class AbstractBulkByQueryRestHandler<
         SearchRequest searchRequest = internal.getSearchRequest();
 
         try (XContentParser parser = extractRequestSpecificFields(restRequest, bodyConsumers)) {
-            RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, size -> failOnSizeSpecified());
+            RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, namedWriteableRegistry, size -> failOnSizeSpecified());
         }
 
         searchRequest.source().size(restRequest.paramAsInt("scroll_size", searchRequest.source().size()));

+ 3 - 2
modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.index.reindex;
 
 import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.rest.RestRequest;
 
 import java.io.IOException;
@@ -52,7 +53,7 @@ public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler<Dele
     }
 
     @Override
-    protected DeleteByQueryRequest buildRequest(RestRequest request) throws IOException {
+    protected DeleteByQueryRequest buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException {
         /*
          * Passing the search request through DeleteByQueryRequest first allows
          * it to set its own defaults which differ from SearchRequest's
@@ -64,7 +65,7 @@ public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler<Dele
         consumers.put("conflicts", o -> internal.setConflicts((String) o));
         consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue()));
 
-        parseInternalRequest(internal, request, consumers);
+        parseInternalRequest(internal, request, namedWriteableRegistry, consumers);
 
         return internal;
     }

+ 2 - 1
modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java

@@ -21,6 +21,7 @@ package org.elasticsearch.index.reindex;
 
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.rest.RestRequest;
 
@@ -55,7 +56,7 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
     }
 
     @Override
-    protected ReindexRequest buildRequest(RestRequest request) throws IOException {
+    protected ReindexRequest buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException {
         if (request.hasParam("pipeline")) {
             throw new IllegalArgumentException("_reindex doesn't support [pipeline] as a query parameter. "
                     + "Specify it in the [dest] object instead.");

+ 3 - 2
modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.index.reindex;
 
 import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.script.Script;
 
@@ -53,7 +54,7 @@ public class RestUpdateByQueryAction extends AbstractBulkByQueryRestHandler<Upda
     }
 
     @Override
-    protected UpdateByQueryRequest buildRequest(RestRequest request) throws IOException {
+    protected UpdateByQueryRequest buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException {
         /*
          * Passing the search request through UpdateByQueryRequest first allows
          * it to set its own defaults which differ from SearchRequest's
@@ -66,7 +67,7 @@ public class RestUpdateByQueryAction extends AbstractBulkByQueryRestHandler<Upda
         consumers.put("script", o -> internal.setScript(Script.parse(o)));
         consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue()));
 
-        parseInternalRequest(internal, request, consumers);
+        parseInternalRequest(internal, request, namedWriteableRegistry, consumers);
 
         internal.setPipeline(request.param("pipeline"));
         return internal;

+ 6 - 3
modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java

@@ -21,6 +21,7 @@ package org.elasticsearch.index.reindex;
 
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
@@ -29,6 +30,7 @@ import org.elasticsearch.test.rest.RestActionTestCase;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.util.Collections;
 
 import static java.util.Collections.singletonMap;
 
@@ -59,7 +61,8 @@ public class RestReindexActionTests extends RestActionTestCase {
             request.withContent(BytesReference.bytes(body), body.contentType());
         }
         request.withParams(singletonMap("pipeline", "doesn't matter"));
-        Exception e = expectThrows(IllegalArgumentException.class, () -> action.buildRequest(request.build()));
+        Exception e = expectThrows(IllegalArgumentException.class, () ->
+            action.buildRequest(request.build(), new NamedWriteableRegistry(Collections.emptyList())));
 
         assertEquals("_reindex doesn't support [pipeline] as a query parameter. Specify it in the [dest] object instead.", e.getMessage());
     }
@@ -68,14 +71,14 @@ public class RestReindexActionTests extends RestActionTestCase {
         {
             FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry());
             requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON);
-            ReindexRequest request = action.buildRequest(requestBuilder.build());
+            ReindexRequest request = action.buildRequest(requestBuilder.build(), new NamedWriteableRegistry(Collections.emptyList()));
             assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT, request.getScrollTime());
         }
         {
             FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry());
             requestBuilder.withParams(singletonMap("scroll", "10m"));
             requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON);
-            ReindexRequest request = action.buildRequest(requestBuilder.build());
+            ReindexRequest request = action.buildRequest(requestBuilder.build(), new NamedWriteableRegistry(Collections.emptyList()));
             assertEquals("10m", request.getScrollTime().toString());
         }
     }

+ 0 - 6
server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

@@ -279,12 +279,6 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
             if (scroll) {
                 validationException = addValidationError("using [point in time] is not allowed in a scroll context", validationException);
             }
-            if (routing() != null) {
-                validationException = addValidationError("[routing] cannot be used with point in time", validationException);
-            }
-            if (preference() != null) {
-                validationException = addValidationError("[preference] cannot be used with point in time", validationException);
-            }
         }
         return validationException;
     }

+ 25 - 6
server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java

@@ -19,6 +19,8 @@
 
 package org.elasticsearch.rest.action.search;
 
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.search.SearchContextId;
 import org.elasticsearch.action.search.SearchRequest;
@@ -51,6 +53,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.function.IntConsumer;
 
+import static org.elasticsearch.action.ValidateActions.addValidationError;
 import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
 import static org.elasticsearch.rest.RestRequest.Method.GET;
 import static org.elasticsearch.rest.RestRequest.Method.POST;
@@ -100,12 +103,8 @@ public class RestSearchAction extends BaseRestHandler {
          * company.
          */
         IntConsumer setSize = size -> searchRequest.source().size(size);
-        request.withContentOrSourceParamParserOrNull(parser -> {
-            parseSearchRequest(searchRequest, request, parser, setSize);
-            if (searchRequest.pointInTimeBuilder() != null) {
-                preparePointInTime(searchRequest, client.getNamedWriteableRegistry());
-            }
-        });
+        request.withContentOrSourceParamParserOrNull(parser ->
+            parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize));
 
         return channel -> {
             RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
@@ -122,6 +121,7 @@ public class RestSearchAction extends BaseRestHandler {
      */
     public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
                                           XContentParser requestContentParser,
+                                          NamedWriteableRegistry namedWriteableRegistry,
                                           IntConsumer setSize) throws IOException {
 
         if (searchRequest.source() == null) {
@@ -175,6 +175,10 @@ public class RestSearchAction extends BaseRestHandler {
         searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));
 
         checkRestTotalHits(request, searchRequest);
+
+        if (searchRequest.pointInTimeBuilder() != null) {
+            preparePointInTime(searchRequest, namedWriteableRegistry);
+        }
     }
 
     /**
@@ -291,6 +295,21 @@ public class RestSearchAction extends BaseRestHandler {
 
     static void preparePointInTime(SearchRequest request, NamedWriteableRegistry namedWriteableRegistry) {
         assert request.pointInTimeBuilder() != null;
+        ActionRequestValidationException validationException = null;
+        if (request.indices().length > 0) {
+            validationException = addValidationError("[indices] cannot be used with point in time", validationException);
+        }
+        if (request.indicesOptions() != SearchRequest.DEFAULT_INDICES_OPTIONS) {
+            validationException = addValidationError("[indicesOptions] cannot be used with point in time", validationException);
+        }
+        if (request.routing() != null) {
+            validationException = addValidationError("[routing] cannot be used with point in time", validationException);
+        }
+        if (request.preference() != null) {
+            validationException = addValidationError("[preference] cannot be used with point in time", validationException);
+        }
+        ExceptionsHelper.reThrowIfNotNull(validationException);
+
         final IndicesOptions indicesOptions = request.indicesOptions();
         final IndicesOptions stricterIndicesOptions = IndicesOptions.fromOptions(
             indicesOptions.ignoreUnavailable(), indicesOptions.allowNoIndices(), false, false, false,

+ 0 - 22
server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java

@@ -175,28 +175,6 @@ public class SearchRequestTests extends AbstractSearchTestCase {
             assertEquals(1, validationErrors.validationErrors().size());
             assertEquals("using [point in time] is not allowed in a scroll context", validationErrors.validationErrors().get(0));
         }
-        {
-            // Reader context with preference
-            SearchRequest searchRequest = new SearchRequest()
-                .source(new SearchSourceBuilder().
-                    pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder("id", TimeValue.timeValueMillis(between(1, 10)))))
-                .preference("test");
-            ActionRequestValidationException validationErrors = searchRequest.validate();
-            assertNotNull(validationErrors);
-            assertEquals(1, validationErrors.validationErrors().size());
-            assertEquals("[preference] cannot be used with point in time", validationErrors.validationErrors().get(0));
-        }
-        {
-            // Reader context with routing
-            SearchRequest searchRequest = new SearchRequest()
-                .source(new SearchSourceBuilder()
-                    .pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder("id", TimeValue.timeValueMillis(between(1, 10)))))
-                .routing("test");
-            ActionRequestValidationException validationErrors = searchRequest.validate();
-            assertNotNull(validationErrors);
-            assertEquals(1, validationErrors.validationErrors().size());
-            assertEquals("[routing] cannot be used with point in time", validationErrors.validationErrors().get(0));
-        }
     }
 
     public void testCopyConstructor() throws IOException {

+ 174 - 8
x-pack/plugin/async-search/qa/security/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java

@@ -28,6 +28,7 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xpack.core.async.AsyncExecutionId;
 import org.hamcrest.CustomMatcher;
+import org.hamcrest.Matcher;
 import org.junit.Before;
 
 import java.io.IOException;
@@ -39,6 +40,7 @@ import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX;
 import static org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField.RUN_AS_USER_HEADER;
 import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
 import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
+import static org.hamcrest.Matchers.arrayWithSize;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -75,14 +77,7 @@ public class AsyncSearchSecurityIT extends ESRestTestCase {
     public void testWithDlsAndFls() throws Exception {
         Response submitResp = submitAsyncSearch("*", "*", TimeValue.timeValueSeconds(10), "user_dls");
         assertOK(submitResp);
-        String id = extractResponseId(submitResp);
-        Response getResp = getAsyncSearch(id, "user_dls");
-        AsyncSearchResponse searchResponse = AsyncSearchResponse.fromXContent(XContentHelper.createParser(NamedXContentRegistry.EMPTY,
-                LoggingDeprecationHandler.INSTANCE,
-                new BytesArray(EntityUtils.toByteArray(getResp.getEntity())),
-                XContentType.JSON));
-        SearchHit[] hits = searchResponse.getSearchResponse().getHits().getHits();
-
+        SearchHit[] hits = getSearchHits(extractResponseId(submitResp), "user_dls");
         assertThat(hits, arrayContainingInAnyOrder(
                 new CustomMatcher<SearchHit>("\"index\" doc 1 matcher") {
                     @Override
@@ -151,6 +146,139 @@ public class AsyncSearchSecurityIT extends ESRestTestCase {
         assertThat(exc.getMessage(), containsString("unauthorized"));
     }
 
+    private SearchHit[] getSearchHits(String asyncId, String user) throws IOException {
+        final Response resp = getAsyncSearch(asyncId, user);
+        assertOK(resp);
+        AsyncSearchResponse searchResponse = AsyncSearchResponse.fromXContent(XContentHelper.createParser(NamedXContentRegistry.EMPTY,
+            LoggingDeprecationHandler.INSTANCE,
+            new BytesArray(EntityUtils.toByteArray(resp.getEntity())),
+            XContentType.JSON));
+        return searchResponse.getSearchResponse().getHits().getHits();
+    }
+
+    public void testAuthorizationOfPointInTime() throws Exception {
+        String authorizedUser = randomFrom("user1", "user2");
+        final Matcher<SearchHit> hitMatcher = new CustomMatcher<>("hit") {
+            @Override
+            public boolean matches(Object actual) {
+                SearchHit hit = (SearchHit) actual;
+                return hit.getIndex().equals("index-" + authorizedUser) && hit.getId().equals("0");
+            }
+        };
+        final String pitId = openPointInTime(new String[]{"index-" + authorizedUser}, authorizedUser);
+        try {
+            Response submit = submitAsyncSearchWithPIT(pitId, "foo:bar", TimeValue.timeValueSeconds(10), authorizedUser);
+            assertOK(submit);
+            final Response resp = getAsyncSearch(extractResponseId(submit), authorizedUser);
+            assertOK(resp);
+            assertThat(getSearchHits(extractResponseId(resp), authorizedUser), arrayContainingInAnyOrder(hitMatcher));
+
+            String unauthorizedUser = randomValueOtherThan(authorizedUser, () -> randomFrom("user1", "user2"));
+            ResponseException exc = expectThrows(ResponseException.class,
+                () -> submitAsyncSearchWithPIT(pitId, "*:*", TimeValue.timeValueSeconds(10), unauthorizedUser));
+            assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403));
+            assertThat(exc.getMessage(), containsString("unauthorized"));
+
+        } finally {
+            closePointInTime(pitId, authorizedUser);
+        }
+    }
+
+    public void testRejectPointInTimeWithIndices() throws Exception {
+        String authorizedUser = randomFrom("user1", "user2");
+        final String pitId = openPointInTime(new String[]{"index-" + authorizedUser}, authorizedUser);
+        try {
+            final Request request = new Request("POST", "/_async_search");
+            setRunAsHeader(request, authorizedUser);
+            request.addParameter("wait_for_completion_timeout", "true");
+            request.addParameter("keep_on_completion", "true");
+            if (randomBoolean()) {
+                request.addParameter("index", "index-" + authorizedUser);
+            } else {
+                request.addParameter("index", "*");
+            }
+            final XContentBuilder requestBody = JsonXContent.contentBuilder()
+                .startObject()
+                .startObject("pit")
+                .field("id", pitId)
+                .field("keep_alive", "1m")
+                .endObject()
+                .endObject();
+            request.setJsonEntity(Strings.toString(requestBody));
+            final ResponseException exc = expectThrows(ResponseException.class, () -> client().performRequest(request));
+            assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(400));
+            assertThat(exc.getMessage(), containsString("[indices] cannot be used with point in time"));
+        } finally {
+            closePointInTime(pitId, authorizedUser);
+        }
+    }
+
+    public void testSharingPointInTime() throws Exception {
+        final Matcher<SearchHit> hitMatcher = new CustomMatcher<>("index") {
+            @Override
+            public boolean matches(Object actual) {
+                SearchHit hit = (SearchHit) actual;
+                return hit.getIndex().equals("index") && hit.getId().equals("0");
+            }
+        };
+        String firstUser = randomFrom("user1", "user2");
+        final String pitId = openPointInTime(new String[]{"index"}, firstUser);
+        try {
+            {
+                Response firstSubmit = submitAsyncSearchWithPIT(pitId, "foo:bar", TimeValue.timeValueSeconds(10), firstUser);
+                assertOK(firstSubmit);
+                final Response firstResp = getAsyncSearch(extractResponseId(firstSubmit), firstUser);
+                assertOK(firstResp);
+                final SearchHit[] firstHits = getSearchHits(extractResponseId(firstResp), firstUser);
+                assertThat(firstHits, arrayContainingInAnyOrder(hitMatcher));
+            }
+            {
+                String secondUser = randomValueOtherThan(firstUser, () -> randomFrom("user1", "user2"));
+                Response secondSubmit = submitAsyncSearchWithPIT(pitId, "foo:bar", TimeValue.timeValueSeconds(10), secondUser);
+                assertOK(secondSubmit);
+                final Response secondResp = getAsyncSearch(extractResponseId(secondSubmit), secondUser);
+                assertOK(secondResp);
+                final SearchHit[] secondHits = getSearchHits(extractResponseId(secondResp), secondUser);
+                assertThat(secondHits, arrayContainingInAnyOrder(hitMatcher));
+            }
+        } finally {
+            closePointInTime(pitId, firstUser);
+        }
+    }
+
+    public void testWithDLSPointInTime() throws Exception {
+        final String pitId = openPointInTime(new String[]{"index"}, "user1");
+        try {
+            Response userResp = submitAsyncSearchWithPIT(pitId, "*", TimeValue.timeValueSeconds(10), "user1");
+            assertOK(userResp);
+            assertThat(getSearchHits(extractResponseId(userResp), "user1"), arrayWithSize(3));
+
+            Response dlsResp = submitAsyncSearchWithPIT(pitId, "*", TimeValue.timeValueSeconds(10), "user_dls");
+            assertOK(dlsResp);
+            assertThat(getSearchHits(extractResponseId(dlsResp), "user_dls"), arrayContainingInAnyOrder(
+                new CustomMatcher<SearchHit>("\"index\" doc 1 matcher") {
+                    @Override
+                    public boolean matches(Object actual) {
+                        SearchHit hit = (SearchHit) actual;
+                        return "index".equals(hit.getIndex()) &&
+                            "1".equals(hit.getId()) &&
+                            hit.getSourceAsMap().isEmpty();
+                    }
+                },
+                new CustomMatcher<SearchHit>("\"index\" doc 2 matcher") {
+                    @Override
+                    public boolean matches(Object actual) {
+                        SearchHit hit = (SearchHit) actual;
+                        return "index".equals(hit.getIndex()) &&
+                            "2".equals(hit.getId()) &&
+                            "boo".equals(hit.getSourceAsMap().get("baz"));
+                    }
+                }));
+        } finally {
+            closePointInTime(pitId, "user1");
+        }
+    }
+
     static String extractResponseId(Response response) throws IOException {
         Map<String, Object> map = toMap(response);
         return (String) map.get("id");
@@ -219,4 +347,42 @@ public class AsyncSearchSecurityIT extends ESRestTestCase {
         builder.addHeader(RUN_AS_USER_HEADER, user);
         request.setOptions(builder);
     }
+
+    private String openPointInTime(String[] indexNames, String user) throws IOException {
+        final Request request = new Request("POST", "/_pit");
+        request.addParameter("index", String.join(",", indexNames));
+        setRunAsHeader(request, user);
+        request.addParameter("keep_alive", between(1, 5) + "m");
+        final Response response = client().performRequest(request);
+        assertOK(response);
+        return (String) toMap(response).get("id");
+    }
+
+    static Response submitAsyncSearchWithPIT(String pit, String query, TimeValue waitForCompletion, String user) throws IOException {
+        final Request request = new Request("POST", "/_async_search");
+        setRunAsHeader(request, user);
+        request.addParameter("wait_for_completion_timeout", waitForCompletion.toString());
+        request.addParameter("q", query);
+        request.addParameter("keep_on_completion", "true");
+        final XContentBuilder requestBody = JsonXContent.contentBuilder()
+            .startObject()
+                .startObject("pit")
+                    .field("id", pit)
+                    .field("keep_alive", "1m")
+                .endObject()
+            .endObject();
+        request.setJsonEntity(Strings.toString(requestBody));
+        return client().performRequest(request);
+    }
+
+    private void closePointInTime(String pitId, String user) throws IOException {
+        final Request request = new Request("DELETE", "/_pit");
+        setRunAsHeader(request, user);
+        final XContentBuilder requestBody = JsonXContent.contentBuilder()
+            .startObject()
+                .field("id", pitId)
+            .endObject();
+        request.setJsonEntity(Strings.toString(requestBody));
+        assertOK(client().performRequest(request));
+    }
 }

+ 10 - 9
x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java

@@ -234,16 +234,17 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
     }
 
     public void testInvalidId() throws Exception {
-        SearchResponseIterator it =
-            assertBlockingIterator(indexName, numShards, new SearchSourceBuilder(), randomBoolean() ? 1 : 0, 2);
-        AsyncSearchResponse response = it.next();
-        ExecutionException exc = expectThrows(ExecutionException.class, () -> getAsyncSearch("invalid"));
-        assertThat(exc.getCause(), instanceOf(IllegalArgumentException.class));
-        assertThat(exc.getMessage(), containsString("invalid id"));
-        while (it.hasNext()) {
-            response = it.next();
+        try (SearchResponseIterator it =
+                 assertBlockingIterator(indexName, numShards, new SearchSourceBuilder(), randomBoolean() ? 1 : 0, 2)) {
+            AsyncSearchResponse response = it.next();
+            ExecutionException exc = expectThrows(ExecutionException.class, () -> getAsyncSearch("invalid"));
+            assertThat(exc.getCause(), instanceOf(IllegalArgumentException.class));
+            assertThat(exc.getMessage(), containsString("invalid id"));
+            while (it.hasNext()) {
+                response = it.next();
+            }
+            assertFalse(response.isRunning());
         }
-        assertFalse(response.isRunning());
     }
 
     public void testNoIndex() throws Exception {

+ 1 - 1
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java

@@ -49,7 +49,7 @@ public final class RestSubmitAsyncSearchAction extends BaseRestHandler {
         // pre_filter_shard_size and ccs_minimize_roundtrips get set to the search request although the REST spec don't list
         //them as supported. We rely on SubmitAsyncSearchRequest#validate to fail in case they are set.
         request.withContentOrSourceParamParserOrNull(parser ->
-            parseSearchRequest(submit.getSearchRequest(), request, parser, setSize));
+            parseSearchRequest(submit.getSearchRequest(), request, parser, client.getNamedWriteableRegistry(), setSize));
 
         if (request.hasParam("wait_for_completion_timeout")) {
             submit.setWaitForCompletionTimeout(request.paramAsTime("wait_for_completion_timeout", submit.getWaitForCompletionTimeout()));

+ 28 - 2
x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java

@@ -36,7 +36,11 @@ import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
 import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
 import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
 import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
+import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
+import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
 import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
+import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction;
+import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest;
 import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
 import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
 import org.elasticsearch.xpack.ilm.IndexLifecycle;
@@ -50,6 +54,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX;
 import static org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
@@ -207,7 +212,22 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
                                                             SearchSourceBuilder source,
                                                             int numFailures,
                                                             int progressStep) throws Exception {
-        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(source, indexName);
+        final String pitId;
+        final SubmitAsyncSearchRequest request;
+        if (randomBoolean()) {
+            OpenPointInTimeRequest openPIT = new OpenPointInTimeRequest(
+                new String[]{indexName},
+                OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS,
+                TimeValue.timeValueMinutes(between(1, 5)),
+                null,
+                null);
+            pitId = client().execute(OpenPointInTimeAction.INSTANCE, openPIT).actionGet().getSearchContextId();
+            source.pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder(pitId, TimeValue.timeValueMinutes(1)));
+            request = new SubmitAsyncSearchRequest(source);
+        } else {
+            pitId = null;
+            request = new SubmitAsyncSearchRequest(source, indexName);
+        }
         request.setBatchedReduceSize(progressStep);
         request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
         BlockingQueryBuilder.QueryLatch queryLatch = BlockingQueryBuilder.acquireQueryLatch(numFailures);
@@ -223,6 +243,7 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
         return new SearchResponseIterator() {
             private AsyncSearchResponse response = initial;
             private boolean isFirst = true;
+            private final AtomicBoolean closed = new AtomicBoolean();
 
             @Override
             public boolean hasNext() {
@@ -283,7 +304,12 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
 
             @Override
             public void close() {
-                queryLatch.close();
+                if (closed.compareAndSet(false, true)) {
+                    if (pitId != null) {
+                        client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet();
+                    }
+                    queryLatch.close();
+                }
             }
         };
     }

+ 3 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/RestOpenPointInTimeAction.java

@@ -28,7 +28,9 @@ public class RestOpenPointInTimeAction extends BaseRestHandler {
 
     @Override
     public List<Route> routes() {
-        return List.of(new Route(POST, "/{index}/_pit"));
+        return List.of(
+            new Route(POST, "/{index}/_pit"),
+            new Route(POST, "/_pit"));
     }
 
     @Override

+ 2 - 1
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestRollupSearchAction.java

@@ -37,7 +37,8 @@ public class RestRollupSearchAction extends BaseRestHandler {
     protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
         SearchRequest searchRequest = new SearchRequest();
         restRequest.withContentOrSourceParamParserOrNull(parser ->
-                RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, size -> searchRequest.source().size(size)));
+            RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser,
+                    client.getNamedWriteableRegistry(), size -> searchRequest.source().size(size)));
         RestSearchAction.checkRestTotalHits(restRequest, searchRequest);
         return channel -> client.execute(RollupSearchAction.INSTANCE, searchRequest, new RestToXContentListener<>(channel));
     }

+ 78 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/20-with-poin-in-time.yml

@@ -0,0 +1,78 @@
+---
+"Async search with point in time":
+  - skip:
+      version: " - 7.99.99"
+      reason: "point in time is introduced in 8.0"
+  - do:
+      indices.create:
+        index: test-1
+        body:
+          settings:
+            number_of_shards: "2"
+
+  - do:
+      indices.create:
+        index: test-2
+        body:
+          settings:
+            number_of_shards: "1"
+
+  - do:
+      indices.create:
+        index: test-3
+        body:
+          settings:
+            number_of_shards: "3"
+
+  - do:
+      index:
+        index:  test-2
+        body:   { max: 2 }
+
+  - do:
+      index:
+        index:  test-1
+        body:   { max: 1 }
+
+  - do:
+      index:
+        index:  test-3
+        body:   { max: 3 }
+
+  - do:
+      indices.refresh: {}
+
+  - do:
+      open_point_in_time:
+        index: test-*
+        keep_alive: 5m
+  - set: {id: point_in_time_id}
+
+  - do:
+      async_search.submit:
+        batched_reduce_size: 2
+        wait_for_completion_timeout: 10s
+        body:
+          query:
+            match_all: {}
+          aggs:
+            max:
+              max:
+                field: max
+          sort: max
+          pit:
+            id: "$point_in_time_id"
+            keep_alive: 1m
+
+  - is_false: id
+  - match:  { is_partial:                   false }
+  - length: { response.hits.hits:               3 }
+  - match:  { response.hits.hits.0._source.max: 1 }
+  - match:  { response.aggregations.max.value:  3.0 }
+
+  - do:
+      close_point_in_time:
+        body:
+          id: "$point_in_time_id"
+
+