浏览代码

Make enrich policy execution cancelable (#77188)

The policy cancellation implementation is best-effort.
Prior to each transport action call the policy runner
checks whether the corresponding task has been cancelled.
If so no further action is performed, otherwise
execution is continued as planned.

The policy execution tasks is also made the parent task
of transport action requests that are being executed
by the policy runner. This will allow cancellation when
certain transport actions are being executed (e.g. reindex).
Also it should provide better insight which other tasks
are related to a policy execution task.

Additionally, to this change a `step` field is added to
the enrich policy status. This field will contain
the name of the currently executing transport action
request. This will help, giving better insight, what
a policy execution is doing.

Relates #48988
Martijn van Groningen 4 年之前
父节点
当前提交
9e0caac5f4

+ 21 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyStatus.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.enrich.action;
 
 
 import java.io.IOException;
 import java.io.IOException;
 
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -22,20 +23,30 @@ public class ExecuteEnrichPolicyStatus implements Task.Status {
         public static final String RUNNING = "RUNNING";
         public static final String RUNNING = "RUNNING";
         public static final String COMPLETE = "COMPLETE";
         public static final String COMPLETE = "COMPLETE";
         public static final String FAILED = "FAILED";
         public static final String FAILED = "FAILED";
+        public static final String CANCELLED = "CANCELLED";
     }
     }
 
 
     public static final String NAME = "enrich-policy-execution";
     public static final String NAME = "enrich-policy-execution";
 
 
     private static final String PHASE_FIELD = "phase";
     private static final String PHASE_FIELD = "phase";
+    private static final String STEP_FIELD = "step";
 
 
     private final String phase;
     private final String phase;
+    private final String step;
 
 
     public ExecuteEnrichPolicyStatus(String phase) {
     public ExecuteEnrichPolicyStatus(String phase) {
         this.phase = phase;
         this.phase = phase;
+        this.step = null;
+    }
+
+    public ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus status, String step) {
+        this.phase = status.phase;
+        this.step = step;
     }
     }
 
 
     public ExecuteEnrichPolicyStatus(StreamInput in) throws IOException {
     public ExecuteEnrichPolicyStatus(StreamInput in) throws IOException {
         this.phase = in.readString();
         this.phase = in.readString();
+        this.step = in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readOptionalString() : null;
     }
     }
 
 
     public String getPhase() {
     public String getPhase() {
@@ -46,6 +57,10 @@ public class ExecuteEnrichPolicyStatus implements Task.Status {
         return PolicyPhases.COMPLETE.equals(phase);
         return PolicyPhases.COMPLETE.equals(phase);
     }
     }
 
 
+    public String getStep() {
+        return step;
+    }
+
     @Override
     @Override
     public String getWriteableName() {
     public String getWriteableName() {
         return NAME;
         return NAME;
@@ -54,6 +69,9 @@ public class ExecuteEnrichPolicyStatus implements Task.Status {
     @Override
     @Override
     public void writeTo(StreamOutput out) throws IOException {
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(phase);
         out.writeString(phase);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeOptionalString(step);
+        }
     }
     }
 
 
     @Override
     @Override
@@ -61,6 +79,9 @@ public class ExecuteEnrichPolicyStatus implements Task.Status {
         builder.startObject();
         builder.startObject();
         {
         {
             builder.field(PHASE_FIELD, phase);
             builder.field(PHASE_FIELD, phase);
+            if (step != null) {
+                builder.field(STEP_FIELD, step);
+            }
         }
         }
         builder.endObject();
         builder.endObject();
         return builder;
         return builder;

+ 32 - 1
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java

@@ -11,6 +11,9 @@ import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
@@ -27,6 +30,7 @@ import org.elasticsearch.action.admin.indices.segments.ShardSegments;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.client.FilterClient;
 import org.elasticsearch.client.OriginSettingClient;
 import org.elasticsearch.client.OriginSettingClient;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
@@ -48,6 +52,7 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.ReindexRequest;
 import org.elasticsearch.index.reindex.ReindexRequest;
 import org.elasticsearch.index.reindex.ScrollableHitSource;
 import org.elasticsearch.index.reindex.ScrollableHitSource;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
 import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
 import org.elasticsearch.xpack.enrich.action.EnrichReindexAction;
 import org.elasticsearch.xpack.enrich.action.EnrichReindexAction;
@@ -104,7 +109,7 @@ public class EnrichPolicyRunner implements Runnable {
         this.task = task;
         this.task = task;
         this.listener = listener;
         this.listener = listener;
         this.clusterService = clusterService;
         this.clusterService = clusterService;
-        this.client = client;
+        this.client = wrapClient(client, policyName, task, clusterService);
         this.indexNameExpressionResolver = indexNameExpressionResolver;
         this.indexNameExpressionResolver = indexNameExpressionResolver;
         this.nowSupplier = nowSupplier;
         this.nowSupplier = nowSupplier;
         this.fetchSize = fetchSize;
         this.fetchSize = fetchSize;
@@ -587,4 +592,30 @@ public class EnrichPolicyRunner implements Runnable {
     private Client enrichOriginClient() {
     private Client enrichOriginClient() {
         return new OriginSettingClient(client, ENRICH_ORIGIN);
         return new OriginSettingClient(client, ENRICH_ORIGIN);
     }
     }
+
+    private static Client wrapClient(Client in, String policyName, ExecuteEnrichPolicyTask task, ClusterService clusterService) {
+        // Filter client in order to:
+        // 1) Check on transport action call that policy runner does whether the task has been cancelled
+        // 2) Set the enrich policy task as parent task, so if other API calls (e.g. reindex) are cancellable then
+        // the corresponding tasks of these API calls get cancelled as well.
+        return new FilterClient(in) {
+
+            @Override
+            protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
+                ActionType<Response> action,
+                Request request,
+                ActionListener<Response> listener
+            ) {
+                String requestStep = request.getClass().getSimpleName();
+                task.setStep(requestStep);
+                if (task.isCancelled()) {
+                    String message = "cancelled policy execution [" + policyName + "], status [" + Strings.toString(task.getStatus()) + "]";
+                    listener.onFailure(new TaskCancelledException(message));
+                    return;
+                }
+                request.setParentTask(clusterService.localNode().getId(), task.getId());
+                super.doExecute(action, request, listener);
+            }
+        };
+    }
 }
 }

+ 11 - 2
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java

@@ -6,13 +6,13 @@
  */
  */
 package org.elasticsearch.xpack.enrich;
 package org.elasticsearch.xpack.enrich;
 
 
-import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
 import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
 
 
 import java.util.Map;
 import java.util.Map;
 
 
-public class ExecuteEnrichPolicyTask extends Task {
+public class ExecuteEnrichPolicyTask extends CancellableTask {
 
 
     private volatile ExecuteEnrichPolicyStatus status;
     private volatile ExecuteEnrichPolicyStatus status;
 
 
@@ -35,4 +35,13 @@ public class ExecuteEnrichPolicyTask extends Task {
     public void setStatus(ExecuteEnrichPolicyStatus status) {
     public void setStatus(ExecuteEnrichPolicyStatus status) {
         this.status = status;
         this.status = status;
     }
     }
+
+    @Override
+    protected void onCancelled() {
+        setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.CANCELLED));
+    }
+
+    public void setStep(String requestStep) {
+        setStatus(new ExecuteEnrichPolicyStatus(status, requestStep));
+    }
 }
 }

+ 8 - 4
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskAwareRequest;
 import org.elasticsearch.tasks.TaskAwareRequest;
+import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
 import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
@@ -115,10 +116,13 @@ public class InternalExecutePolicyAction extends ActionType<Response> {
                 if (request.isWaitForCompletion()) {
                 if (request.isWaitForCompletion()) {
                     listener = ActionListener.wrap(result -> actionListener.onResponse(new Response(result)), actionListener::onFailure);
                     listener = ActionListener.wrap(result -> actionListener.onResponse(new Response(result)), actionListener::onFailure);
                 } else {
                 } else {
-                    listener = ActionListener.wrap(
-                        result -> LOGGER.debug("successfully executed policy [{}]", request.getName()),
-                        e -> LOGGER.error("failed to execute policy [" + request.getName() + "]", e)
-                    );
+                    listener = ActionListener.wrap(result -> LOGGER.debug("successfully executed policy [{}]", request.getName()), e -> {
+                        if (e instanceof TaskCancelledException) {
+                            LOGGER.info(e.getMessage());
+                        } else {
+                            LOGGER.error("failed to execute policy [" + request.getName() + "]", e);
+                        }
+                    });
                 }
                 }
                 policyExecutor.runPolicyLocally(task, request.getName(), ActionListener.wrap(result -> {
                 policyExecutor.runPolicyLocally(task, request.getName(), ActionListener.wrap(result -> {
                     taskManager.unregister(task);
                     taskManager.unregister(task);

+ 90 - 1
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java

@@ -8,21 +8,33 @@ package org.elasticsearch.xpack.enrich;
 
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.LatchedActionListener;
 import org.elasticsearch.action.LatchedActionListener;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
+import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
+import org.elasticsearch.action.admin.indices.get.GetIndexAction;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
 import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
+import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
 import org.elasticsearch.action.admin.indices.segments.IndexSegments;
 import org.elasticsearch.action.admin.indices.segments.IndexSegments;
 import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
 import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
 import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
 import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
+import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
 import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
 import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
 import org.elasticsearch.action.admin.indices.segments.ShardSegments;
 import org.elasticsearch.action.admin.indices.segments.ShardSegments;
+import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.FilterClient;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings;
@@ -49,6 +61,7 @@ import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
 import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
+import org.elasticsearch.xpack.enrich.action.EnrichReindexAction;
 import org.junit.AfterClass;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.BeforeClass;
 
 
@@ -1966,7 +1979,83 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         ensureEnrichIndexIsReadOnly(createdEnrichIndex);
         ensureEnrichIndexIsReadOnly(createdEnrichIndex);
     }
     }
 
 
+    public void testRunnerCancel() throws Exception {
+        final String sourceIndex = "source-index";
+        IndexResponse indexRequest = client().index(
+            new IndexRequest().index(sourceIndex)
+                .id("id")
+                .source(
+                    "{"
+                        + "\"field1\":\"value1\","
+                        + "\"field2\":2,"
+                        + "\"field3\":\"ignored\","
+                        + "\"field4\":\"ignored\","
+                        + "\"field5\":\"value5\""
+                        + "}",
+                    XContentType.JSON
+                )
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+        ).actionGet();
+        assertEquals(RestStatus.CREATED, indexRequest.status());
+
+        List<String> enrichFields = List.of("field2", "field5");
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields);
+        String policyName = "test1";
+
+        final long createTime = randomNonNegativeLong();
+        final AtomicReference<Exception> exception = new AtomicReference<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
+
+        ActionType<?> randomActionType = randomFrom(
+            EnrichReindexAction.INSTANCE,
+            GetIndexAction.INSTANCE,
+            CreateIndexAction.INSTANCE,
+            ForceMergeAction.INSTANCE,
+            RefreshAction.INSTANCE,
+            IndicesSegmentsAction.INSTANCE,
+            UpdateSettingsAction.INSTANCE,
+            ClusterHealthAction.INSTANCE
+        );
+        logger.info("Selected [{}] to perform cancel", randomActionType.name());
+        Client client = new FilterClient(client()) {
+
+            @Override
+            protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
+                ActionType<Response> action,
+                Request request,
+                ActionListener<Response> listener
+            ) {
+                if (action.equals(randomActionType)) {
+                    testTaskManager.getCancellableTasks()
+                        .values()
+                        .stream()
+                        .filter(cancellableTask -> cancellableTask instanceof ExecuteEnrichPolicyTask)
+                        .forEach(task -> testTaskManager.cancel(task, "", () -> {}));
+                }
+                super.doExecute(action, request, listener);
+            }
+        };
+
+        EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(client, policyName, policy, listener, createTime);
+        logger.info("Starting policy run");
+        enrichPolicyRunner.run();
+        latch.await();
+        assertThat(exception.get(), notNullValue());
+        assertThat(exception.get().getMessage(), containsString("cancelled policy execution [test1], status ["));
+    }
+
+    private EnrichPolicyRunner createPolicyRunner(
+        String policyName,
+        EnrichPolicy policy,
+        ActionListener<ExecuteEnrichPolicyStatus> listener,
+        Long createTime
+    ) {
+        return createPolicyRunner(client(), policyName, policy, listener, createTime);
+    }
+
     private EnrichPolicyRunner createPolicyRunner(
     private EnrichPolicyRunner createPolicyRunner(
+        Client client,
         String policyName,
         String policyName,
         EnrichPolicy policy,
         EnrichPolicy policy,
         ActionListener<ExecuteEnrichPolicyStatus> listener,
         ActionListener<ExecuteEnrichPolicyStatus> listener,
@@ -2016,7 +2105,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
             task,
             task,
             wrappedListener,
             wrappedListener,
             clusterService,
             clusterService,
-            client(),
+            client,
             resolver,
             resolver,
             () -> createTime,
             () -> createTime,
             randomIntBetween(1, 10000),
             randomIntBetween(1, 10000),