Răsfoiți Sursa

Batch execute template and pipeline cluster state operations (#86017)

This commit changes the cluster state operations for templates (legacy, component, and composable) as well as ingest pipelines to be bulk executed. This means that they can be processed much faster when creating/updating many simultaneously.

Relates to #77505
Lee Hinman 3 ani în urmă
părinte
comite
9af8cf26fd

+ 5 - 0
docs/changelog/86017.yaml

@@ -0,0 +1,5 @@
+pr: 86017
+summary: Batch execute template and pipeline cluster state operations
+area: "Indices APIs"
+type: enhancement
+issues: []

+ 1 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/template/delete/TransportDeleteIndexTemplateAction.java

@@ -70,7 +70,7 @@ public class TransportDeleteIndexTemplateAction extends AcknowledgedTransportMas
     ) {
         indexTemplateService.removeTemplates(
             new MetadataIndexTemplateService.RemoveRequest(request.name()).masterTimeout(request.masterNodeTimeout()),
-            new MetadataIndexTemplateService.RemoveListener() {
+            new ActionListener<>() {
                 @Override
                 public void onResponse(AcknowledgedResponse response) {
                     listener.onResponse(response);

+ 3 - 3
server/src/main/java/org/elasticsearch/action/admin/indices/template/put/TransportPutIndexTemplateAction.java

@@ -94,10 +94,10 @@ public class TransportPutIndexTemplateAction extends AcknowledgedTransportMaster
                 .masterTimeout(request.masterNodeTimeout())
                 .version(request.version()),
 
-            new MetadataIndexTemplateService.PutListener() {
+            new ActionListener<>() {
                 @Override
-                public void onResponse(MetadataIndexTemplateService.PutResponse response) {
-                    listener.onResponse(AcknowledgedResponse.of(response.acknowledged()));
+                public void onResponse(AcknowledgedResponse response) {
+                    listener.onResponse(response);
                 }
 
                 @Override

+ 89 - 125
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java

@@ -19,6 +19,8 @@ import org.elasticsearch.action.admin.indices.alias.Alias;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateTaskConfig;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
@@ -121,6 +123,53 @@ public class MetadataIndexTemplateService {
     private final SystemIndices systemIndices;
     private final Set<IndexSettingProvider> indexSettingProviders;
 
+    /**
+     * This is the cluster state task executor for all template-based actions.
+     */
+    private static final ClusterStateTaskExecutor<TemplateClusterStateUpdateTask> TEMPLATE_TASK_EXECUTOR = (currentState, taskContexts) -> {
+        ClusterState state = currentState;
+        for (final var taskContext : taskContexts) {
+            try {
+                final var task = taskContext.getTask();
+                state = task.execute(state);
+                taskContext.success(new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState));
+            } catch (Exception e) {
+                taskContext.onFailure(e);
+            }
+        }
+        return state;
+    };
+
+    /**
+     * A specialized cluster state update task that always takes a listener handling an
+     * AcknowledgedResponse, as all template actions have simple acknowledged yes/no responses.
+     */
+    private abstract static class TemplateClusterStateUpdateTask extends ClusterStateUpdateTask {
+        private final ActionListener<AcknowledgedResponse> listener;
+
+        TemplateClusterStateUpdateTask(Priority priority, TimeValue timeout, ActionListener<AcknowledgedResponse> listener) {
+            super(priority, timeout);
+            this.listener = listener;
+        }
+
+        public abstract ClusterState doExecute(ClusterState currentState) throws Exception;
+
+        @Override
+        public ClusterState execute(ClusterState currentState) throws Exception {
+            return doExecute(currentState);
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            listener.onFailure(e);
+        }
+
+        @Override
+        public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
+            listener.onResponse(AcknowledgedResponse.TRUE);
+        }
+    }
+
     @Inject
     public MetadataIndexTemplateService(
         ClusterService clusterService,
@@ -145,18 +194,12 @@ public class MetadataIndexTemplateService {
         clusterService.submitUnbatchedStateUpdateTask(source, task);
     }
 
-    public void removeTemplates(final RemoveRequest request, final RemoveListener listener) {
-        submitUnbatchedTask(
+    public void removeTemplates(final RemoveRequest request, final ActionListener<AcknowledgedResponse> listener) {
+        clusterService.submitStateUpdateTask(
             "remove-index-template [" + request.name + "]",
-            new ClusterStateUpdateTask(Priority.URGENT, request.masterTimeout) {
-
-                @Override
-                public void onFailure(Exception e) {
-                    listener.onFailure(e);
-                }
-
+            new TemplateClusterStateUpdateTask(Priority.URGENT, request.masterTimeout, listener) {
                 @Override
-                public ClusterState execute(ClusterState currentState) {
+                public ClusterState doExecute(ClusterState currentState) {
                     Set<String> templateNames = new HashSet<>();
                     for (Map.Entry<String, IndexTemplateMetadata> cursor : currentState.metadata().templates().entrySet()) {
                         String templateName = cursor.getKey();
@@ -179,12 +222,9 @@ public class MetadataIndexTemplateService {
                     }
                     return ClusterState.builder(currentState).metadata(metadata).build();
                 }
-
-                @Override
-                public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
-                    listener.onResponse(AcknowledgedResponse.TRUE);
-                }
-            }
+            },
+            ClusterStateTaskConfig.build(Priority.URGENT, request.masterTimeout),
+            TEMPLATE_TASK_EXECUTOR
         );
     }
 
@@ -200,25 +240,16 @@ public class MetadataIndexTemplateService {
         final ComponentTemplate template,
         final ActionListener<AcknowledgedResponse> listener
     ) {
-        submitUnbatchedTask(
+        clusterService.submitStateUpdateTask(
             "create-component-template [" + name + "], cause [" + cause + "]",
-            new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {
-
+            new TemplateClusterStateUpdateTask(Priority.URGENT, masterTimeout, listener) {
                 @Override
-                public void onFailure(Exception e) {
-                    listener.onFailure(e);
-                }
-
-                @Override
-                public ClusterState execute(ClusterState currentState) throws Exception {
+                public ClusterState doExecute(ClusterState currentState) throws Exception {
                     return addComponentTemplate(currentState, create, name, template);
                 }
-
-                @Override
-                public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
-                    listener.onResponse(AcknowledgedResponse.TRUE);
-                }
-            }
+            },
+            ClusterStateTaskConfig.build(Priority.URGENT, masterTimeout),
+            TEMPLATE_TASK_EXECUTOR
         );
     }
 
@@ -371,25 +402,16 @@ public class MetadataIndexTemplateService {
         final ActionListener<AcknowledgedResponse> listener
     ) {
         validateNotInUse(state.metadata(), names);
-        submitUnbatchedTask(
+        clusterService.submitStateUpdateTask(
             "remove-component-template [" + String.join(",", names) + "]",
-            new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {
-
+            new TemplateClusterStateUpdateTask(Priority.URGENT, masterTimeout, listener) {
                 @Override
-                public void onFailure(Exception e) {
-                    listener.onFailure(e);
-                }
-
-                @Override
-                public ClusterState execute(ClusterState currentState) {
+                public ClusterState doExecute(ClusterState currentState) {
                     return innerRemoveComponentTemplate(currentState, names);
                 }
-
-                @Override
-                public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
-                    listener.onResponse(AcknowledgedResponse.TRUE);
-                }
-            }
+            },
+            ClusterStateTaskConfig.build(Priority.URGENT, masterTimeout),
+            TEMPLATE_TASK_EXECUTOR
         );
     }
 
@@ -487,25 +509,16 @@ public class MetadataIndexTemplateService {
         final ActionListener<AcknowledgedResponse> listener
     ) {
         validateV2TemplateRequest(clusterService.state().metadata(), name, template);
-        submitUnbatchedTask(
+        clusterService.submitStateUpdateTask(
             "create-index-template-v2 [" + name + "], cause [" + cause + "]",
-            new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {
-
+            new TemplateClusterStateUpdateTask(Priority.URGENT, masterTimeout, listener) {
                 @Override
-                public void onFailure(Exception e) {
-                    listener.onFailure(e);
-                }
-
-                @Override
-                public ClusterState execute(ClusterState currentState) throws Exception {
+                public ClusterState doExecute(ClusterState currentState) throws Exception {
                     return addIndexTemplateV2(currentState, create, name, template);
                 }
-
-                @Override
-                public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
-                    listener.onResponse(AcknowledgedResponse.TRUE);
-                }
-            }
+            },
+            ClusterStateTaskConfig.build(Priority.URGENT, masterTimeout),
+            TEMPLATE_TASK_EXECUTOR
         );
     }
 
@@ -880,25 +893,16 @@ public class MetadataIndexTemplateService {
         final TimeValue masterTimeout,
         final ActionListener<AcknowledgedResponse> listener
     ) {
-        submitUnbatchedTask(
+        clusterService.submitStateUpdateTask(
             "remove-index-template-v2 [" + String.join(",", names) + "]",
-            new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {
-
+            new TemplateClusterStateUpdateTask(Priority.URGENT, masterTimeout, listener) {
                 @Override
-                public void onFailure(Exception e) {
-                    listener.onFailure(e);
-                }
-
-                @Override
-                public ClusterState execute(ClusterState currentState) {
+                public ClusterState doExecute(ClusterState currentState) {
                     return innerRemoveIndexTemplateV2(currentState, names);
                 }
-
-                @Override
-                public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
-                    listener.onResponse(AcknowledgedResponse.TRUE);
-                }
-            }
+            },
+            ClusterStateTaskConfig.build(Priority.URGENT, masterTimeout),
+            TEMPLATE_TASK_EXECUTOR
         );
     }
 
@@ -990,7 +994,7 @@ public class MetadataIndexTemplateService {
             .collect(Collectors.toSet());
     }
 
-    public void putTemplate(final PutRequest request, final PutListener listener) {
+    public void putTemplate(final PutRequest request, final ActionListener<AcknowledgedResponse> listener) {
         Settings.Builder updatedSettingsBuilder = Settings.builder();
         updatedSettingsBuilder.put(request.settings).normalizePrefix(IndexMetadata.INDEX_SETTING_PREFIX);
         request.settings(updatedSettingsBuilder.build());
@@ -1013,26 +1017,17 @@ public class MetadataIndexTemplateService {
 
         final IndexTemplateMetadata.Builder templateBuilder = IndexTemplateMetadata.builder(request.name);
 
-        submitUnbatchedTask(
+        clusterService.submitStateUpdateTask(
             "create-index-template [" + request.name + "], cause [" + request.cause + "]",
-            new ClusterStateUpdateTask(Priority.URGENT, request.masterTimeout) {
-
+            new TemplateClusterStateUpdateTask(Priority.URGENT, request.masterTimeout, listener) {
                 @Override
-                public void onFailure(Exception e) {
-                    listener.onFailure(e);
-                }
-
-                @Override
-                public ClusterState execute(ClusterState currentState) throws Exception {
+                public ClusterState doExecute(ClusterState currentState) throws Exception {
                     validateTemplate(request.settings, request.mappings, indicesService);
                     return innerPutTemplate(currentState, request, templateBuilder);
                 }
-
-                @Override
-                public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
-                    listener.onResponse(new PutResponse(true));
-                }
-            }
+            },
+            ClusterStateTaskConfig.build(Priority.URGENT, request.masterTimeout),
+            TEMPLATE_TASK_EXECUTOR
         );
     }
 
@@ -1480,7 +1475,7 @@ public class MetadataIndexTemplateService {
             try {
                 MapperService mapperService = tempIndexService.mapperService();
                 for (CompressedXContent mapping : mappings) {
-                    mapperService.merge(MapperService.SINGLE_MAPPING_NAME, mapping, MergeReason.INDEX_TEMPLATE);
+                    mapperService.merge(MapperService.SINGLE_MAPPING_NAME, mapping, MapperService.MergeReason.INDEX_TEMPLATE);
                 }
 
                 if (template.getDataStreamTemplate() != null) {
@@ -1550,12 +1545,7 @@ public class MetadataIndexTemplateService {
             name,
             maybeTemplate.map(Template::settings).orElse(Settings.EMPTY),
             indexPatterns,
-            maybeTemplate.map(Template::aliases)
-                .orElse(Collections.emptyMap())
-                .values()
-                .stream()
-                .map(MetadataIndexTemplateService::toAlias)
-                .toList()
+            maybeTemplate.map(Template::aliases).orElse(emptyMap()).values().stream().map(MetadataIndexTemplateService::toAlias).toList()
         );
     }
 
@@ -1655,13 +1645,6 @@ public class MetadataIndexTemplateService {
         }
     }
 
-    public interface PutListener {
-
-        void onResponse(PutResponse response);
-
-        void onFailure(Exception e);
-    }
-
     public static class PutRequest {
         final String name;
         final String cause;
@@ -1721,18 +1704,6 @@ public class MetadataIndexTemplateService {
         }
     }
 
-    public static class PutResponse {
-        private final boolean acknowledged;
-
-        public PutResponse(boolean acknowledged) {
-            this.acknowledged = acknowledged;
-        }
-
-        public boolean acknowledged() {
-            return acknowledged;
-        }
-    }
-
     public static class RemoveRequest {
         final String name;
         TimeValue masterTimeout = MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT;
@@ -1746,11 +1717,4 @@ public class MetadataIndexTemplateService {
             return this;
         }
     }
-
-    public interface RemoveListener {
-
-        void onResponse(AcknowledgedResponse response);
-
-        void onFailure(Exception e);
-    }
 }

+ 72 - 13
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -25,10 +25,11 @@ import org.elasticsearch.action.ingest.DeletePipelineRequest;
 import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
-import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateApplier;
+import org.elasticsearch.cluster.ClusterStateTaskConfig;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.DataStream.TimestampField;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
@@ -39,6 +40,7 @@ import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.regex.Regex;
@@ -105,6 +107,53 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
     private final List<Consumer<ClusterState>> ingestClusterStateListeners = new CopyOnWriteArrayList<>();
     private volatile ClusterState state;
 
+    /**
+     * Cluster state task executor for ingest pipeline operations
+     */
+    private static final ClusterStateTaskExecutor<PipelineClusterStateUpdateTask> PIPELINE_TASK_EXECUTOR = (currentState, taskContexts) -> {
+        ClusterState state = currentState;
+        for (final var taskContext : taskContexts) {
+            try {
+                final var task = taskContext.getTask();
+                state = task.execute(state);
+                taskContext.success(new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState));
+            } catch (Exception e) {
+                taskContext.onFailure(e);
+            }
+        }
+        return state;
+    };
+
+    /**
+     * Specialized cluster state update task specifically for ingest pipeline operations.
+     * These operations all receive an AcknowledgedResponse.
+     */
+    private abstract static class PipelineClusterStateUpdateTask extends ClusterStateUpdateTask {
+        private final ActionListener<AcknowledgedResponse> listener;
+
+        PipelineClusterStateUpdateTask(TimeValue timeout, ActionListener<AcknowledgedResponse> listener) {
+            super(timeout);
+            this.listener = listener;
+        }
+
+        public abstract ClusterState doExecute(ClusterState currentState) throws Exception;
+
+        @Override
+        public ClusterState execute(ClusterState currentState) throws Exception {
+            return doExecute(currentState);
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            listener.onFailure(e);
+        }
+
+        @Override
+        public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
+            listener.onResponse(AcknowledgedResponse.TRUE);
+        }
+    }
+
     public IngestService(
         ClusterService clusterService,
         ThreadPool threadPool,
@@ -285,12 +334,17 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
      * Deletes the pipeline specified by id in the request.
      */
     public void delete(DeletePipelineRequest request, ActionListener<AcknowledgedResponse> listener) {
-        submitUnbatchedTask("delete-pipeline-" + request.getId(), new AckedClusterStateUpdateTask(request, listener) {
-            @Override
-            public ClusterState execute(ClusterState currentState) {
-                return innerDelete(request, currentState);
-            }
-        });
+        clusterService.submitStateUpdateTask(
+            "delete-pipeline-" + request.getId(),
+            new PipelineClusterStateUpdateTask(request.timeout(), listener) {
+                @Override
+                public ClusterState doExecute(ClusterState currentState) {
+                    return innerDelete(request, currentState);
+                }
+            },
+            ClusterStateTaskConfig.build(Priority.NORMAL, request.masterNodeTimeout()),
+            PIPELINE_TASK_EXECUTOR
+        );
     }
 
     @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
@@ -443,12 +497,17 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
             }
 
             validatePipeline(ingestInfos, request.getId(), config);
-            submitUnbatchedTask("put-pipeline-" + request.getId(), new AckedClusterStateUpdateTask(request, listener) {
-                @Override
-                public ClusterState execute(ClusterState currentState) {
-                    return innerPut(request, currentState);
-                }
-            });
+            clusterService.submitStateUpdateTask(
+                "put-pipeline-" + request.getId(),
+                new PipelineClusterStateUpdateTask(request.timeout(), listener) {
+                    @Override
+                    public ClusterState doExecute(ClusterState currentState) {
+                        return innerPut(request, currentState);
+                    }
+                },
+                ClusterStateTaskConfig.build(Priority.NORMAL, request.masterNodeTimeout()),
+                PIPELINE_TASK_EXECUTOR
+            );
         }, listener::onFailure));
     }
 

+ 4 - 4
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateServiceTests.java

@@ -2029,9 +2029,9 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
         );
 
         final List<Throwable> throwables = new ArrayList<>();
-        service.putTemplate(request, new MetadataIndexTemplateService.PutListener() {
+        service.putTemplate(request, new ActionListener<>() {
             @Override
-            public void onResponse(MetadataIndexTemplateService.PutResponse response) {
+            public void onResponse(AcknowledgedResponse response) {
 
             }
 
@@ -2048,9 +2048,9 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
 
         final List<Throwable> throwables = new ArrayList<>();
         final CountDownLatch latch = new CountDownLatch(1);
-        service.putTemplate(request, new MetadataIndexTemplateService.PutListener() {
+        service.putTemplate(request, new ActionListener<>() {
             @Override
-            public void onResponse(MetadataIndexTemplateService.PutResponse response) {
+            public void onResponse(AcknowledgedResponse response) {
                 latch.countDown();
             }