Преглед изворни кода

Adding ability to auto-install ingest pipelines (#95782)

eyalkoren пре 2 година
родитељ
комит
1dda9893e9

+ 5 - 0
docs/changelog/95782.yaml

@@ -0,0 +1,5 @@
+pr: 95782
+summary: Adding ability to auto-install ingest pipelines and refer to them from index templates
+area: Data streams
+type: feature
+issues: []

+ 139 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistry.java

@@ -13,6 +13,8 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
 import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
 import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
+import org.elasticsearch.action.ingest.PutPipelineAction;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -21,11 +23,15 @@ import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.metadata.ComponentTemplate;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
+import org.elasticsearch.cluster.metadata.Template;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.gateway.GatewayService;
+import org.elasticsearch.ingest.IngestMetadata;
+import org.elasticsearch.ingest.PipelineConfiguration;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xcontent.XContentParserConfiguration;
@@ -52,7 +58,7 @@ import static org.elasticsearch.core.Strings.format;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 
 /**
- * Abstracts the logic of managing versioned index templates and lifecycle policies for plugins that require such things.
+ * Abstracts the logic of managing versioned index templates, ingest pipelines and lifecycle policies for plugins that require such things.
  */
 public abstract class IndexTemplateRegistry implements ClusterStateListener {
     private static final Logger logger = LogManager.getLogger(IndexTemplateRegistry.class);
@@ -64,6 +70,7 @@ public abstract class IndexTemplateRegistry implements ClusterStateListener {
     protected final ClusterService clusterService;
     protected final ConcurrentMap<String, AtomicBoolean> templateCreationsInProgress = new ConcurrentHashMap<>();
     protected final ConcurrentMap<String, AtomicBoolean> policyCreationsInProgress = new ConcurrentHashMap<>();
+    protected final ConcurrentMap<String, AtomicBoolean> pipelineCreationsInProgress = new ConcurrentHashMap<>();
 
     public IndexTemplateRegistry(
         Settings nodeSettings,
@@ -124,6 +131,15 @@ public abstract class IndexTemplateRegistry implements ClusterStateListener {
         return Collections.emptyList();
     }
 
+    /**
+     * Retrieves a list of {@link IngestPipelineConfig} that represents the ingest pipelines
+     * that should be installed and managed.
+     * @return The configurations for ingest pipelines that should be installed.
+     */
+    protected List<IngestPipelineConfig> getIngestPipelines() {
+        return Collections.emptyList();
+    }
+
     /**
      * Retrieves an identifier that is used to identify which plugin is asking for this.
      * @return A string ID for the plugin managing these templates.
@@ -175,6 +191,7 @@ public abstract class IndexTemplateRegistry implements ClusterStateListener {
         boolean localNodeVersionAfterMaster = localNode.getVersion().after(masterNode.getVersion());
 
         if (event.localNodeMaster() || localNodeVersionAfterMaster) {
+            addIngestPipelinesIfMissing(state);
             addTemplatesIfMissing(state);
             addIndexLifecyclePoliciesIfMissing(state);
         }
@@ -242,7 +259,14 @@ public abstract class IndexTemplateRegistry implements ClusterStateListener {
             final AtomicBoolean creationCheck = templateCreationsInProgress.computeIfAbsent(templateName, key -> new AtomicBoolean(false));
             if (creationCheck.compareAndSet(false, true)) {
                 ComponentTemplate currentTemplate = state.metadata().componentTemplates().get(templateName);
-                if (Objects.isNull(currentTemplate)) {
+                if (templateDependenciesSatisfied(state, newTemplate.getValue()) == false) {
+                    creationCheck.set(false);
+                    logger.trace(
+                        "not adding index template [{}] for [{}] because its required dependencies do not exist",
+                        templateName,
+                        getOrigin()
+                    );
+                } else if (Objects.isNull(currentTemplate)) {
                     logger.debug("adding component template [{}] for [{}], because it doesn't exist", templateName, getOrigin());
                     putComponentTemplate(templateName, newTemplate.getValue(), creationCheck);
                 } else if (Objects.isNull(currentTemplate.version()) || newTemplate.getValue().version() > currentTemplate.version()) {
@@ -275,6 +299,32 @@ public abstract class IndexTemplateRegistry implements ClusterStateListener {
         }
     }
 
+    /**
+     * Returns true if the cluster state contains all of the dependencies required by the provided component template
+     */
+    private static boolean templateDependenciesSatisfied(ClusterState state, ComponentTemplate indexTemplate) {
+        Template template = indexTemplate.template();
+        if (template == null) {
+            return true;
+        }
+        Settings settings = template.settings();
+        if (settings == null) {
+            return true;
+        }
+        IngestMetadata ingestMetadata = state.metadata().custom(IngestMetadata.TYPE);
+        String defaultPipeline = settings.get("index.default_pipeline");
+        if (defaultPipeline != null) {
+            if (ingestMetadata == null || ingestMetadata.getPipelines().containsKey(defaultPipeline) == false) {
+                return false;
+            }
+        }
+        String finalPipeline = settings.get("index.final_pipeline");
+        if (finalPipeline != null) {
+            return ingestMetadata != null && ingestMetadata.getPipelines().containsKey(finalPipeline);
+        }
+        return true;
+    }
+
     private void addComposableTemplatesIfMissing(ClusterState state) {
         final Map<String, ComposableIndexTemplate> indexTemplates = getComposableTemplateConfigs();
         for (Map.Entry<String, ComposableIndexTemplate> newTemplate : indexTemplates.entrySet()) {
@@ -504,4 +554,91 @@ public abstract class IndexTemplateRegistry implements ClusterStateListener {
         }));
     }
 
+    private void addIngestPipelinesIfMissing(ClusterState state) {
+        for (IngestPipelineConfig requiredPipeline : getIngestPipelines()) {
+            final AtomicBoolean creationCheck = pipelineCreationsInProgress.computeIfAbsent(
+                requiredPipeline.getId(),
+                key -> new AtomicBoolean(false)
+            );
+
+            if (creationCheck.compareAndSet(false, true)) {
+                PipelineConfiguration existingPipeline = findInstalledPipeline(state, requiredPipeline.getId());
+                if (existingPipeline != null) {
+                    Integer existingPipelineVersion = existingPipeline.getVersion();
+                    if (existingPipelineVersion == null || existingPipelineVersion < requiredPipeline.getVersion()) {
+                        logger.info(
+                            "upgrading ingest pipeline [{}] for [{}] from version [{}] to version [{}]",
+                            requiredPipeline.getId(),
+                            getOrigin(),
+                            existingPipelineVersion,
+                            requiredPipeline.getVersion()
+                        );
+                        putIngestPipeline(requiredPipeline, creationCheck);
+                    } else {
+                        logger.debug(
+                            "not adding ingest pipeline [{}] for [{}], because it already exists",
+                            requiredPipeline.getId(),
+                            getOrigin()
+                        );
+                        creationCheck.set(false);
+                    }
+                } else {
+                    logger.debug("adding ingest pipeline [{}] for [{}], because it doesn't exist", requiredPipeline.getId(), getOrigin());
+                    putIngestPipeline(requiredPipeline, creationCheck);
+                }
+            }
+        }
+    }
+
+    @Nullable
+    private static PipelineConfiguration findInstalledPipeline(ClusterState state, String pipelineId) {
+        Optional<IngestMetadata> maybeMeta = Optional.ofNullable(state.metadata().custom(IngestMetadata.TYPE));
+        return maybeMeta.map(ingestMetadata -> ingestMetadata.getPipelines().get(pipelineId)).orElse(null);
+    }
+
+    private void putIngestPipeline(final IngestPipelineConfig pipelineConfig, final AtomicBoolean creationCheck) {
+        final Executor executor = threadPool.generic();
+        executor.execute(() -> {
+            PutPipelineRequest request = new PutPipelineRequest(pipelineConfig.getId(), pipelineConfig.loadConfig(), XContentType.JSON);
+            request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
+
+            executeAsyncWithOrigin(
+                client.threadPool().getThreadContext(),
+                getOrigin(),
+                request,
+                new ActionListener<AcknowledgedResponse>() {
+                    @Override
+                    public void onResponse(AcknowledgedResponse response) {
+                        creationCheck.set(false);
+                        if (response.isAcknowledged() == false) {
+                            logger.error(
+                                "error adding ingest pipeline [{}] for [{}], request was not acknowledged",
+                                pipelineConfig.getId(),
+                                getOrigin()
+                            );
+                        } else {
+                            logger.info("adding ingest pipeline {}", pipelineConfig.getId());
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        creationCheck.set(false);
+                        onPutPipelineFailure(pipelineConfig.getId(), e);
+                    }
+                },
+                (req, listener) -> client.execute(PutPipelineAction.INSTANCE, req, listener)
+            );
+        });
+    }
+
+    /**
+     * Called when creation of an ingest pipeline fails.
+     *
+     * @param pipelineId the pipeline that failed to be created.
+     * @param e The exception that caused the failure.
+     */
+    protected void onPutPipelineFailure(String pipelineId, Exception e) {
+        logger.error(() -> format("error adding ingest pipeline template [%s] for [%s]", pipelineId, getOrigin()), e);
+    }
 }

+ 46 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/IngestPipelineConfig.java

@@ -0,0 +1,46 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.template;
+
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+
+import java.util.Objects;
+
+/**
+ * Describes an ingest pipeline to be loaded from a resource file for use with an {@link IndexTemplateRegistry}.
+ */
+public class IngestPipelineConfig {
+    private final String id;
+    private final String resource;
+    private final int version;
+    private final String versionProperty;
+
+    public IngestPipelineConfig(String id, String resource, int version, String versionProperty) {
+        this.id = Objects.requireNonNull(id);
+        this.resource = Objects.requireNonNull(resource);
+        this.version = version;
+        this.versionProperty = Objects.requireNonNull(versionProperty);
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public int getVersion() {
+        return version;
+    }
+
+    public String getVersionProperty() {
+        return versionProperty;
+    }
+
+    public BytesReference loadConfig() {
+        return new BytesArray(TemplateUtils.loadTemplate(resource, String.valueOf(version), versionProperty));
+    }
+}

+ 404 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistryTests.java

@@ -0,0 +1,404 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.template;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
+import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
+import org.elasticsearch.action.ingest.PutPipelineAction;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlocks;
+import org.elasticsearch.cluster.metadata.ComponentTemplate;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.node.TestDiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.TriFunction;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.ingest.IngestMetadata;
+import org.elasticsearch.ingest.PipelineConfiguration;
+import org.elasticsearch.test.ClusterServiceUtils;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.client.NoOpClient;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xcontent.NamedXContentRegistry;
+import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
+import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
+import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
+import org.elasticsearch.xpack.core.ilm.OperationMode;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.oneOf;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class IndexTemplateRegistryTests extends ESTestCase {
+    private TestRegistryWithCustomPlugin registry;
+    private ClusterService clusterService;
+    private ThreadPool threadPool;
+    private VerifyingClient client;
+
+    @Before
+    public void createRegistryAndClient() {
+        threadPool = new TestThreadPool(this.getClass().getName());
+        client = new VerifyingClient(threadPool);
+        clusterService = ClusterServiceUtils.createClusterService(threadPool);
+        registry = new TestRegistryWithCustomPlugin(Settings.EMPTY, clusterService, threadPool, client, NamedXContentRegistry.EMPTY);
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        threadPool.shutdownNow();
+    }
+
+    public void testThatPipelinesAreAddedImmediately() throws Exception {
+        DiscoveryNode node = TestDiscoveryNode.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+
+        AtomicInteger calledTimes = new AtomicInteger(0);
+        client.setVerifier((action, request, listener) -> {
+            if (action instanceof PutPipelineAction) {
+                assertPutPipelineAction(calledTimes, action, request, listener);
+                return AcknowledgedResponse.TRUE;
+            } else {
+                // the composable template is not expected to be added, as it's dependency is not available in the cluster state
+                // custom-plugin-settings.json is not expected to be added as it contains a dependency on the default_pipeline
+                fail("client called with unexpected request: " + request.toString());
+                return null;
+            }
+        });
+
+        ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), nodes);
+        registry.clusterChanged(event);
+        assertBusy(() -> assertThat(calledTimes.get(), equalTo(2)));
+    }
+
+    public void testThatTemplateIsAddedIfAllDependenciesExist() throws Exception {
+        DiscoveryNode node = TestDiscoveryNode.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+
+        AtomicInteger calledTimes = new AtomicInteger(0);
+        client.setVerifier((action, request, listener) -> {
+            if (action instanceof PutComponentTemplateAction) {
+                assertPutComponentTemplate(calledTimes, action, request, listener);
+                return AcknowledgedResponse.TRUE;
+            } else {
+                // the composable template is not expected to be added, as it's dependency is not available in the cluster state
+                fail("client called with unexpected request: " + request.toString());
+                return null;
+            }
+        });
+
+        ClusterChangedEvent event = createClusterChangedEvent(
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Map.of("custom-plugin-default_pipeline", 3, "custom-plugin-final_pipeline", 3),
+            nodes
+        );
+        registry.clusterChanged(event);
+        assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
+    }
+
+    public void testThatTemplateIsNotAddedIfNotAllDependenciesExist() throws Exception {
+        DiscoveryNode node = TestDiscoveryNode.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+
+        AtomicInteger calledTimes = new AtomicInteger(0);
+        client.setVerifier((action, request, listener) -> {
+            if (action instanceof PutPipelineAction) {
+                assertPutPipelineAction(calledTimes, action, request, listener);
+                return AcknowledgedResponse.TRUE;
+            } else {
+                // the template is not expected to be added, as the final pipeline is missing
+                fail("client called with unexpected request: " + request.toString());
+                return null;
+            }
+        });
+
+        ClusterChangedEvent event = createClusterChangedEvent(
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Map.of("custom-plugin-default_pipeline", 3),
+            nodes
+        );
+        registry.clusterChanged(event);
+        assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
+    }
+
+    public void testThatComposableTemplateIsAddedIfDependenciesExist() throws Exception {
+        DiscoveryNode node = TestDiscoveryNode.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+
+        AtomicInteger calledTimes = new AtomicInteger(0);
+        client.setVerifier((action, request, listener) -> {
+            if (action instanceof PutComposableIndexTemplateAction) {
+                assertPutComposableIndexTemplateAction(calledTimes, action, request, listener);
+                return AcknowledgedResponse.TRUE;
+            } else if (action instanceof PutPipelineAction) {
+                // ignore pipelines in this case
+                return AcknowledgedResponse.TRUE;
+            } else {
+                // other components should be added as they already exist with the right version already
+                fail("client called with unexpected request: " + request.toString());
+                return null;
+            }
+        });
+
+        ClusterChangedEvent event = createClusterChangedEvent(Collections.singletonMap("custom-plugin-settings", 3), nodes);
+        registry.clusterChanged(event);
+        assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
+    }
+
+    public void testThatTemplatesAreUpgradedWhenNeeded() throws Exception {
+        DiscoveryNode node = TestDiscoveryNode.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+
+        AtomicInteger calledTimes = new AtomicInteger(0);
+        client.setVerifier((action, request, listener) -> {
+            if (action instanceof PutPipelineAction) {
+                assertPutPipelineAction(calledTimes, action, request, listener);
+                return AcknowledgedResponse.TRUE;
+            } else if (action instanceof PutComponentTemplateAction) {
+                assertPutComponentTemplate(calledTimes, action, request, listener);
+                return AcknowledgedResponse.TRUE;
+            } else if (action instanceof PutComposableIndexTemplateAction) {
+                assertPutComposableIndexTemplateAction(calledTimes, action, request, listener);
+                return AcknowledgedResponse.TRUE;
+            } else {
+                fail("client called with unexpected request: " + request.toString());
+                return null;
+            }
+        });
+
+        ClusterChangedEvent event = createClusterChangedEvent(
+            Map.of("custom-plugin-settings", 2, "custom-plugin-template", 2),
+            Collections.emptyMap(),
+            Map.of("custom-plugin-default_pipeline", 2, "custom-plugin-final_pipeline", 2),
+            nodes
+        );
+        registry.clusterChanged(event);
+        assertBusy(() -> assertThat(calledTimes.get(), equalTo(4)));
+    }
+
+    public void testThatTemplatesAreNotUpgradedWhenNotNeeded() throws Exception {
+        DiscoveryNode node = TestDiscoveryNode.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+
+        AtomicInteger calledTimes = new AtomicInteger(0);
+        client.setVerifier((action, request, listener) -> {
+            if (action instanceof PutComposableIndexTemplateAction) {
+                // ignore this
+                return AcknowledgedResponse.TRUE;
+            } else {
+                fail("client called with unexpected request: " + request.toString());
+                return null;
+            }
+        });
+
+        ClusterChangedEvent event = createClusterChangedEvent(
+            Map.of("custom-plugin-settings", 3),
+            Collections.emptyMap(),
+            Map.of("custom-plugin-default_pipeline", 3, "custom-plugin-final_pipeline", 3),
+            nodes
+        );
+        registry.clusterChanged(event);
+        assertBusy(() -> assertThat(calledTimes.get(), equalTo(0)));
+    }
+
+    private static void assertPutComponentTemplate(
+        AtomicInteger calledTimes,
+        ActionType<?> action,
+        ActionRequest request,
+        ActionListener<?> listener
+    ) {
+        assertThat(action, instanceOf(PutComponentTemplateAction.class));
+        assertThat(request, instanceOf(PutComponentTemplateAction.Request.class));
+        final PutComponentTemplateAction.Request putRequest = (PutComponentTemplateAction.Request) request;
+        assertThat(putRequest.name(), equalTo("custom-plugin-settings"));
+        ComponentTemplate componentTemplate = putRequest.componentTemplate();
+        assertThat(componentTemplate.template().settings().get("index.default_pipeline"), equalTo("custom-plugin-default_pipeline"));
+        assertThat(componentTemplate.metadata().get("description"), equalTo("settings for my application logs"));
+        assertNotNull(listener);
+        calledTimes.incrementAndGet();
+    }
+
+    private static void assertPutComposableIndexTemplateAction(
+        AtomicInteger calledTimes,
+        ActionType<?> action,
+        ActionRequest request,
+        ActionListener<?> listener
+    ) {
+        assertThat(request, instanceOf(PutComposableIndexTemplateAction.Request.class));
+        PutComposableIndexTemplateAction.Request putComposableTemplateRequest = (PutComposableIndexTemplateAction.Request) request;
+        assertThat(putComposableTemplateRequest.name(), equalTo("custom-plugin-template"));
+        ComposableIndexTemplate composableIndexTemplate = putComposableTemplateRequest.indexTemplate();
+        assertThat(composableIndexTemplate.composedOf(), hasSize(2));
+        assertThat(composableIndexTemplate.composedOf().get(0), equalTo("custom-plugin-settings"));
+        assertThat(composableIndexTemplate.composedOf().get(1), equalTo("syslog@custom"));
+        assertThat(composableIndexTemplate.getIgnoreMissingComponentTemplates(), hasSize(1));
+        assertThat(composableIndexTemplate.getIgnoreMissingComponentTemplates().get(0), equalTo("syslog@custom"));
+        assertNotNull(listener);
+        calledTimes.incrementAndGet();
+    }
+
+    private static void assertPutPipelineAction(
+        AtomicInteger calledTimes,
+        ActionType<?> action,
+        ActionRequest request,
+        ActionListener<?> listener
+    ) {
+        assertThat(action, instanceOf(PutPipelineAction.class));
+        assertThat(request, instanceOf(PutPipelineRequest.class));
+        final PutPipelineRequest putRequest = (PutPipelineRequest) request;
+        assertThat(putRequest.getId(), oneOf("custom-plugin-default_pipeline", "custom-plugin-final_pipeline"));
+        PipelineConfiguration pipelineConfiguration = new PipelineConfiguration(
+            putRequest.getId(),
+            putRequest.getSource(),
+            putRequest.getXContentType()
+        );
+        List<?> processors = (List<?>) pipelineConfiguration.getConfigAsMap().get("processors");
+        assertThat(processors, hasSize(1));
+        Map<?, ?> setProcessor = (Map<?, ?>) ((Map<?, ?>) processors.get(0)).get("set");
+        assertNotNull(setProcessor.get("field"));
+        assertNotNull(setProcessor.get("copy_from"));
+        assertNotNull(listener);
+        calledTimes.incrementAndGet();
+    }
+
+    private ClusterChangedEvent createClusterChangedEvent(Map<String, Integer> existingTemplates, DiscoveryNodes nodes) {
+        return createClusterChangedEvent(existingTemplates, Collections.emptyMap(), Collections.emptyMap(), nodes);
+    }
+
+    private ClusterChangedEvent createClusterChangedEvent(
+        Map<String, Integer> existingTemplates,
+        Map<String, LifecyclePolicy> existingPolicies,
+        Map<String, Integer> existingIngestPipelines,
+        DiscoveryNodes nodes
+    ) {
+        ClusterState cs = createClusterState(Settings.EMPTY, existingTemplates, existingPolicies, existingIngestPipelines, nodes);
+        ClusterChangedEvent realEvent = new ClusterChangedEvent(
+            "created-from-test",
+            cs,
+            ClusterState.builder(new ClusterName("test")).build()
+        );
+        ClusterChangedEvent event = spy(realEvent);
+        when(event.localNodeMaster()).thenReturn(nodes.isLocalNodeElectedMaster());
+
+        return event;
+    }
+
+    private ClusterState createClusterState(
+        Settings nodeSettings,
+        Map<String, Integer> existingComponentTemplates,
+        Map<String, LifecyclePolicy> existingPolicies,
+        Map<String, Integer> existingIngestPipelines,
+        DiscoveryNodes nodes
+    ) {
+        Map<String, ComponentTemplate> componentTemplates = new HashMap<>();
+        for (Map.Entry<String, Integer> template : existingComponentTemplates.entrySet()) {
+            ComponentTemplate mockTemplate = mock(ComponentTemplate.class);
+            when(mockTemplate.version()).thenReturn(template.getValue() == null ? null : (long) template.getValue());
+            componentTemplates.put(template.getKey(), mockTemplate);
+        }
+
+        Map<String, LifecyclePolicyMetadata> existingILMMeta = existingPolicies.entrySet()
+            .stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> new LifecyclePolicyMetadata(e.getValue(), Collections.emptyMap(), 1, 1)));
+        IndexLifecycleMetadata ilmMeta = new IndexLifecycleMetadata(existingILMMeta, OperationMode.RUNNING);
+
+        Map<String, PipelineConfiguration> ingestPipelines = new HashMap<>();
+        for (Map.Entry<String, Integer> pipelineEntry : existingIngestPipelines.entrySet()) {
+            // we cannot mock PipelineConfiguration as it is a final class
+            ingestPipelines.put(
+                pipelineEntry.getKey(),
+                new PipelineConfiguration(
+                    pipelineEntry.getKey(),
+                    new BytesArray(Strings.format("{\"version\": %d}", pipelineEntry.getValue())),
+                    XContentType.JSON
+                )
+            );
+        }
+        IngestMetadata ingestMetadata = new IngestMetadata(ingestPipelines);
+
+        return ClusterState.builder(new ClusterName("test"))
+            .metadata(
+                Metadata.builder()
+                    .componentTemplates(componentTemplates)
+                    .transientSettings(nodeSettings)
+                    .putCustom(IndexLifecycleMetadata.TYPE, ilmMeta)
+                    .putCustom(IngestMetadata.TYPE, ingestMetadata)
+                    .build()
+            )
+            .blocks(new ClusterBlocks.Builder().build())
+            .nodes(nodes)
+            .build();
+    }
+
+    // -------------
+
+    /**
+     * A client that delegates to a verifying function for action/request/listener
+     */
+    public static class VerifyingClient extends NoOpClient {
+
+        private TriFunction<ActionType<?>, ActionRequest, ActionListener<?>, ActionResponse> verifier = (a, r, l) -> {
+            fail("verifier not set");
+            return null;
+        };
+
+        VerifyingClient(ThreadPool threadPool) {
+            super(threadPool);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
+            ActionType<Response> action,
+            Request request,
+            ActionListener<Response> listener
+        ) {
+            try {
+                listener.onResponse((Response) verifier.apply(action, request, listener));
+            } catch (Exception e) {
+                listener.onFailure(e);
+            }
+        }
+
+        public VerifyingClient setVerifier(TriFunction<ActionType<?>, ActionRequest, ActionListener<?>, ActionResponse> verifier) {
+            this.verifier = verifier;
+            return this;
+        }
+    }
+}

+ 93 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/template/TestRegistryWithCustomPlugin.java

@@ -0,0 +1,93 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.template;
+
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.metadata.ComponentTemplate;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xcontent.NamedXContentRegistry;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xcontent.json.JsonXContent;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+class TestRegistryWithCustomPlugin extends IndexTemplateRegistry {
+
+    public static final int REGISTRY_VERSION = 3;
+    public static final String TEMPLATE_VERSION_VARIABLE = "xpack.custom_plugin.template.version";
+
+    TestRegistryWithCustomPlugin(
+        Settings nodeSettings,
+        ClusterService clusterService,
+        ThreadPool threadPool,
+        Client client,
+        NamedXContentRegistry xContentRegistry
+    ) {
+        super(nodeSettings, clusterService, threadPool, client, xContentRegistry);
+    }
+
+    @Override
+    protected Map<String, ComponentTemplate> getComponentTemplateConfigs() {
+        String settingsConfigName = "custom-plugin-settings";
+        IndexTemplateConfig config = new IndexTemplateConfig(
+            settingsConfigName,
+            "/org/elasticsearch/xpack/core/template/custom-plugin-settings.json",
+            REGISTRY_VERSION,
+            TEMPLATE_VERSION_VARIABLE
+        );
+        ComponentTemplate componentTemplate = null;
+        try {
+            componentTemplate = ComponentTemplate.parse(
+                JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, config.loadBytes())
+            );
+        } catch (IOException e) {
+            throw new AssertionError(e);
+        }
+        return Map.of(settingsConfigName, componentTemplate);
+    }
+
+    @Override
+    protected Map<String, ComposableIndexTemplate> getComposableTemplateConfigs() {
+        return IndexTemplateRegistry.parseComposableTemplates(
+            new IndexTemplateConfig(
+                "custom-plugin-template",
+                "/org/elasticsearch/xpack/core/template/custom-plugin-template.json",
+                REGISTRY_VERSION,
+                TEMPLATE_VERSION_VARIABLE
+            )
+        );
+    }
+
+    @Override
+    protected List<IngestPipelineConfig> getIngestPipelines() {
+        return List.of(
+            new IngestPipelineConfig(
+                "custom-plugin-default_pipeline",
+                "/org/elasticsearch/xpack/core/template/custom-plugin-default_pipeline.json",
+                REGISTRY_VERSION,
+                TEMPLATE_VERSION_VARIABLE
+            ),
+            new IngestPipelineConfig(
+                "custom-plugin-final_pipeline",
+                "/org/elasticsearch/xpack/core/template/custom-plugin-final_pipeline.json",
+                REGISTRY_VERSION,
+                TEMPLATE_VERSION_VARIABLE
+            )
+        );
+    }
+
+    @Override
+    protected String getOrigin() {
+        return "test";
+    }
+}

+ 15 - 0
x-pack/plugin/core/src/test/resources/org/elasticsearch/xpack/core/template/custom-plugin-default_pipeline.json

@@ -0,0 +1,15 @@
+{
+  "description": "my custom default pipeline",
+  "processors": [
+    {
+      "set": {
+        "field": "test_field",
+        "copy_from": "other_field"
+      }
+    }
+  ],
+  "_meta": {
+    "managed": true
+  },
+  "version": ${xpack.custom_plugin.template.version}
+}

+ 15 - 0
x-pack/plugin/core/src/test/resources/org/elasticsearch/xpack/core/template/custom-plugin-final_pipeline.json

@@ -0,0 +1,15 @@
+{
+  "description": "my custom final pipeline",
+  "processors": [
+    {
+      "set": {
+        "field": "test_field",
+        "copy_from": "other_field"
+      }
+    }
+  ],
+  "_meta": {
+    "managed": true
+  },
+  "version": ${xpack.custom_plugin.template.version}
+}

+ 17 - 0
x-pack/plugin/core/src/test/resources/org/elasticsearch/xpack/core/template/custom-plugin-settings.json

@@ -0,0 +1,17 @@
+{
+  "template": {
+    "settings": {
+      "index": {
+        "number_of_shards": 1,
+        "number_of_replicas": 0,
+        "default_pipeline": "custom-plugin-default_pipeline",
+        "final_pipeline": "custom-plugin-final_pipeline"
+      }
+    }
+  },
+  "_meta": {
+    "description": "settings for my application logs",
+    "managed": true
+  },
+  "version": ${xpack.custom_plugin.template.version}
+}

+ 16 - 0
x-pack/plugin/core/src/test/resources/org/elasticsearch/xpack/core/template/custom-plugin-template.json

@@ -0,0 +1,16 @@
+{
+  "index_patterns": ["logs-my_app-*"],
+  "priority": 200,
+  "data_stream": {},
+  "composed_of": [
+    "custom-plugin-settings",
+    "syslog@custom"
+  ],
+  "ignore_missing_component_templates": ["syslog@custom"],
+  "allow_auto_create": true,
+  "_meta": {
+    "description": "integration for my application logs",
+    "managed": true
+  },
+  "version": ${xpack.custom_plugin.template.version}
+}

+ 2 - 2
x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/java/org/elasticsearch/xpack/entsearch/EnterpriseSearchRestIT.java

@@ -9,14 +9,14 @@ package org.elasticsearch.xpack.entsearch;
 
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 
+import org.apache.lucene.tests.util.LuceneTestCase;
 import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
 import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
 
-import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
-
+@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/95917")
 public class EnterpriseSearchRestIT extends ESClientYamlSuiteTestCase {
 
     public EnterpriseSearchRestIT(final ClientYamlTestCandidate testCandidate) {

+ 60 - 5
x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/analytics/AnalyticsTemplateRegistryTests.java

@@ -26,7 +26,11 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.node.TestDiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.TriFunction;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.ingest.IngestMetadata;
+import org.elasticsearch.ingest.PipelineConfiguration;
 import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.client.NoOpClient;
@@ -120,7 +124,13 @@ public class AnalyticsTemplateRegistryTests extends ESTestCase {
         DiscoveryNode node = TestDiscoveryNode.create("node");
         DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
 
-        ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), Collections.emptyMap(), nodes);
+        ClusterChangedEvent event = createClusterChangedEvent(
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.singletonMap("behavioral_analytics-events-final_pipeline", AnalyticsTemplateRegistry.REGISTRY_VERSION),
+            Collections.emptyMap(),
+            nodes
+        );
 
         AtomicInteger calledTimes = new AtomicInteger(0);
         client.setVerifier((action, request, listener) -> verifyComponentTemplateInstalled(calledTimes, action, request, listener));
@@ -193,7 +203,13 @@ public class AnalyticsTemplateRegistryTests extends ESTestCase {
             return null;
         });
 
-        ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), Collections.emptyMap(), policyMap, nodes);
+        ClusterChangedEvent event = createClusterChangedEvent(
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            policyMap,
+            nodes
+        );
         registry.clusterChanged(event);
     }
 
@@ -238,7 +254,13 @@ public class AnalyticsTemplateRegistryTests extends ESTestCase {
         ) {
             LifecyclePolicy different = LifecyclePolicy.parse(parser, policies.get(0).getName());
             policyMap.put(policies.get(0).getName(), different);
-            ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), Collections.emptyMap(), policyMap, nodes);
+            ClusterChangedEvent event = createClusterChangedEvent(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                policyMap,
+                nodes
+            );
             registry.clusterChanged(event);
         }
     }
@@ -253,6 +275,8 @@ public class AnalyticsTemplateRegistryTests extends ESTestCase {
                 AnalyticsTemplateRegistry.EVENT_DATA_STREAM_SETTINGS_COMPONENT_NAME,
                 AnalyticsTemplateRegistry.REGISTRY_VERSION - 1
             ),
+            Collections.singletonMap("behavioral_analytics-events-final_pipeline", AnalyticsTemplateRegistry.REGISTRY_VERSION),
+            Collections.emptyMap(),
             nodes
         );
         AtomicInteger calledTimes = new AtomicInteger(0);
@@ -268,6 +292,8 @@ public class AnalyticsTemplateRegistryTests extends ESTestCase {
         ClusterChangedEvent event = createClusterChangedEvent(
             Collections.emptyMap(),
             Collections.singletonMap(AnalyticsTemplateRegistry.EVENT_DATA_STREAM_MAPPINGS_COMPONENT_NAME, null),
+            Collections.singletonMap("behavioral_analytics-events-final_pipeline", AnalyticsTemplateRegistry.REGISTRY_VERSION),
+            Collections.emptyMap(),
             nodes
         );
         AtomicInteger calledTimes = new AtomicInteger(0);
@@ -428,16 +454,29 @@ public class AnalyticsTemplateRegistryTests extends ESTestCase {
         Map<String, Integer> existingComponentTemplates,
         DiscoveryNodes nodes
     ) {
-        return createClusterChangedEvent(existingComposableTemplates, existingComponentTemplates, Collections.emptyMap(), nodes);
+        return createClusterChangedEvent(
+            existingComposableTemplates,
+            existingComponentTemplates,
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            nodes
+        );
     }
 
     private ClusterChangedEvent createClusterChangedEvent(
         Map<String, Integer> existingComposableTemplates,
         Map<String, Integer> existingComponentTemplates,
+        Map<String, Integer> existingIngestPipelines,
         Map<String, LifecyclePolicy> existingPolicies,
         DiscoveryNodes nodes
     ) {
-        ClusterState cs = createClusterState(existingComposableTemplates, existingComponentTemplates, existingPolicies, nodes);
+        ClusterState cs = createClusterState(
+            existingComposableTemplates,
+            existingComponentTemplates,
+            existingIngestPipelines,
+            existingPolicies,
+            nodes
+        );
         ClusterChangedEvent realEvent = new ClusterChangedEvent(
             "created-from-test",
             cs,
@@ -452,6 +491,7 @@ public class AnalyticsTemplateRegistryTests extends ESTestCase {
     private ClusterState createClusterState(
         Map<String, Integer> existingComposableTemplates,
         Map<String, Integer> existingComponentTemplates,
+        Map<String, Integer> existingIngestPipelines,
         Map<String, LifecyclePolicy> existingPolicies,
         DiscoveryNodes nodes
     ) {
@@ -469,6 +509,20 @@ public class AnalyticsTemplateRegistryTests extends ESTestCase {
             componentTemplates.put(template.getKey(), mockTemplate);
         }
 
+        Map<String, PipelineConfiguration> ingestPipelines = new HashMap<>();
+        for (Map.Entry<String, Integer> pipelineEntry : existingIngestPipelines.entrySet()) {
+            // we cannot mock PipelineConfiguration as it is a final class
+            ingestPipelines.put(
+                pipelineEntry.getKey(),
+                new PipelineConfiguration(
+                    pipelineEntry.getKey(),
+                    new BytesArray(Strings.format("{\"version\": %d}", pipelineEntry.getValue())),
+                    XContentType.JSON
+                )
+            );
+        }
+        IngestMetadata ingestMetadata = new IngestMetadata(ingestPipelines);
+
         Map<String, LifecyclePolicyMetadata> existingILMMeta = existingPolicies.entrySet()
             .stream()
             .collect(Collectors.toMap(Map.Entry::getKey, e -> new LifecyclePolicyMetadata(e.getValue(), Collections.emptyMap(), 1, 1)));
@@ -480,6 +534,7 @@ public class AnalyticsTemplateRegistryTests extends ESTestCase {
                     .indexTemplates(composableTemplates)
                     .componentTemplates(componentTemplates)
                     .transientSettings(Settings.EMPTY)
+                    .putCustom(IngestMetadata.TYPE, ingestMetadata)
                     .putCustom(IndexLifecycleMetadata.TYPE, ilmMeta)
                     .build()
             )

+ 1 - 1
x-pack/plugin/stack/src/test/resources/non-required-template.json

@@ -9,7 +9,7 @@
   "ignore_missing_component_templates": ["syslog@custom"],
   "allow_auto_create": true,
   "_meta": {
-    "description": "default logs template installed by x-pack",
+    "description": "syslog test settings",
     "managed": true
   },
   "version": ${xpack.stack.template.version}