Browse Source

do not scroll if max docs is less than scroll size (update/delete by query) (#81654)

This change avoids opening a scroll during reindex/delete_by_query/update_by_query
if the configured max_docs is less than or equal to the number of documents returned by the scroll batch.
Ievgen Degtiarenko 3 năm trước cách đây
mục cha
commit
11b52619c5

+ 16 - 0
docs/reference/docs/delete-by-query.asciidoc

@@ -423,6 +423,22 @@ POST my-index-000001/_delete_by_query?scroll_size=5000
 --------------------------------------------------
 // TEST[setup:my_index]
 
+Delete a document using a unique attribute:
+
+[source,console]
+--------------------------------------------------
+POST my-index-000001/_delete_by_query
+{
+  "query": {
+    "term": {
+      "user.id": "kimchy"
+    }
+  },
+  "max_docs": 1
+}
+--------------------------------------------------
+// TEST[setup:my_index]
+
 [discrete]
 [[docs-delete-by-query-manual-slice]]
 ===== Slice manually

+ 16 - 0
docs/reference/docs/update-by-query.asciidoc

@@ -367,6 +367,22 @@ POST my-index-000001/_update_by_query?scroll_size=100
 --------------------------------------------------
 // TEST[setup:my_index]
 
+Update a document using a unique attribute:
+
+[source,console]
+--------------------------------------------------
+POST my-index-000001/_update_by_query
+{
+  "query": {
+    "term": {
+      "user.id": "kimchy"
+    }
+  },
+  "max_docs": 1
+}
+--------------------------------------------------
+// TEST[setup:my_index]
+
 [[docs-update-by-query-api-source]]
 ===== Update the document source
 

+ 2 - 1
docs/reference/rest-api/common-parms.asciidoc

@@ -591,7 +591,8 @@ end::mappings[]
 tag::max_docs[]
 `max_docs`::
 (Optional, integer) Maximum number of documents to process. Defaults to all
-documents.
+documents. When set to a value less than or equal to `scroll_size`, a
+scroll will not be used to retrieve the results for the operation.
 end::max_docs[]
 
 tag::memory[]

+ 42 - 5
modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java

@@ -23,6 +23,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.bulk.Retry;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.client.ParentTaskAssigningClient;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -45,6 +46,7 @@ import org.elasticsearch.index.reindex.WorkerBulkByScrollTaskState;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.script.UpdateScript;
+import org.elasticsearch.search.Scroll;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.SortBuilder;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -177,20 +179,50 @@ public abstract class AbstractAsyncBulkByScrollAction<
         this.listener = listener;
         BackoffPolicy backoffPolicy = buildBackoffPolicy();
         bulkRetry = new Retry(BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool);
-        scrollSource = buildScrollableResultSource(backoffPolicy);
+        scrollSource = buildScrollableResultSource(
+            backoffPolicy,
+            prepareSearchRequest(mainRequest, needsSourceDocumentVersions, needsSourceDocumentSeqNoAndPrimaryTerm)
+        );
         scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null");
+    }
+
+    /**
+     * Prepares a search request to be used in a ScrollableHitSource.
+     * Preparation might set a sort order (if not set already) and disable scroll if max docs is small enough.
+     */
+    // Visible for testing
+    static <Request extends AbstractBulkByScrollRequest<Request>> SearchRequest prepareSearchRequest(
+        Request mainRequest,
+        boolean needsSourceDocumentVersions,
+        boolean needsSourceDocumentSeqNoAndPrimaryTerm
+    ) {
+        var preparedSearchRequest = new SearchRequest(mainRequest.getSearchRequest());
+
         /*
          * Default to sorting by doc. We can't do this in the request itself because it is normal to *add* to the sorts rather than replace
          * them and if we add _doc as the first sort by default then sorts will never work.... So we add it here, only if there isn't
          * another sort.
+         *
+         * This modifies the original request!
          */
-        final SearchSourceBuilder sourceBuilder = mainRequest.getSearchRequest().source();
+        final SearchSourceBuilder sourceBuilder = preparedSearchRequest.source();
         List<SortBuilder<?>> sorts = sourceBuilder.sorts();
         if (sorts == null || sorts.isEmpty()) {
             sourceBuilder.sort(fieldSort("_doc"));
         }
         sourceBuilder.version(needsSourceDocumentVersions);
         sourceBuilder.seqNoAndPrimaryTerm(needsSourceDocumentSeqNoAndPrimaryTerm);
+
+        /*
+         * Do not open scroll if max docs <= scroll size and not resuming on version conflicts
+         */
+        if (mainRequest.getMaxDocs() != MAX_DOCS_ALL_MATCHES
+            && mainRequest.getMaxDocs() <= preparedSearchRequest.source().size()
+            && mainRequest.isAbortOnVersionConflict()) {
+            preparedSearchRequest.scroll((Scroll) null);
+        }
+
+        return preparedSearchRequest;
     }
 
     /**
@@ -255,7 +287,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
         return bulkRequest;
     }
 
-    protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
+    protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy, SearchRequest searchRequest) {
         return new ClientScrollableHitSource(
             logger,
             backoffPolicy,
@@ -264,7 +296,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
             this::onScrollResponse,
             this::finishHim,
             searchClient,
-            mainRequest.getSearchRequest()
+            searchRequest
         );
     }
 
@@ -478,6 +510,12 @@ public abstract class AbstractAsyncBulkByScrollAction<
                 return;
             }
 
+            if (scrollSource.hasScroll() == false) {
+                // Index contains fewer matching docs than max_docs (found < max_docs <= scroll size)
+                refreshAndFinish(emptyList(), emptyList(), false);
+                return;
+            }
+
             onSuccess.run();
         } catch (Exception t) {
             finishHim(t);
@@ -499,7 +537,6 @@ public abstract class AbstractAsyncBulkByScrollAction<
         } else {
             onScrollResponse(asyncResponse);
         }
-
     }
 
     private void recordFailure(Failure failure, List<Failure> failures) {

+ 4 - 3
modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java

@@ -23,6 +23,7 @@ import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ParentTaskAssigningClient;
 import org.elasticsearch.client.RestClient;
@@ -213,7 +214,7 @@ public class Reindexer {
         }
 
         @Override
-        protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
+        protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy, SearchRequest searchRequest) {
             if (mainRequest.getRemoteInfo() != null) {
                 RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
                 createdThreads = synchronizedList(new ArrayList<>());
@@ -228,10 +229,10 @@ public class Reindexer {
                     this::finishHim,
                     restClient,
                     remoteInfo.getQuery(),
-                    mainRequest.getSearchRequest()
+                    searchRequest
                 );
             }
-            return super.buildScrollableResultSource(backoffPolicy);
+            return super.buildScrollableResultSource(backoffPolicy, searchRequest);
         }
 
         @Override

+ 1 - 7
modules/reindex/src/main/java/org/elasticsearch/reindex/TransportDeleteByQueryAction.java

@@ -15,7 +15,6 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ParentTaskAssigningClient;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.BulkByScrollTask;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
@@ -41,12 +40,7 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
         ScriptService scriptService,
         ClusterService clusterService
     ) {
-        super(
-            DeleteByQueryAction.NAME,
-            transportService,
-            actionFilters,
-            (Writeable.Reader<DeleteByQueryRequest>) DeleteByQueryRequest::new
-        );
+        super(DeleteByQueryAction.NAME, transportService, actionFilters, DeleteByQueryRequest::new);
         this.threadPool = threadPool;
         this.client = client;
         this.scriptService = scriptService;

+ 1 - 7
modules/reindex/src/main/java/org/elasticsearch/reindex/TransportUpdateByQueryAction.java

@@ -18,7 +18,6 @@ import org.elasticsearch.client.ParentTaskAssigningClient;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.IndexFieldMapper;
 import org.elasticsearch.index.mapper.RoutingFieldMapper;
@@ -53,12 +52,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
         ScriptService scriptService,
         ClusterService clusterService
     ) {
-        super(
-            UpdateByQueryAction.NAME,
-            transportService,
-            actionFilters,
-            (Writeable.Reader<UpdateByQueryRequest>) UpdateByQueryRequest::new
-        );
+        super(UpdateByQueryAction.NAME, transportService, actionFilters, UpdateByQueryRequest::new);
         this.threadPool = threadPool;
         this.client = client;
         this.scriptService = scriptService;

+ 94 - 7
modules/reindex/src/test/java/org/elasticsearch/reindex/AsyncBulkByScrollActionTests.java

@@ -117,11 +117,12 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.hasToString;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 public class AsyncBulkByScrollActionTests extends ESTestCase {
     private MyMockClient client;
     private DummyAbstractBulkByScrollRequest testRequest;
-    private SearchRequest firstSearchRequest;
     private PlainActionFuture<BulkByScrollResponse> listener;
     private String scrollId;
     private ThreadPool threadPool;
@@ -140,8 +141,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
 
         threadPool = new TestThreadPool(getTestName());
         setupClient(threadPool);
-        firstSearchRequest = new SearchRequest();
-        testRequest = new DummyAbstractBulkByScrollRequest(firstSearchRequest);
+        testRequest = new DummyAbstractBulkByScrollRequest(new SearchRequest());
         listener = new PlainActionFuture<>();
         scrollId = null;
         taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
@@ -282,11 +282,16 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
 
     public void testBulkResponseSetsLotsOfStatus() throws Exception {
         testRequest.setAbortOnVersionConflict(false);
+
         int maxBatches = randomIntBetween(0, 100);
         long versionConflicts = 0;
         long created = 0;
         long updated = 0;
         long deleted = 0;
+
+        var action = new DummyAsyncBulkByScrollAction();
+        action.setScroll(scrollId());
+
         for (int batches = 0; batches < maxBatches; batches++) {
             BulkItemResponse[] responses = new BulkItemResponse[randomIntBetween(0, 100)];
             for (int i = 0; i < responses.length; i++) {
@@ -326,7 +331,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
                 final IndexResponse response = new IndexResponse(shardId, "id" + i, seqNo, primaryTerm, randomInt(), createdResponse);
                 responses[i] = BulkItemResponse.success(i, opType, response);
             }
-            assertExactlyOnce(onSuccess -> new DummyAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0), onSuccess));
+            assertExactlyOnce(onSuccess -> action.onBulkResponse(new BulkResponse(responses, 0), onSuccess));
             assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts());
             assertEquals(updated, testTask.getStatus().getUpdated());
             assertEquals(created, testTask.getStatus().getCreated());
@@ -335,6 +340,53 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         }
     }
 
+    public void testHandlesBulkWithNoScroll() {
+        // given a request that should not open scroll
+        var maxDocs = between(1, 100);
+        testRequest.setMaxDocs(maxDocs);
+        testRequest.getSearchRequest().source().size(100);
+
+        // when receiving bulk response
+        var responses = randomArray(0, maxDocs, BulkItemResponse[]::new, AsyncBulkByScrollActionTests::createBulkResponse);
+        new DummyAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0), () -> fail("should not be called"));
+
+        // then should refresh and finish
+        assertThat(listener.isDone(), equalTo(true));
+        var status = listener.actionGet().getStatus();
+        assertThat(status.getCreated() + status.getUpdated() + status.getDeleted(), equalTo((long) responses.length));
+    }
+
+    public void testHandlesBulkWhenMaxDocsIsReached() {
+        // given a request with max docs
+        var size = between(1, 10);
+        testRequest.setMaxDocs(size);
+        testRequest.getSearchRequest().source().size(100);
+
+        // when receiving bulk response with max docs
+        var responses = randomArray(size, size, BulkItemResponse[]::new, AsyncBulkByScrollActionTests::createBulkResponse);
+        new DummyAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0), () -> fail("should not be called"));
+
+        // then should refresh and finish
+        assertThat(listener.isDone(), equalTo(true));
+        var status = listener.actionGet().getStatus();
+        assertThat(status.getCreated() + status.getUpdated() + status.getDeleted(), equalTo((long) responses.length));
+    }
+
+    private static BulkItemResponse createBulkResponse() {
+        return BulkItemResponse.success(
+            0,
+            randomFrom(DocWriteRequest.OpType.values()),
+            new IndexResponse(
+                new ShardId(new Index("name", "uid"), 0),
+                "id",
+                randomInt(20),
+                randomIntBetween(1, 16),
+                randomIntBetween(0, Integer.MAX_VALUE),
+                true
+            )
+        );
+    }
+
     /**
      * Mimicks a ThreadPool rejecting execution of the task.
      */
@@ -498,6 +550,9 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
             }
         });
 
+        // Set the base for the scroll to wait - this is added to the figure we calculate below
+        testRequest.getSearchRequest().scroll(timeValueSeconds(10));
+
         DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction() {
             @Override
             protected RequestWrapper<?> buildRequest(Hit doc) {
@@ -506,9 +561,6 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         };
         action.setScroll(scrollId());
 
-        // Set the base for the scroll to wait - this is added to the figure we calculate below
-        firstSearchRequest.scroll(timeValueSeconds(10));
-
         // Set throttle to 1 request per second to make the math simpler
         worker.rethrottle(1f);
         action.start();
@@ -568,6 +620,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
 
         client.bulksToReject = client.bulksAttempts.get() + totalFailures;
         DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
+        action.setScroll(scrollId());
         BulkRequest request = new BulkRequest();
         for (int i = 0; i < size + 1; i++) {
             request.add(new IndexRequest("index").id("id" + i));
@@ -826,6 +879,39 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         expectThrows(IllegalArgumentException.class, () -> response.consumeHits(1));
     }
 
+    public void testEnableScrollByDefault() {
+        var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
+        assertThat(preparedSearchRequest.scroll(), notNullValue());
+    }
+
+    public void testEnableScrollWhenMaxDocsIsGreaterThenScrollSize() {
+        testRequest.setMaxDocs(between(101, 1000));
+        testRequest.getSearchRequest().source().size(100);
+
+        var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
+
+        assertThat(preparedSearchRequest.scroll(), notNullValue());
+    }
+
+    public void testDisableScrollWhenMaxDocsIsLessThenScrollSize() {
+        testRequest.setMaxDocs(between(1, 100));
+        testRequest.getSearchRequest().source().size(100);
+
+        var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
+
+        assertThat(preparedSearchRequest.scroll(), nullValue());
+    }
+
+    public void testEnableScrollWhenProceedOnVersionConflict() {
+        testRequest.setMaxDocs(between(1, 110));
+        testRequest.getSearchRequest().source().size(100);
+        testRequest.setAbortOnVersionConflict(false);
+
+        var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
+
+        assertThat(preparedSearchRequest.scroll(), notNullValue());
+    }
+
     /**
      * Simulate a scroll response by setting the scroll id and firing the onScrollResponse method.
      */
@@ -907,6 +993,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
     }
 
     private static class DummyAbstractBulkByScrollRequest extends AbstractBulkByScrollRequest<DummyAbstractBulkByScrollRequest> {
+
         DummyAbstractBulkByScrollRequest(SearchRequest searchRequest) {
             super(searchRequest, true);
         }

+ 4 - 0
server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java

@@ -152,6 +152,10 @@ public abstract class ScrollableHitSource {
         this.scrollId.set(scrollId);
     }
 
+    public final boolean hasScroll() {
+        return scrollId.get() != null;
+    }
+
     public interface AsyncResponse {
         /**
          * The response data made available.