Browse Source

[ML] Cancel reindex task from correct thread context (#54874)

When a data frame analytics job is stopped, if the reindexing
task was still in progress we cancel it. Cancelling it should
be done from the same context as when we executed the reindexing
task. That means from a thread context with ML origin.
Dimitris Athanasiou 5 years ago
parent
commit
b0933539d1

+ 13 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java

@@ -25,6 +25,7 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.IdsQueryBuilder;
@@ -161,7 +162,11 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
         cancelReindex.setTaskId(reindexTaskId);
         cancelReindex.setTaskId(reindexTaskId);
         cancelReindex.setReason(reason);
         cancelReindex.setReason(reason);
         cancelReindex.setTimeout(timeout);
         cancelReindex.setTimeout(timeout);
-        CancelTasksResponse cancelReindexResponse = client.admin().cluster().cancelTasks(cancelReindex).actionGet();
+
+        // We need to cancel the reindexing task within context with ML origin as we started the task
+        // from the same context
+        CancelTasksResponse cancelReindexResponse = cancelTaskWithinMlOriginContext(cancelReindex);
+
         Throwable firstError = null;
         Throwable firstError = null;
         if (cancelReindexResponse.getNodeFailures().isEmpty() == false) {
         if (cancelReindexResponse.getNodeFailures().isEmpty() == false) {
             firstError = cancelReindexResponse.getNodeFailures().get(0).getRootCause();
             firstError = cancelReindexResponse.getNodeFailures().get(0).getRootCause();
@@ -178,6 +183,13 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
         }
         }
     }
     }
 
 
+    private CancelTasksResponse cancelTaskWithinMlOriginContext(CancelTasksRequest cancelTasksRequest) {
+        final ThreadContext threadContext = client.threadPool().getThreadContext();
+        try (ThreadContext.StoredContext ignore = threadContext.stashWithOrigin(ML_ORIGIN)) {
+            return client.admin().cluster().cancelTasks(cancelTasksRequest).actionGet();
+        }
+    }
+
     public void setFailed(String reason) {
     public void setFailed(String reason) {
         DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED,
         DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED,
                 getAllocationId(), reason);
                 getAllocationId(), reason);