Browse Source

Force merge rest api support wait_for_completion (#80463)

Force merge action is a very costly action. It may take several hours to run for big indices. But current force merge rest api do not support wait_for_completion parameter.
This adds support for the wait_for_completion parameter.
mushaoqiong 3 years ago
parent
commit
d467aae67e

+ 19 - 1
docs/reference/indices/forcemerge.asciidoc

@@ -52,11 +52,22 @@ performance.
 [[forcemerge-blocks]]
 [[forcemerge-blocks]]
 ===== Blocks during a force merge
 ===== Blocks during a force merge
 
 
-Calls to this API block until the merge is complete. If the client connection
+Calls to this API block until the merge is complete (unless request contains
+wait_for_completion=false, which is default true). If the client connection
 is lost before completion then the force merge process will continue in the
 is lost before completion then the force merge process will continue in the
 background. Any new requests to force merge the same indices will also block
 background. Any new requests to force merge the same indices will also block
 until the ongoing force merge is complete.
 until the ongoing force merge is complete.
 
 
+[[docs-forcemerge-task-api]]
+===== Running force merge asynchronously
+
+If the request contains `wait_for_completion=false`, {es}
+performs some preflight checks, launches the request, and returns a
+<<tasks,`task`>> you can use to get the status of the task. However, you can
+not cancel this task as the force merge task is not cancelable. {es}
+creates a record of this task as a document at `_tasks/<task_id>`. When you
+are done with a task, you should delete the task document so {es}
+can reclaim the space.
 
 
 [[forcemerge-multi-index]]
 [[forcemerge-multi-index]]
 ===== Force merging multiple indices
 ===== Force merging multiple indices
@@ -139,6 +150,13 @@ that does not contain those document deletions.
 You can't specify this parameter and `max_num_segments` in the same request.
 You can't specify this parameter and `max_num_segments` in the same request.
 --
 --
 
 
+`wait_for_completion`::
++
+--
+(Optional, Boolean)
+If `true`, the request blocks until the operation is complete.
+Defaults to `true`.
+--
 
 
 [[forcemerge-api-example]]
 [[forcemerge-api-example]]
 ==== {api-examples-title}
 ==== {api-examples-title}

+ 5 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/indices.forcemerge.json

@@ -63,6 +63,11 @@
       "only_expunge_deletes":{
       "only_expunge_deletes":{
         "type":"boolean",
         "type":"boolean",
         "description":"Specify whether the operation should only expunge deleted documents"
         "description":"Specify whether the operation should only expunge deleted documents"
+      },
+      "wait_for_completion":{
+        "type":"boolean",
+        "default":true,
+        "description":"Should the request wait until the force merge is completed."
       }
       }
     }
     }
   }
   }

+ 18 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.forcemerge/10_basic.yml

@@ -29,3 +29,21 @@
   - match: { status: 400 }
   - match: { status: 400 }
   - match: { error.type: action_request_validation_exception }
   - match: { error.type: action_request_validation_exception }
   - match: { error.reason: "Validation Failed: 1: cannot set only_expunge_deletes and max_num_segments at the same time, those two parameters are mutually exclusive;" }
   - match: { error.reason: "Validation Failed: 1: cannot set only_expunge_deletes and max_num_segments at the same time, those two parameters are mutually exclusive;" }
+
+---
+"Force merge with wait_for_completion parameter":
+  - skip:
+      version: " - 8.0.99"
+      reason: wait_for_completion is introduced since 8.1
+
+  - do:
+      indices.create:
+        index: test_index
+
+  - do:
+      indices.forcemerge:
+        max_num_segments: 1
+        wait_for_completion: false
+  - match: { task: '/^\S+:\d+$/' }
+
+

+ 17 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeRequest.java

@@ -43,6 +43,10 @@ public class ForceMergeRequest extends BroadcastRequest<ForceMergeRequest> {
     private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
     private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
     private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
     private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
     private boolean flush = Defaults.FLUSH;
     private boolean flush = Defaults.FLUSH;
+    /**
+     * Should this task store its result?
+     */
+    private boolean shouldStoreResult;
 
 
     private static final Version FORCE_MERGE_UUID_SIMPLE_VERSION = Version.V_8_0_0;
     private static final Version FORCE_MERGE_UUID_SIMPLE_VERSION = Version.V_8_0_0;
 
 
@@ -131,6 +135,19 @@ public class ForceMergeRequest extends BroadcastRequest<ForceMergeRequest> {
         return this;
         return this;
     }
     }
 
 
+    /**
+     * Should this task store its result after it has finished?
+     */
+    public ForceMergeRequest setShouldStoreResult(boolean shouldStoreResult) {
+        this.shouldStoreResult = shouldStoreResult;
+        return this;
+    }
+
+    @Override
+    public boolean getShouldStoreResult() {
+        return shouldStoreResult;
+    }
+
     @Override
     @Override
     public String getDescription() {
     public String getDescription() {
         return "Force-merge indices "
         return "Force-merge indices "

+ 35 - 1
server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestForceMergeAction.java

@@ -8,13 +8,20 @@
 
 
 package org.elasticsearch.rest.action.admin.indices;
 package org.elasticsearch.rest.action.admin.indices;
 
 
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.BytesRestResponse;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.tasks.LoggingTaskListener;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.xcontent.XContentBuilder;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.List;
 import java.util.List;
@@ -40,7 +47,34 @@ public class RestForceMergeAction extends BaseRestHandler {
         mergeRequest.maxNumSegments(request.paramAsInt("max_num_segments", mergeRequest.maxNumSegments()));
         mergeRequest.maxNumSegments(request.paramAsInt("max_num_segments", mergeRequest.maxNumSegments()));
         mergeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", mergeRequest.onlyExpungeDeletes()));
         mergeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", mergeRequest.onlyExpungeDeletes()));
         mergeRequest.flush(request.paramAsBoolean("flush", mergeRequest.flush()));
         mergeRequest.flush(request.paramAsBoolean("flush", mergeRequest.flush()));
-        return channel -> client.admin().indices().forceMerge(mergeRequest, new RestToXContentListener<>(channel));
+        if (request.paramAsBoolean("wait_for_completion", true)) {
+            return channel -> client.admin().indices().forceMerge(mergeRequest, new RestToXContentListener<>(channel));
+        } else {
+            mergeRequest.setShouldStoreResult(true);
+            /*
+             * Let's try and validate before forking so the user gets some error. The
+             * task can't totally validate until it starts but this is better than
+             * nothing.
+             */
+            ActionRequestValidationException validationException = mergeRequest.validate();
+            if (validationException != null) {
+                throw validationException;
+            }
+            return sendTask(
+                client.getLocalNodeId(),
+                client.executeLocally(ForceMergeAction.INSTANCE, mergeRequest, LoggingTaskListener.instance())
+            );
+        }
     }
     }
 
 
+    private RestChannelConsumer sendTask(String localNodeId, Task task) {
+        return channel -> {
+            try (XContentBuilder builder = channel.newBuilder()) {
+                builder.startObject();
+                builder.field("task", localNodeId + ":" + task.getId());
+                builder.endObject();
+                channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
+            }
+        };
+    }
 }
 }