Selaa lähdekoodia

Add submitDeleteByQueryTask method to RestHighLevelClient (#46833)

The HLRC has a method for reindex, that allows to trigger an async reindex by running RestHighLevelClient.submitReindexTask and RestHighLevelClient.reindex. The delete by query however only has an RestHighLevelClient.deleteByQuery method (and its async counterpart), but no RestHighLevelClient.submitDeleteByQueryTask. So add RestHighLevelClient.submitDeleteByQueryTask

Closes #46395
maidoo 6 vuotta sitten
vanhempi
commit
2733e91fe1

+ 34 - 24
client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

@@ -545,6 +545,10 @@ final class RequestConverters {
         return prepareReindexRequest(reindexRequest, false);
         return prepareReindexRequest(reindexRequest, false);
     }
     }
 
 
+    static Request submitDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
+        return prepareDeleteByQueryRequest(deleteByQueryRequest, false);
+    }
+
     private static Request prepareReindexRequest(ReindexRequest reindexRequest, boolean waitForCompletion) throws IOException {
     private static Request prepareReindexRequest(ReindexRequest reindexRequest, boolean waitForCompletion) throws IOException {
         String endpoint = new EndpointBuilder().addPathPart("_reindex").build();
         String endpoint = new EndpointBuilder().addPathPart("_reindex").build();
         Request request = new Request(HttpPost.METHOD_NAME, endpoint);
         Request request = new Request(HttpPost.METHOD_NAME, endpoint);
@@ -564,6 +568,35 @@ final class RequestConverters {
         return request;
         return request;
     }
     }
 
 
+    private static Request prepareDeleteByQueryRequest(DeleteByQueryRequest deleteByQueryRequest,
+                                                       boolean waitForCompletion) throws IOException {
+        String endpoint = endpoint(deleteByQueryRequest.indices(), "_delete_by_query");
+        Request request = new Request(HttpPost.METHOD_NAME, endpoint);
+        Params params = new Params()
+            .withRouting(deleteByQueryRequest.getRouting())
+            .withRefresh(deleteByQueryRequest.isRefresh())
+            .withTimeout(deleteByQueryRequest.getTimeout())
+            .withWaitForActiveShards(deleteByQueryRequest.getWaitForActiveShards())
+            .withRequestsPerSecond(deleteByQueryRequest.getRequestsPerSecond())
+            .withIndicesOptions(deleteByQueryRequest.indicesOptions())
+            .withWaitForCompletion(waitForCompletion);
+        if (deleteByQueryRequest.isAbortOnVersionConflict() == false) {
+            params.putParam("conflicts", "proceed");
+        }
+        if (deleteByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) {
+            params.putParam("scroll_size", Integer.toString(deleteByQueryRequest.getBatchSize()));
+        }
+        if (deleteByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
+            params.putParam("scroll", deleteByQueryRequest.getScrollTime());
+        }
+        if (deleteByQueryRequest.getMaxDocs() > 0) {
+            params.putParam("max_docs", Integer.toString(deleteByQueryRequest.getMaxDocs()));
+        }
+        request.addParameters(params.asMap());
+        request.setEntity(createEntity(deleteByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
+        return request;
+    }
+
     static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException {
     static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException {
         String endpoint = endpoint(updateByQueryRequest.indices(), "_update_by_query");
         String endpoint = endpoint(updateByQueryRequest.indices(), "_update_by_query");
         Request request = new Request(HttpPost.METHOD_NAME, endpoint);
         Request request = new Request(HttpPost.METHOD_NAME, endpoint);
@@ -593,30 +626,7 @@ final class RequestConverters {
     }
     }
 
 
     static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
     static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
-        String endpoint = endpoint(deleteByQueryRequest.indices(), "_delete_by_query");
-        Request request = new Request(HttpPost.METHOD_NAME, endpoint);
-        Params params = new Params()
-            .withRouting(deleteByQueryRequest.getRouting())
-            .withRefresh(deleteByQueryRequest.isRefresh())
-            .withTimeout(deleteByQueryRequest.getTimeout())
-            .withWaitForActiveShards(deleteByQueryRequest.getWaitForActiveShards())
-            .withRequestsPerSecond(deleteByQueryRequest.getRequestsPerSecond())
-            .withIndicesOptions(deleteByQueryRequest.indicesOptions());
-        if (deleteByQueryRequest.isAbortOnVersionConflict() == false) {
-            params.putParam("conflicts", "proceed");
-        }
-        if (deleteByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) {
-            params.putParam("scroll_size", Integer.toString(deleteByQueryRequest.getBatchSize()));
-        }
-        if (deleteByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
-            params.putParam("scroll", deleteByQueryRequest.getScrollTime());
-        }
-        if (deleteByQueryRequest.getMaxDocs() > 0) {
-            params.putParam("max_docs", Integer.toString(deleteByQueryRequest.getMaxDocs()));
-        }
-        request.addParameters(params.asMap());
-        request.setEntity(createEntity(deleteByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
-        return request;
+        return prepareDeleteByQueryRequest(deleteByQueryRequest, true);
     }
     }
 
 
     static Request rethrottleReindex(RethrottleRequest rethrottleRequest) {
     static Request rethrottleReindex(RethrottleRequest rethrottleRequest) {

+ 15 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java

@@ -590,6 +590,21 @@ public class RestHighLevelClient implements Closeable {
         );
         );
     }
     }
 
 
+    /**
+     * Submits a delete by query task
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
+     *      Delete By Query API on elastic.co</a>
+     * @param deleteByQueryRequest the request
+     * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @return the submission response
+     */
+    public final TaskSubmissionResponse submitDeleteByQueryTask(DeleteByQueryRequest deleteByQueryRequest,
+                                                                RequestOptions options) throws IOException {
+        return performRequestAndParseEntity(
+            deleteByQueryRequest, RequestConverters::submitDeleteByQuery, options, TaskSubmissionResponse::fromXContent, emptySet()
+        );
+    }
+
     /**
     /**
      * Asynchronously executes a delete by query request.
      * Asynchronously executes a delete by query request.
      * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
      * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">

+ 41 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java

@@ -436,6 +436,47 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
         }
         }
     }
     }
 
 
+    public void testDeleteByQueryTask() throws Exception {
+        final String sourceIndex = "source456";
+        {
+            // Prepare
+            Settings settings = Settings.builder()
+                .put("number_of_shards", 1)
+                .put("number_of_replicas", 0)
+                .build();
+            createIndex(sourceIndex, settings);
+            assertEquals(
+                RestStatus.OK,
+                highLevelClient().bulk(
+                    new BulkRequest()
+                        .add(new IndexRequest(sourceIndex).id("1")
+                            .source(Collections.singletonMap("foo", 1), XContentType.JSON))
+                        .add(new IndexRequest(sourceIndex).id("2")
+                            .source(Collections.singletonMap("foo", 2), XContentType.JSON))
+                        .add(new IndexRequest(sourceIndex).id("3")
+                            .source(Collections.singletonMap("foo", 3), XContentType.JSON))
+                        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
+                    RequestOptions.DEFAULT
+                ).status()
+            );
+        }
+        {
+            // tag::submit-delete_by_query-task
+            DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
+            deleteByQueryRequest.indices(sourceIndex);
+            deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
+            deleteByQueryRequest.setRefresh(true);
+
+            TaskSubmissionResponse deleteByQuerySubmission = highLevelClient()
+                .submitDeleteByQueryTask(deleteByQueryRequest, RequestOptions.DEFAULT);
+
+            String taskId = deleteByQuerySubmission.getTask();
+            // end::submit-delete_by_query-task
+
+            assertBusy(checkCompletionStatus(client(), taskId));
+        }
+    }
+
     private static TaskId findTaskToRethrottle(String actionName) throws IOException {
     private static TaskId findTaskToRethrottle(String actionName) throws IOException {
         long start = System.nanoTime();
         long start = System.nanoTime();
         ListTasksRequest request = new ListTasksRequest();
         ListTasksRequest request = new ListTasksRequest();

+ 1 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

@@ -538,6 +538,7 @@ public class RequestConvertersTests extends ESTestCase {
         }
         }
         setRandomIndicesOptions(deleteByQueryRequest::setIndicesOptions, deleteByQueryRequest::indicesOptions, expectedParams);
         setRandomIndicesOptions(deleteByQueryRequest::setIndicesOptions, deleteByQueryRequest::indicesOptions, expectedParams);
         setRandomTimeout(deleteByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
         setRandomTimeout(deleteByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
+        expectedParams.put("wait_for_completion", Boolean.TRUE.toString());
         Request request = RequestConverters.deleteByQuery(deleteByQueryRequest);
         Request request = RequestConverters.deleteByQuery(deleteByQueryRequest);
         StringJoiner joiner = new StringJoiner("/", "/", "");
         StringJoiner joiner = new StringJoiner("/", "/", "");
         joiner.add(String.join(",", deleteByQueryRequest.indices()));
         joiner.add(String.join(",", deleteByQueryRequest.indices()));