瀏覽代碼

[Transform] Prevent stopping of transforms due to threadpool limitation (#81912)

remove the indexer threadpool and use the generic threadpool instead(The indexer threadpool was only used on start)

fixes #81796
Hendrik Muhs 3 年之前
父節點
當前提交
c9113a5e91

+ 0 - 17
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java

@@ -47,8 +47,6 @@ import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestHandler;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.script.ScriptService;
-import org.elasticsearch.threadpool.ExecutorBuilder;
-import org.elasticsearch.threadpool.FixedExecutorBuilder;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.watcher.ResourceWatcherService;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -123,7 +121,6 @@ import static org.elasticsearch.xpack.core.transform.transforms.persistence.Tran
 public class Transform extends Plugin implements SystemIndexPlugin, PersistentTaskPlugin {
 
     public static final String NAME = "transform";
-    public static final String TASK_THREAD_POOL_NAME = "transform_indexing";
 
     private static final Logger logger = LogManager.getLogger(Transform.class);
 
@@ -201,20 +198,6 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
         );
     }
 
-    @Override
-    public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settingsToUse) {
-        FixedExecutorBuilder indexing = new FixedExecutorBuilder(
-            settingsToUse,
-            TASK_THREAD_POOL_NAME,
-            4,
-            4,
-            "transform.task_thread_pool",
-            false
-        );
-
-        return Collections.singletonList(indexing);
-    }
-
     @Override
     public Collection<Object> createComponents(
         Client client,

+ 2 - 4
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

@@ -571,7 +571,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
 
     @Override
     public boolean maybeTriggerAsyncJob(long now) {
-        boolean triggered;
+        // threadpool: trigger_engine_scheduler if triggered from the scheduler, generic if called from the task on start
 
         if (context.getTaskState() == TransformTaskState.FAILED) {
             logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getJobId());
@@ -602,10 +602,8 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
                 return false;
             }
 
-            triggered = super.maybeTriggerAsyncJob(now);
+            return super.maybeTriggerAsyncJob(now);
         }
-
-        return triggered;
     }
 
     /**

+ 26 - 5
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java

@@ -83,7 +83,7 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
         Settings settings,
         IndexNameExpressionResolver resolver
     ) {
-        super(TransformField.TASK_NAME, Transform.TASK_THREAD_POOL_NAME);
+        super(TransformField.TASK_NAME, ThreadPool.Names.GENERIC);
         this.client = client;
         this.transformServices = transformServices;
         this.threadPool = threadPool;
@@ -100,6 +100,13 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
         Collection<DiscoveryNode> candidateNodes,
         ClusterState clusterState
     ) {
+        /* Note:
+         *
+         * This method is executed on the _master_ node. The master and transform node might be on a different version.
+         * Therefore certain checks must happen on the corresponding node, e.g. the existence of the internal index.
+         *
+         * Operations on the transform node happen in {@link #nodeOperation()}
+         */
         if (TransformMetadata.getTransformMetadata(clusterState).isResetMode()) {
             return new PersistentTasksCustomMetadata.Assignment(
                 null,
@@ -160,6 +167,12 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
 
     @Override
     protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTaskParams params, PersistentTaskState state) {
+        /* Note:
+         *
+         * This method is executed on the _transform_ node. The master and transform node might be on a different version.
+         * Operations on master happen in {@link #getAssignment()}
+         */
+
         final String transformId = params.getId();
         final TransformTask buildTask = (TransformTask) task;
         // NOTE: TransformPersistentTasksExecutor#createTask pulls in the stored task state from the ClusterState when the object
@@ -188,6 +201,7 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
 
         // <6> load next checkpoint
         ActionListener<TransformCheckpoint> getTransformNextCheckpointListener = ActionListener.wrap(nextCheckpoint -> {
+            // threadpool: system_read
 
             if (nextCheckpoint.isEmpty()) {
                 // extra safety: reset position and progress if next checkpoint is empty
@@ -211,8 +225,9 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
 
         // <5> load last checkpoint
         ActionListener<TransformCheckpoint> getTransformLastCheckpointListener = ActionListener.wrap(lastCheckpoint -> {
-            indexerBuilder.setLastCheckpoint(lastCheckpoint);
+            // threadpool: system_read
 
+            indexerBuilder.setLastCheckpoint(lastCheckpoint);
             logger.trace("[{}] Loaded last checkpoint [{}], looking for next checkpoint", transformId, lastCheckpoint.getCheckpoint());
             transformServices.getConfigManager()
                 .getTransformCheckpoint(transformId, lastCheckpoint.getCheckpoint() + 1, getTransformNextCheckpointListener);
@@ -227,6 +242,8 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
         // Schedule execution regardless
         ActionListener<Tuple<TransformStoredDoc, SeqNoPrimaryTermAndIndex>> transformStatsActionListener = ActionListener.wrap(
             stateAndStatsAndSeqNoPrimaryTermAndIndex -> {
+                // threadpool: system_read
+
                 TransformStoredDoc stateAndStats = stateAndStatsAndSeqNoPrimaryTermAndIndex.v1();
                 SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = stateAndStatsAndSeqNoPrimaryTermAndIndex.v2();
                 // Since we have not set the value for this yet, it SHOULD be null
@@ -272,6 +289,7 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
 
         // <3> Validate the transform, assigning it to the indexer, and get the previous stats (if they exist)
         ActionListener<TransformConfig> getTransformConfigListener = ActionListener.wrap(config -> {
+            // threadpool: system_read
 
             // fail if a transform is too old, this can only happen on a rolling upgrade
             if (config.getVersion() == null || config.getVersion().before(TransformDeprecations.MIN_TRANSFORM_VERSION)) {
@@ -368,9 +386,12 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
         Long previousCheckpoint,
         ActionListener<StartTransformAction.Response> listener
     ) {
-        buildTask.initializeIndexer(indexerBuilder);
-        // TransformTask#start will fail if the task state is FAILED
-        buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener);
+        // switch the threadpool to generic, because the caller is on the system_read threadpool
+        threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
+            buildTask.initializeIndexer(indexerBuilder);
+            // TransformTask#start will fail if the task state is FAILED
+            buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener);
+        });
     }
 
     private void setNumFailureRetries(int numFailureRetries) {