Browse Source

Add scroll parameter to _reindex API (#28041)

Be able to change scroll timeout in _reindex API (by default: 5m)
Yu 7 years ago
parent
commit
228f7ffcdf

+ 17 - 1
core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java

@@ -28,6 +28,7 @@ import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.search.Scroll;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
@@ -42,7 +43,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
 public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScrollRequest<Self>> extends ActionRequest {
 
     public static final int SIZE_ALL_MATCHES = -1;
-    private static final TimeValue DEFAULT_SCROLL_TIMEOUT = timeValueMinutes(5);
+    static final TimeValue DEFAULT_SCROLL_TIMEOUT = timeValueMinutes(5);
     static final int DEFAULT_SCROLL_SIZE = 1000;
 
     public static final int AUTO_SLICES = 0;
@@ -341,6 +342,21 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
         return shouldStoreResult;
     }
 
+    /**
+     * Set scroll timeout for {@link SearchRequest}
+     */
+    public Self setScroll(TimeValue keepAlive) {
+        searchRequest.scroll(new Scroll(keepAlive));
+        return self();
+    }
+
+    /**
+     * Get scroll timeout
+     */
+    public TimeValue getScrollTime() {
+        return searchRequest.scroll().keepAlive();
+    }
+
     /**
      * The number of slices this task should be divided into. Defaults to 1 meaning the task isn't sliced into subtasks.
      */

+ 5 - 2
docs/reference/docs/delete-by-query.asciidoc

@@ -142,7 +142,8 @@ POST twitter/_delete_by_query?scroll_size=5000
 === URL Parameters
 
 In addition to the standard parameters like `pretty`, the Delete By Query API
-also supports `refresh`, `wait_for_completion`, `wait_for_active_shards`, and `timeout`.
+also supports `refresh`, `wait_for_completion`, `wait_for_active_shards`, `timeout`,
+and `scroll`.
 
 Sending the `refresh` will refresh all shards involved in the delete by query
 once the request completes. This is different than the Delete API's `refresh`
@@ -161,7 +162,9 @@ Elasticsearch can reclaim the space it uses.
 before proceeding with the request. See <<index-wait-for-active-shards,here>>
 for details. `timeout` controls how long each write request waits for unavailable
 shards to become available. Both work exactly how they work in the
-<<docs-bulk,Bulk API>>.
+<<docs-bulk,Bulk API>>. As `_delete_by_query` uses scroll search, you can also specify
+the `scroll` parameter to control how long it keeps the "search context" alive,
+e.g. `?scroll=10m`. By default it is 5 minutes.
 
 `requests_per_second` can be set to any positive decimal number (`1.4`, `6`,
 `1000`, etc) and throttles rate at which `_delete_by_query` issues batches of

+ 5 - 3
docs/reference/docs/reindex.asciidoc

@@ -512,8 +512,8 @@ POST _reindex
 === URL Parameters
 
 In addition to the standard parameters like `pretty`, the Reindex API also
-supports `refresh`, `wait_for_completion`, `wait_for_active_shards`, `timeout`, and
-`requests_per_second`.
+supports `refresh`, `wait_for_completion`, `wait_for_active_shards`, `timeout`,
+`scroll`, and `requests_per_second`.
 
 Sending the `refresh` url parameter will cause all indexes to which the request
 wrote to be refreshed. This is different than the Index API's `refresh`
@@ -531,7 +531,9 @@ Elasticsearch can reclaim the space it uses.
 before proceeding with the reindexing. See <<index-wait-for-active-shards,here>>
 for details. `timeout` controls how long each write request waits for unavailable
 shards to become available. Both work exactly how they work in the
-<<docs-bulk,Bulk API>>.
+<<docs-bulk,Bulk API>>. As `_reindex` uses scroll search, you can also specify
+the `scroll` parameter to control how long it keeps the "search context" alive,
+e.g. `?scroll=10m`. By default it is 5 minutes.
 
 `requests_per_second` can be set to any positive decimal number (`1.4`, `6`,
 `1000`, etc) and throttles rate at which reindex issues batches of index

+ 5 - 2
docs/reference/docs/update-by-query.asciidoc

@@ -200,7 +200,8 @@ POST twitter/_update_by_query?pipeline=set-foo
 === URL Parameters
 
 In addition to the standard parameters like `pretty`, the Update By Query API
-also supports `refresh`, `wait_for_completion`, `wait_for_active_shards`, and `timeout`.
+also supports `refresh`, `wait_for_completion`, `wait_for_active_shards`, `timeout`,
+and `scroll`.
 
 Sending the `refresh` will update all shards in the index being updated when
 the request completes. This is different than the Index API's `refresh`
@@ -218,7 +219,9 @@ Elasticsearch can reclaim the space it uses.
 before proceeding with the request. See <<index-wait-for-active-shards,here>>
 for details. `timeout` controls how long each write request waits for unavailable
 shards to become available. Both work exactly how they work in the
-<<docs-bulk,Bulk API>>.
+<<docs-bulk,Bulk API>>. As `_update_by_query` uses scroll search, you can also specify
+the `scroll` parameter to control how long it keeps the "search context" alive,
+e.g. `?scroll=10m`. By default it is 5 minutes.
 
 `requests_per_second` can be set to any positive decimal number (`1.4`, `6`,
 `1000`, etc) and throttles rate at which `_update_by_query` issues batches of

+ 3 - 0
modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java

@@ -119,6 +119,9 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
         try (XContentParser parser = request.contentParser()) {
             PARSER.parse(parser, internal, null);
         }
+        if (request.hasParam("scroll")) {
+            internal.setScroll(parseTimeValue(request.param("scroll"), "scroll"));
+        }
         return internal;
     }
 

+ 20 - 0
modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java

@@ -21,10 +21,12 @@ package org.elasticsearch.index.reindex;
 
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.test.ESTestCase;
@@ -150,6 +152,24 @@ public class RestReindexActionTests extends ESTestCase {
         assertEquals("_reindex doesn't support [pipeline] as a query parmaeter. Specify it in the [dest] object instead.", e.getMessage());
     }
 
+    public void testSetScrollTimeout() throws IOException {
+        {
+            RestReindexAction action = new RestReindexAction(Settings.EMPTY, mock(RestController.class));
+            FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry());
+            requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON);
+            ReindexRequest request = action.buildRequest(requestBuilder.build());
+            assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT, request.getScrollTime());
+        }
+        {
+            RestReindexAction action = new RestReindexAction(Settings.EMPTY, mock(RestController.class));
+            FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry());
+            requestBuilder.withParams(singletonMap("scroll", "10m"));
+            requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON);
+            ReindexRequest request = action.buildRequest(requestBuilder.build());
+            assertEquals("10m", request.getScrollTime().toString());
+        }
+    }
+
     private RemoteInfo buildRemoteInfoHostTestCase(String hostInRest) throws IOException {
         Map<String, Object> remote = new HashMap<>();
         remote.put("host", hostInRest);