Browse Source

[ML] Switch ML internal index templates to composable templates (#73232)

Legacy index templates are deprecated but ML was still using
them for its hidden indices.

This PR switches the legacy ML index templates to use the new
composable index template framework.

The composable index templates get installed once the master
node is on a version that understands them.  For templates
that need to be up-to-date in mixed version clusters where the
master might still be on a version that doesn't understand
composable index templates we still ship the legacy template
too, and install this if required in the mixed version cluster.
(The notifications index template falls into this category.)
This makes a couple of places in the code a little messy, as
the new style template definitions don't contain a dummy _doc
level (where the type used to be), but the legacy template
definitions do - hopefully we can tidy this up in master once
8.0 is released.

There is one more change of note in this PR that is not
strictly related to switching to composable templates, but
which was shown up during the testing.  We used to wait for
all templates to be installed by the master node before running
tests in mixed version clusters.  I do not believe we should
have been doing this, as other upgrade orchestration systems,
e.g. Cloud, will not be doing this.  Our production code needs
to install templates and/or mappings before any operation that
requires them if there's a chance that the elected master won't
have done this in time.

Fixes #65437
David Roberts 4 years ago
parent
commit
0216cf065b
33 changed files with 1267 additions and 863 deletions
  1. 34 9
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java
  2. 4 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlStatsIndex.java
  3. 4 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java
  4. 11 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/notifications/NotificationsIndex.java
  5. 62 22
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java
  6. 46 20
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/TemplateUtils.java
  7. 479 481
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/anomalydetection/results_index_mappings.json
  8. 19 13
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/anomalydetection/results_index_template.json
  9. 16 12
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/anomalydetection/state_index_template.json
  10. 17 0
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/notifications_index_legacy_template.json
  11. 32 0
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/notifications_index_mappings.json
  12. 13 40
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/notifications_index_template.json
  13. 137 139
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/stats_index_mappings.json
  14. 16 10
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/stats_index_template.json
  15. 38 10
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java
  16. 124 32
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java
  17. 1 1
      x-pack/plugin/identity-provider/src/main/java/org/elasticsearch/xpack/idp/saml/sp/SamlServiceProviderIndex.java
  18. 2 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
  19. 31 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java
  20. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java
  21. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java
  22. 2 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
  23. 2 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java
  24. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java
  25. 2 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java
  26. 17 11
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java
  27. 19 16
      x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/AbstractXPackRestTest.java
  28. 2 2
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/notifications/TransformAuditor.java
  29. 3 1
      x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlConfigIndexMappingsFullClusterRestartIT.java
  30. 3 1
      x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java
  31. 12 5
      x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java
  32. 84 19
      x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/IndexMappingTemplateAsserter.java
  33. 32 7
      x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java

+ 34 - 9
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java

@@ -8,17 +8,24 @@ package org.elasticsearch.xpack.core.common.notifications;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
 import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.OriginSettingClient;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.DeprecationHandler;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
 import org.elasticsearch.xpack.core.template.IndexTemplateConfig;
 
@@ -39,12 +46,15 @@ public abstract class AbstractAuditor<T extends AbstractAuditMessage> {
 
     private static final Logger logger = LogManager.getLogger(AbstractAuditor.class);
     static final int MAX_BUFFER_SIZE = 1000;
+    static final TimeValue MASTER_TIMEOUT = TimeValue.timeValueMinutes(1);
 
     private final OriginSettingClient client;
     private final String nodeName;
     private final String auditIndex;
     private final String templateName;
-    private final Supplier<PutIndexTemplateRequest> templateSupplier;
+    private final Version versionComposableTemplateExpected;
+    private final Supplier<PutIndexTemplateRequest> legacyTemplateSupplier;
+    private final Supplier<PutComposableIndexTemplateAction.Request> templateSupplier;
     private final AbstractAuditMessageFactory<T> messageFactory;
     private final AtomicBoolean hasLatestTemplate;
 
@@ -52,30 +62,45 @@ public abstract class AbstractAuditor<T extends AbstractAuditMessage> {
     private final ClusterService clusterService;
     private final AtomicBoolean putTemplateInProgress;
 
-
     protected AbstractAuditor(OriginSettingClient client,
                               String auditIndex,
+                              Version versionComposableTemplateExpected,
+                              IndexTemplateConfig legacyTemplateConfig,
                               IndexTemplateConfig templateConfig,
                               String nodeName,
                               AbstractAuditMessageFactory<T> messageFactory,
                               ClusterService clusterService) {
 
-        this(client, auditIndex, templateConfig.getTemplateName(),
-            () -> new PutIndexTemplateRequest(templateConfig.getTemplateName()).source(templateConfig.loadBytes(), XContentType.JSON),
+        this(client, auditIndex, templateConfig.getTemplateName(), versionComposableTemplateExpected,
+            () -> new PutIndexTemplateRequest(legacyTemplateConfig.getTemplateName())
+                .source(legacyTemplateConfig.loadBytes(), XContentType.JSON).masterNodeTimeout(MASTER_TIMEOUT),
+            () -> {
+                try {
+                    return new PutComposableIndexTemplateAction.Request(templateConfig.getTemplateName())
+                        .indexTemplate(ComposableIndexTemplate.parse(JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
+                            DeprecationHandler.THROW_UNSUPPORTED_OPERATION, templateConfig.loadBytes())))
+                        .masterNodeTimeout(MASTER_TIMEOUT);
+                } catch (IOException e) {
+                    throw new ElasticsearchParseException("unable to parse composable template " + templateConfig.getTemplateName(), e);
+                }
+            },
             nodeName, messageFactory, clusterService);
     }
 
-
     protected AbstractAuditor(OriginSettingClient client,
                               String auditIndex,
                               String templateName,
-                              Supplier<PutIndexTemplateRequest> templateSupplier,
+                              Version versionComposableTemplateExpected,
+                              Supplier<PutIndexTemplateRequest> legacyTemplateSupplier,
+                              Supplier<PutComposableIndexTemplateAction.Request> templateSupplier,
                               String nodeName,
                               AbstractAuditMessageFactory<T> messageFactory,
                               ClusterService clusterService) {
         this.client = Objects.requireNonNull(client);
         this.auditIndex = Objects.requireNonNull(auditIndex);
         this.templateName = Objects.requireNonNull(templateName);
+        this.versionComposableTemplateExpected = versionComposableTemplateExpected;
+        this.legacyTemplateSupplier = Objects.requireNonNull(legacyTemplateSupplier);
         this.templateSupplier = Objects.requireNonNull(templateSupplier);
         this.messageFactory = Objects.requireNonNull(messageFactory);
         this.clusterService = Objects.requireNonNull(clusterService);
@@ -111,7 +136,7 @@ public abstract class AbstractAuditor<T extends AbstractAuditMessage> {
             return;
         }
 
-        if (MlIndexAndAlias.hasIndexTemplate(clusterService.state(), templateName)) {
+        if (MlIndexAndAlias.hasIndexTemplate(clusterService.state(), templateName, templateName, versionComposableTemplateExpected)) {
             synchronized (this) {
                 // synchronized so nothing can be added to backlog while this value changes
                 hasLatestTemplate.set(true);
@@ -152,8 +177,8 @@ public abstract class AbstractAuditor<T extends AbstractAuditMessage> {
 
                 // stop multiple invocations
                 if (putTemplateInProgress.compareAndSet(false, true)) {
-                    MlIndexAndAlias.installIndexTemplateIfRequired(clusterService.state(), client, templateSupplier.get(),
-                        putTemplateListener);
+                    MlIndexAndAlias.installIndexTemplateIfRequired(clusterService.state(), client, versionComposableTemplateExpected,
+                        legacyTemplateSupplier.get(), templateSupplier.get(), putTemplateListener);
                 }
                 return;
             }

+ 4 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlStatsIndex.java

@@ -26,6 +26,10 @@ public class MlStatsIndex {
 
     private MlStatsIndex() {}
 
+    public static String wrappedMapping() {
+        return "{\n\"_doc\" : " + mapping() + "\n}";
+    }
+
     public static String mapping() {
         return TemplateUtils.loadTemplate("/org/elasticsearch/xpack/core/ml/stats_index_mappings.json",
             Version.CURRENT.toString(), MAPPINGS_VERSION_VARIABLE);

+ 4 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java

@@ -83,6 +83,10 @@ public final class AnomalyDetectorsIndex {
             finalListener);
     }
 
+    public static String wrappedResultsMapping() {
+        return "{\n\"_doc\" : " + resultsMapping() + "\n}";
+    }
+
     public static String resultsMapping() {
         return TemplateUtils.loadTemplate(RESOURCE_PATH + "results_index_mappings.json",
             Version.CURRENT.toString(), RESULTS_MAPPINGS_VERSION_VARIABLE);

+ 11 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/notifications/NotificationsIndex.java

@@ -6,9 +6,20 @@
  */
 package org.elasticsearch.xpack.core.ml.notifications;
 
+import org.elasticsearch.Version;
+import org.elasticsearch.xpack.core.template.TemplateUtils;
+
 public final class NotificationsIndex {
 
     public static final String NOTIFICATIONS_INDEX = ".ml-notifications-000001";
 
+    private static final String RESOURCE_PATH = "/org/elasticsearch/xpack/core/ml/";
+    private static final String MAPPINGS_VERSION_VARIABLE = "xpack.ml.version";
+
     private NotificationsIndex() {}
+
+    public static String mapping() {
+        return TemplateUtils.loadTemplate(RESOURCE_PATH + "notifications_index_mappings.json",
+            Version.CURRENT.toString(), MAPPINGS_VERSION_VARIABLE);
+    }
 }

+ 62 - 22
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java

@@ -9,7 +9,9 @@ package org.elasticsearch.xpack.core.ml.utils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ResourceAlreadyExistsException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -19,20 +21,26 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
 import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.DeprecationHandler;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.indices.SystemIndexDescriptor;
 import org.elasticsearch.xpack.core.template.IndexTemplateConfig;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Optional;
@@ -303,25 +311,40 @@ public final class MlIndexAndAlias {
     public static void installIndexTemplateIfRequired(
         ClusterState clusterState,
         Client client,
+        Version versionComposableTemplateExpected,
+        IndexTemplateConfig legacyTemplateConfig,
         IndexTemplateConfig templateConfig,
+        TimeValue masterTimeout,
         ActionListener<Boolean> listener
     ) {
+        String legacyTemplateName = legacyTemplateConfig.getTemplateName();
         String templateName = templateConfig.getTemplateName();
 
         // The check for existence of the template is against the cluster state, so very cheap
-        if (hasIndexTemplate(clusterState, templateName)) {
+        if (hasIndexTemplate(clusterState, legacyTemplateName, templateName, versionComposableTemplateExpected)) {
             listener.onResponse(true);
             return;
         }
 
-        PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName)
-            .source(templateConfig.loadBytes(), XContentType.JSON);
+        PutIndexTemplateRequest legacyRequest = new PutIndexTemplateRequest(legacyTemplateName)
+            .source(legacyTemplateConfig.loadBytes(), XContentType.JSON).masterNodeTimeout(masterTimeout);
+
+        PutComposableIndexTemplateAction.Request request;
+        try {
+            request = new PutComposableIndexTemplateAction.Request(templateConfig.getTemplateName())
+                .indexTemplate(ComposableIndexTemplate.parse(JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
+                    DeprecationHandler.THROW_UNSUPPORTED_OPERATION, templateConfig.loadBytes())))
+                .masterNodeTimeout(masterTimeout);
+        } catch (IOException e) {
+            throw new ElasticsearchParseException("unable to parse composable template " + templateConfig.getTemplateName(), e);
+        }
 
-        installIndexTemplateIfRequired(clusterState, client, request, listener);
+        installIndexTemplateIfRequired(clusterState, client, versionComposableTemplateExpected, legacyRequest, request, listener);
     }
 
     /**
-     * See {@link #installIndexTemplateIfRequired(ClusterState, Client, IndexTemplateConfig, ActionListener)}.
+     * See {@link #installIndexTemplateIfRequired(ClusterState, Client, Version, IndexTemplateConfig, IndexTemplateConfig, TimeValue,
+     * ActionListener)}.
      *
      * Overload takes a {@code PutIndexTemplateRequest} instead of {@code IndexTemplateConfig}
      *
@@ -333,34 +356,51 @@ public final class MlIndexAndAlias {
     public static void installIndexTemplateIfRequired(
         ClusterState clusterState,
         Client client,
-        PutIndexTemplateRequest templateRequest,
+        Version versionComposableTemplateExpected,
+        PutIndexTemplateRequest legacyTemplateRequest,
+        PutComposableIndexTemplateAction.Request templateRequest,
         ActionListener<Boolean> listener
     ) {
-        String templateName = templateRequest.name();
-
         // The check for existence of the template is against the cluster state, so very cheap
-        if (hasIndexTemplate(clusterState, templateRequest.name())) {
+        if (hasIndexTemplate(clusterState, legacyTemplateRequest.name(), templateRequest.name(), versionComposableTemplateExpected)) {
             listener.onResponse(true);
             return;
         }
 
-        templateRequest.masterNodeTimeout(TimeValue.timeValueMinutes(1));
+        if (versionComposableTemplateExpected != null &&
+            clusterState.nodes().getMinNodeVersion().onOrAfter(versionComposableTemplateExpected)) {
+            ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(
+                response -> {
+                    if (response.isAcknowledged() == false) {
+                        logger.warn("error adding template [{}], request was not acknowledged", templateRequest.name());
+                    }
+                    listener.onResponse(response.isAcknowledged());
+                },
+                listener::onFailure);
 
-        ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(
-            response ->  {
-                if (response.isAcknowledged() == false) {
-                    logger.warn("error adding legacy template [{}], request was not acknowledged", templateName);
-                }
-                listener.onResponse(response.isAcknowledged());
-            },
-            listener::onFailure);
+            executeAsyncWithOrigin(client, ML_ORIGIN, PutComposableIndexTemplateAction.INSTANCE, templateRequest, innerListener);
+        } else {
+            ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(
+                response -> {
+                    if (response.isAcknowledged() == false) {
+                        logger.warn("error adding legacy template [{}], request was not acknowledged", legacyTemplateRequest.name());
+                    }
+                    listener.onResponse(response.isAcknowledged());
+                },
+                listener::onFailure);
 
-        executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, templateRequest, innerListener,
-            client.admin().indices()::putTemplate);
+            executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, legacyTemplateRequest, innerListener,
+                client.admin().indices()::putTemplate);
+        }
     }
 
-    public static boolean hasIndexTemplate(ClusterState state, String templateName) {
-        return state.getMetadata().getTemplates().containsKey(templateName);
+    public static boolean hasIndexTemplate(ClusterState state, String legacyTemplateName,
+                                           String templateName, Version versionComposableTemplateExpected) {
+        if (versionComposableTemplateExpected != null && state.nodes().getMinNodeVersion().onOrAfter(versionComposableTemplateExpected)) {
+            return state.getMetadata().templatesV2().containsKey(templateName);
+        } else {
+            return state.getMetadata().getTemplates().containsKey(legacyTemplateName);
+        }
     }
 
     public static boolean hasIndex(ClusterState state, String index) {

+ 46 - 20
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/TemplateUtils.java

@@ -11,9 +11,9 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@@ -42,8 +42,8 @@ public class TemplateUtils {
     /**
      * Loads a JSON template as a resource and puts it into the provided map
      */
-    public static void loadTemplateIntoMap(String resource, Map<String, IndexTemplateMetadata> map, String templateName, String version,
-                                           String versionProperty, Logger logger) {
+    public static void loadLegacyTemplateIntoMap(String resource, Map<String, IndexTemplateMetadata> map, String templateName,
+                                                 String version, String versionProperty, Logger logger) {
         final String template = loadTemplate(resource, version, versionProperty);
         try (XContentParser parser = XContentFactory.xContent(XContentType.JSON)
                 .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, template)) {
@@ -76,7 +76,7 @@ public class TemplateUtils {
     }
 
     /**
-     * Loads a resource from the classpath and returns it as a {@link BytesReference}
+     * Loads a resource from the classpath and returns it as a {@link String}
      */
     public static String load(String name) throws IOException {
         return Streams.readFully(TemplateUtils.class.getResourceAsStream(name)).utf8ToString();
@@ -109,7 +109,7 @@ public class TemplateUtils {
     }
 
     /**
-     * Replaces all occurences of given variable with the value
+     * Replaces all occurrences of given variable with the value
      */
     public static String replaceVariable(String input, String variable, String value) {
         return Pattern.compile("${" + variable + "}", Pattern.LITERAL)
@@ -121,14 +121,26 @@ public class TemplateUtils {
      * Checks if a versioned template exists, and if it exists checks if the version is greater than or equal to the current version.
      * @param templateName Name of the index template
      * @param state Cluster state
+     * @param versionComposableTemplateExpected In which version of Elasticsearch did this template switch to being a composable template?
+     *                                          <code>null</code> means the template hasn't been switched yet.
      */
-    public static boolean checkTemplateExistsAndVersionIsGTECurrentVersion(String templateName, ClusterState state) {
-        IndexTemplateMetadata templateMetadata = state.metadata().templates().get(templateName);
-        if (templateMetadata == null) {
-            return false;
-        }
+    public static boolean checkTemplateExistsAndVersionIsGTECurrentVersion(String templateName, ClusterState state,
+                                                                           Version versionComposableTemplateExpected) {
+        if (versionComposableTemplateExpected != null && state.nodes().getMinNodeVersion().onOrAfter(versionComposableTemplateExpected)) {
+            ComposableIndexTemplate templateMetadata = state.metadata().templatesV2().get(templateName);
+            if (templateMetadata == null) {
+                return false;
+            }
 
-        return templateMetadata.version() != null && templateMetadata.version() >= Version.CURRENT.id;
+            return templateMetadata.version() != null && templateMetadata.version() >= Version.CURRENT.id;
+        } else {
+            IndexTemplateMetadata templateMetadata = state.metadata().templates().get(templateName);
+            if (templateMetadata == null) {
+                return false;
+            }
+
+            return templateMetadata.version() != null && templateMetadata.version() >= Version.CURRENT.id;
+        }
     }
 
     /**
@@ -137,12 +149,14 @@ public class TemplateUtils {
      * @param templateName Name of the index template
      * @param state Cluster state
      * @param logger Logger
+     * @param versionComposableTemplateExpected In which version of Elasticsearch did this template switch to being a composable template?
+     *                                          <code>null</code> means the template hasn't been switched yet.
      */
     public static boolean checkTemplateExistsAndIsUpToDate(
-        String templateName, String versionKey, ClusterState state, Logger logger) {
+        String templateName, String versionKey, ClusterState state, Logger logger, Version versionComposableTemplateExpected) {
 
         return checkTemplateExistsAndVersionMatches(templateName, versionKey, state, logger,
-            Version.CURRENT::equals);
+            Version.CURRENT::equals, versionComposableTemplateExpected);
     }
 
     /**
@@ -152,15 +166,28 @@ public class TemplateUtils {
      * @param state Cluster state
      * @param logger Logger
      * @param predicate Predicate to execute on version check
+     * @param versionComposableTemplateExpected In which version of Elasticsearch did this template switch to being a composable template?
+     *                                          <code>null</code> means the template hasn't been switched yet.
      */
     public static boolean checkTemplateExistsAndVersionMatches(
-        String templateName, String versionKey, ClusterState state, Logger logger, Predicate<Version> predicate) {
-
-        IndexTemplateMetadata templateMeta = state.metadata().templates().get(templateName);
-        if (templateMeta == null) {
-            return false;
+        String templateName, String versionKey, ClusterState state, Logger logger, Predicate<Version> predicate,
+        Version versionComposableTemplateExpected) {
+
+        CompressedXContent mappings;
+        if (versionComposableTemplateExpected != null && state.nodes().getMinNodeVersion().onOrAfter(versionComposableTemplateExpected)) {
+            ComposableIndexTemplate templateMeta = state.metadata().templatesV2().get(templateName);
+            if (templateMeta == null) {
+                return false;
+            }
+            mappings = templateMeta.template().mappings();
+        } else {
+            IndexTemplateMetadata templateMeta = state.metadata().templates().get(templateName);
+            if (templateMeta == null) {
+                return false;
+            }
+            mappings = templateMeta.getMappings();
         }
-        CompressedXContent mappings = templateMeta.getMappings();
+
         // check all mappings contain correct version in _meta
         // we have to parse the source here which is annoying
         if (mappings != null) {
@@ -194,5 +221,4 @@ public class TemplateUtils {
         }
         return predicate.test(Version.fromString((String) meta.get(versionKey)));
     }
-
 }

+ 479 - 481
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/anomalydetection/results_index_mappings.json

@@ -1,496 +1,494 @@
 {
-  "_doc" : {
-    "_meta" : {
-      "version" : "${xpack.ml.version}"
-    },
-    "dynamic_templates" : [
-      {
-        "strings_as_keywords" : {
-          "match" : "*",
-          "mapping" : {
-            "type" : "keyword"
-          }
+  "_meta" : {
+    "version" : "${xpack.ml.version}"
+  },
+  "dynamic_templates" : [
+    {
+      "strings_as_keywords" : {
+        "match" : "*",
+        "mapping" : {
+          "type" : "keyword"
         }
       }
-    ],
-    "properties" : {
-      "actual" : {
-        "type" : "double"
-      },
-      "all_field_values" : {
-        "type" : "text",
-        "analyzer" : "whitespace"
-      },
-      "anomaly_score" : {
-        "type" : "double"
-      },
-      "assignment_memory_basis" : {
-        "type" : "keyword"
-      },
-      "average_bucket_processing_time_ms" : {
-        "type" : "double"
-      },
-      "bucket_allocation_failures_count" : {
-        "type" : "long"
-      },
-      "bucket_count" : {
-        "type" : "long"
-      },
-      "bucket_influencers" : {
-        "type" : "nested",
-        "properties" : {
-          "anomaly_score" : {
-            "type" : "double"
-          },
-          "bucket_span" : {
-            "type" : "long"
-          },
-          "influencer_field_name" : {
-            "type" : "keyword"
-          },
-          "initial_anomaly_score" : {
-            "type" : "double"
-          },
-          "is_interim" : {
-            "type" : "boolean"
-          },
-          "job_id" : {
-            "type" : "keyword"
-          },
-          "probability" : {
-            "type" : "double"
-          },
-          "raw_anomaly_score" : {
-            "type" : "double"
-          },
-          "result_type" : {
-            "type" : "keyword"
-          },
-          "timestamp" : {
-            "type" : "date"
-          }
+    }
+  ],
+  "properties" : {
+    "actual" : {
+      "type" : "double"
+    },
+    "all_field_values" : {
+      "type" : "text",
+      "analyzer" : "whitespace"
+    },
+    "anomaly_score" : {
+      "type" : "double"
+    },
+    "assignment_memory_basis" : {
+      "type" : "keyword"
+    },
+    "average_bucket_processing_time_ms" : {
+      "type" : "double"
+    },
+    "bucket_allocation_failures_count" : {
+      "type" : "long"
+    },
+    "bucket_count" : {
+      "type" : "long"
+    },
+    "bucket_influencers" : {
+      "type" : "nested",
+      "properties" : {
+        "anomaly_score" : {
+          "type" : "double"
+        },
+        "bucket_span" : {
+          "type" : "long"
+        },
+        "influencer_field_name" : {
+          "type" : "keyword"
+        },
+        "initial_anomaly_score" : {
+          "type" : "double"
+        },
+        "is_interim" : {
+          "type" : "boolean"
+        },
+        "job_id" : {
+          "type" : "keyword"
+        },
+        "probability" : {
+          "type" : "double"
+        },
+        "raw_anomaly_score" : {
+          "type" : "double"
+        },
+        "result_type" : {
+          "type" : "keyword"
+        },
+        "timestamp" : {
+          "type" : "date"
         }
-      },
-      "bucket_span" : {
-        "type" : "long"
-      },
-      "by_field_name" : {
-        "type" : "keyword"
-      },
-      "by_field_value" : {
-        "type" : "keyword",
-        "copy_to" : [
-          "all_field_values"
-        ]
-      },
-      "category_id" : {
-        "type" : "long"
-      },
-      "causes" : {
-        "type" : "nested",
-        "properties" : {
-          "actual" : {
-            "type" : "double"
-          },
-          "by_field_name" : {
-            "type" : "keyword"
-          },
-          "by_field_value" : {
-            "type" : "keyword",
-            "copy_to" : [
-              "all_field_values"
-            ]
-          },
-          "correlated_by_field_value" : {
-            "type" : "keyword",
-            "copy_to" : [
-              "all_field_values"
-            ]
-          },
-          "field_name" : {
-            "type" : "keyword"
-          },
-          "function" : {
-            "type" : "keyword"
-          },
-          "function_description" : {
-            "type" : "keyword"
-          },
-          "geo_results" : {
-            "properties" : {
-              "actual_point" : {
-                "type" : "geo_point"
-              },
-              "typical_point" : {
-                "type" : "geo_point"
-              }
+      }
+    },
+    "bucket_span" : {
+      "type" : "long"
+    },
+    "by_field_name" : {
+      "type" : "keyword"
+    },
+    "by_field_value" : {
+      "type" : "keyword",
+      "copy_to" : [
+        "all_field_values"
+      ]
+    },
+    "category_id" : {
+      "type" : "long"
+    },
+    "causes" : {
+      "type" : "nested",
+      "properties" : {
+        "actual" : {
+          "type" : "double"
+        },
+        "by_field_name" : {
+          "type" : "keyword"
+        },
+        "by_field_value" : {
+          "type" : "keyword",
+          "copy_to" : [
+            "all_field_values"
+          ]
+        },
+        "correlated_by_field_value" : {
+          "type" : "keyword",
+          "copy_to" : [
+            "all_field_values"
+          ]
+        },
+        "field_name" : {
+          "type" : "keyword"
+        },
+        "function" : {
+          "type" : "keyword"
+        },
+        "function_description" : {
+          "type" : "keyword"
+        },
+        "geo_results" : {
+          "properties" : {
+            "actual_point" : {
+              "type" : "geo_point"
+            },
+            "typical_point" : {
+              "type" : "geo_point"
             }
-          },
-          "over_field_name" : {
-            "type" : "keyword"
-          },
-          "over_field_value" : {
-            "type" : "keyword",
-            "copy_to" : [
-              "all_field_values"
-            ]
-          },
-          "partition_field_name" : {
-            "type" : "keyword"
-          },
-          "partition_field_value" : {
-            "type" : "keyword",
-            "copy_to" : [
-              "all_field_values"
-            ]
-          },
-          "probability" : {
-            "type" : "double"
-          },
-          "typical" : {
-            "type" : "double"
           }
+        },
+        "over_field_name" : {
+          "type" : "keyword"
+        },
+        "over_field_value" : {
+          "type" : "keyword",
+          "copy_to" : [
+            "all_field_values"
+          ]
+        },
+        "partition_field_name" : {
+          "type" : "keyword"
+        },
+        "partition_field_value" : {
+          "type" : "keyword",
+          "copy_to" : [
+            "all_field_values"
+          ]
+        },
+        "probability" : {
+          "type" : "double"
+        },
+        "typical" : {
+          "type" : "double"
         }
-      },
-      "description" : {
-        "type" : "text"
-      },
-      "detector_index" : {
-        "type" : "integer"
-      },
-      "earliest_record_timestamp" : {
-        "type" : "date"
-      },
-      "empty_bucket_count" : {
-        "type" : "long"
-      },
-      "event_count" : {
-        "type" : "long"
-      },
-      "examples" : {
-        "type" : "text"
-      },
-      "exponential_average_bucket_processing_time_ms" : {
-        "type" : "double"
-      },
-      "exponential_average_calculation_context" : {
-        "properties" : {
-          "incremental_metric_value_ms" : {
-            "type" : "double"
-          },
-          "latest_timestamp" : {
-            "type" : "date"
-          },
-          "previous_exponential_average_ms" : {
-            "type" : "double"
-          }
+      }
+    },
+    "description" : {
+      "type" : "text"
+    },
+    "detector_index" : {
+      "type" : "integer"
+    },
+    "earliest_record_timestamp" : {
+      "type" : "date"
+    },
+    "empty_bucket_count" : {
+      "type" : "long"
+    },
+    "event_count" : {
+      "type" : "long"
+    },
+    "examples" : {
+      "type" : "text"
+    },
+    "exponential_average_bucket_processing_time_ms" : {
+      "type" : "double"
+    },
+    "exponential_average_calculation_context" : {
+      "properties" : {
+        "incremental_metric_value_ms" : {
+          "type" : "double"
+        },
+        "latest_timestamp" : {
+          "type" : "date"
+        },
+        "previous_exponential_average_ms" : {
+          "type" : "double"
         }
-      },
-      "field_name" : {
-        "type" : "keyword"
-      },
-      "forecast_create_timestamp" : {
-        "type" : "date"
-      },
-      "forecast_end_timestamp" : {
-        "type" : "date"
-      },
-      "forecast_expiry_timestamp" : {
-        "type" : "date"
-      },
-      "forecast_id" : {
-        "type" : "keyword"
-      },
-      "forecast_lower" : {
-        "type" : "double"
-      },
-      "forecast_memory_bytes" : {
-        "type" : "long"
-      },
-      "forecast_messages" : {
-        "type" : "keyword"
-      },
-      "forecast_prediction" : {
-        "type" : "double"
-      },
-      "forecast_progress" : {
-        "type" : "double"
-      },
-      "forecast_start_timestamp" : {
-        "type" : "date"
-      },
-      "forecast_status" : {
-        "type" : "keyword"
-      },
-      "forecast_upper" : {
-        "type" : "double"
-      },
-      "function" : {
-        "type" : "keyword"
-      },
-      "function_description" : {
-        "type" : "keyword"
-      },
-      "geo_results" : {
-        "properties" : {
-          "actual_point" : {
-            "type" : "geo_point"
-          },
-          "typical_point" : {
-            "type" : "geo_point"
-          }
+      }
+    },
+    "field_name" : {
+      "type" : "keyword"
+    },
+    "forecast_create_timestamp" : {
+      "type" : "date"
+    },
+    "forecast_end_timestamp" : {
+      "type" : "date"
+    },
+    "forecast_expiry_timestamp" : {
+      "type" : "date"
+    },
+    "forecast_id" : {
+      "type" : "keyword"
+    },
+    "forecast_lower" : {
+      "type" : "double"
+    },
+    "forecast_memory_bytes" : {
+      "type" : "long"
+    },
+    "forecast_messages" : {
+      "type" : "keyword"
+    },
+    "forecast_prediction" : {
+      "type" : "double"
+    },
+    "forecast_progress" : {
+      "type" : "double"
+    },
+    "forecast_start_timestamp" : {
+      "type" : "date"
+    },
+    "forecast_status" : {
+      "type" : "keyword"
+    },
+    "forecast_upper" : {
+      "type" : "double"
+    },
+    "function" : {
+      "type" : "keyword"
+    },
+    "function_description" : {
+      "type" : "keyword"
+    },
+    "geo_results" : {
+      "properties" : {
+        "actual_point" : {
+          "type" : "geo_point"
+        },
+        "typical_point" : {
+          "type" : "geo_point"
         }
-      },
-      "influencer_field_name" : {
-        "type" : "keyword"
-      },
-      "influencer_field_value" : {
-        "type" : "keyword",
-        "copy_to" : [
-          "all_field_values"
-        ]
-      },
-      "influencer_score" : {
-        "type" : "double"
-      },
-      "influencers" : {
-        "type" : "nested",
-        "properties" : {
-          "influencer_field_name" : {
-            "type" : "keyword"
-          },
-          "influencer_field_values" : {
-            "type" : "keyword",
-            "copy_to" : [
-              "all_field_values"
-            ]
-          }
+      }
+    },
+    "influencer_field_name" : {
+      "type" : "keyword"
+    },
+    "influencer_field_value" : {
+      "type" : "keyword",
+      "copy_to" : [
+        "all_field_values"
+      ]
+    },
+    "influencer_score" : {
+      "type" : "double"
+    },
+    "influencers" : {
+      "type" : "nested",
+      "properties" : {
+        "influencer_field_name" : {
+          "type" : "keyword"
+        },
+        "influencer_field_values" : {
+          "type" : "keyword",
+          "copy_to" : [
+            "all_field_values"
+          ]
         }
-      },
-      "initial_anomaly_score" : {
-        "type" : "double"
-      },
-      "initial_influencer_score" : {
-        "type" : "double"
-      },
-      "initial_record_score" : {
-        "type" : "double"
-      },
-      "input_bytes" : {
-        "type" : "long"
-      },
-      "input_field_count" : {
-        "type" : "long"
-      },
-      "input_record_count" : {
-        "type" : "long"
-      },
-      "invalid_date_count" : {
-        "type" : "long"
-      },
-      "is_interim" : {
-        "type" : "boolean"
-      },
-      "job_id" : {
-        "type" : "keyword",
-        "copy_to" : [
-          "all_field_values"
-        ]
-      },
-      "last_data_time" : {
-        "type" : "date"
-      },
-      "latest_empty_bucket_timestamp" : {
-        "type" : "date"
-      },
-      "latest_record_time_stamp" : {
-        "type" : "date"
-      },
-      "latest_record_timestamp" : {
-        "type" : "date"
-      },
-      "latest_result_time_stamp" : {
-        "type" : "date"
-      },
-      "latest_sparse_bucket_timestamp" : {
-        "type" : "date"
-      },
-      "log_time" : {
-        "type" : "date"
-      },
-      "max_matching_length" : {
-        "type" : "long"
-      },
-      "maximum_bucket_processing_time_ms" : {
-        "type" : "double"
-      },
-      "memory_status" : {
-        "type" : "keyword"
-      },
-      "min_version" : {
-        "type" : "keyword"
-      },
-      "minimum_bucket_processing_time_ms" : {
-        "type" : "double"
-      },
-      "missing_field_count" : {
-        "type" : "long"
-      },
-      "mlcategory": {
-        "type": "keyword"
-      },
-      "model_bytes" : {
-        "type" : "long"
-      },
-      "model_feature" : {
-        "type" : "keyword"
-      },
-      "model_lower" : {
-        "type" : "double"
-      },
-      "model_median" : {
-        "type" : "double"
-      },
-      "model_size_stats" : {
-        "properties" : {
-          "assignment_memory_basis" : {
-            "type" : "keyword"
-          },
-          "bucket_allocation_failures_count" : {
-            "type" : "long"
-          },
-          "job_id" : {
-            "type" : "keyword"
-          },
-          "log_time" : {
-            "type" : "date"
-          },
-          "memory_status" : {
-            "type" : "keyword"
-          },
-          "model_bytes" : {
-            "type" : "long"
-          },
-          "peak_model_bytes" : {
-            "type" : "long"
-          },
-          "result_type" : {
-            "type" : "keyword"
-          },
-          "timestamp" : {
-            "type" : "date"
-          },
-          "total_by_field_count" : {
-            "type" : "long"
-          },
-          "total_over_field_count" : {
-            "type" : "long"
-          },
-          "total_partition_field_count" : {
-            "type" : "long"
-          }
+      }
+    },
+    "initial_anomaly_score" : {
+      "type" : "double"
+    },
+    "initial_influencer_score" : {
+      "type" : "double"
+    },
+    "initial_record_score" : {
+      "type" : "double"
+    },
+    "input_bytes" : {
+      "type" : "long"
+    },
+    "input_field_count" : {
+      "type" : "long"
+    },
+    "input_record_count" : {
+      "type" : "long"
+    },
+    "invalid_date_count" : {
+      "type" : "long"
+    },
+    "is_interim" : {
+      "type" : "boolean"
+    },
+    "job_id" : {
+      "type" : "keyword",
+      "copy_to" : [
+        "all_field_values"
+      ]
+    },
+    "last_data_time" : {
+      "type" : "date"
+    },
+    "latest_empty_bucket_timestamp" : {
+      "type" : "date"
+    },
+    "latest_record_time_stamp" : {
+      "type" : "date"
+    },
+    "latest_record_timestamp" : {
+      "type" : "date"
+    },
+    "latest_result_time_stamp" : {
+      "type" : "date"
+    },
+    "latest_sparse_bucket_timestamp" : {
+      "type" : "date"
+    },
+    "log_time" : {
+      "type" : "date"
+    },
+    "max_matching_length" : {
+      "type" : "long"
+    },
+    "maximum_bucket_processing_time_ms" : {
+      "type" : "double"
+    },
+    "memory_status" : {
+      "type" : "keyword"
+    },
+    "min_version" : {
+      "type" : "keyword"
+    },
+    "minimum_bucket_processing_time_ms" : {
+      "type" : "double"
+    },
+    "missing_field_count" : {
+      "type" : "long"
+    },
+    "mlcategory": {
+      "type": "keyword"
+    },
+    "model_bytes" : {
+      "type" : "long"
+    },
+    "model_feature" : {
+      "type" : "keyword"
+    },
+    "model_lower" : {
+      "type" : "double"
+    },
+    "model_median" : {
+      "type" : "double"
+    },
+    "model_size_stats" : {
+      "properties" : {
+        "assignment_memory_basis" : {
+          "type" : "keyword"
+        },
+        "bucket_allocation_failures_count" : {
+          "type" : "long"
+        },
+        "job_id" : {
+          "type" : "keyword"
+        },
+        "log_time" : {
+          "type" : "date"
+        },
+        "memory_status" : {
+          "type" : "keyword"
+        },
+        "model_bytes" : {
+          "type" : "long"
+        },
+        "peak_model_bytes" : {
+          "type" : "long"
+        },
+        "result_type" : {
+          "type" : "keyword"
+        },
+        "timestamp" : {
+          "type" : "date"
+        },
+        "total_by_field_count" : {
+          "type" : "long"
+        },
+        "total_over_field_count" : {
+          "type" : "long"
+        },
+        "total_partition_field_count" : {
+          "type" : "long"
         }
-      },
-      "model_upper" : {
-        "type" : "double"
-      },
-      "multi_bucket_impact" : {
-        "type" : "double"
-      },
-      "num_matches": {
-        "type" : "long"
-      },
-      "out_of_order_timestamp_count" : {
-        "type" : "long"
-      },
-      "over_field_name" : {
-        "type" : "keyword"
-      },
-      "over_field_value" : {
-        "type" : "keyword",
-        "copy_to" : [
-          "all_field_values"
-        ]
-      },
-      "partition_field_name" : {
-        "type" : "keyword"
-      },
-      "partition_field_value" : {
-        "type" : "keyword",
-        "copy_to" : [
-          "all_field_values"
-        ]
-      },
-      "preferred_to_categories": {
-        "type": "long"
-      },
-      "probability" : {
-        "type" : "double"
-      },
-      "processed_field_count" : {
-        "type" : "long"
-      },
-      "processed_record_count" : {
-        "type" : "long"
-      },
-      "processing_time_ms" : {
-        "type" : "long"
-      },
-      "quantiles" : {
-        "type" : "object",
-        "enabled" : false
-      },
-      "raw_anomaly_score" : {
-        "type" : "double"
-      },
-      "record_score" : {
-        "type" : "double"
-      },
-      "regex" : {
-        "type" : "keyword"
-      },
-      "result_type" : {
-        "type" : "keyword"
-      },
-      "retain" : {
-        "type" : "boolean"
-      },
-      "scheduled_events" : {
-        "type" : "keyword"
-      },
-      "search_count" : {
-        "type" : "long"
-      },
-      "snapshot_doc_count" : {
-        "type" : "integer"
-      },
-      "snapshot_id" : {
-        "type" : "keyword"
-      },
-      "sparse_bucket_count" : {
-        "type" : "long"
-      },
-      "terms" : {
-        "type" : "text"
-      },
-      "timestamp" : {
-        "type" : "date"
-      },
-      "total_by_field_count" : {
-        "type" : "long"
-      },
-      "total_over_field_count" : {
-        "type" : "long"
-      },
-      "total_partition_field_count" : {
-        "type" : "long"
-      },
-      "total_search_time_ms" : {
-        "type" : "double"
-      },
-      "typical" : {
-        "type" : "double"
       }
+    },
+    "model_upper" : {
+      "type" : "double"
+    },
+    "multi_bucket_impact" : {
+      "type" : "double"
+    },
+    "num_matches": {
+      "type" : "long"
+    },
+    "out_of_order_timestamp_count" : {
+      "type" : "long"
+    },
+    "over_field_name" : {
+      "type" : "keyword"
+    },
+    "over_field_value" : {
+      "type" : "keyword",
+      "copy_to" : [
+        "all_field_values"
+      ]
+    },
+    "partition_field_name" : {
+      "type" : "keyword"
+    },
+    "partition_field_value" : {
+      "type" : "keyword",
+      "copy_to" : [
+        "all_field_values"
+      ]
+    },
+    "preferred_to_categories": {
+      "type": "long"
+    },
+    "probability" : {
+      "type" : "double"
+    },
+    "processed_field_count" : {
+      "type" : "long"
+    },
+    "processed_record_count" : {
+      "type" : "long"
+    },
+    "processing_time_ms" : {
+      "type" : "long"
+    },
+    "quantiles" : {
+      "type" : "object",
+      "enabled" : false
+    },
+    "raw_anomaly_score" : {
+      "type" : "double"
+    },
+    "record_score" : {
+      "type" : "double"
+    },
+    "regex" : {
+      "type" : "keyword"
+    },
+    "result_type" : {
+      "type" : "keyword"
+    },
+    "retain" : {
+      "type" : "boolean"
+    },
+    "scheduled_events" : {
+      "type" : "keyword"
+    },
+    "search_count" : {
+      "type" : "long"
+    },
+    "snapshot_doc_count" : {
+      "type" : "integer"
+    },
+    "snapshot_id" : {
+      "type" : "keyword"
+    },
+    "sparse_bucket_count" : {
+      "type" : "long"
+    },
+    "terms" : {
+      "type" : "text"
+    },
+    "timestamp" : {
+      "type" : "date"
+    },
+    "total_by_field_count" : {
+      "type" : "long"
+    },
+    "total_over_field_count" : {
+      "type" : "long"
+    },
+    "total_partition_field_count" : {
+      "type" : "long"
+    },
+    "total_search_time_ms" : {
+      "type" : "double"
+    },
+    "typical" : {
+      "type" : "double"
     }
   }
 }

+ 19 - 13
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/anomalydetection/results_index_template.json

@@ -1,20 +1,26 @@
 {
-  "order" : 0,
+  "priority": 2147483647,
   "version" : ${xpack.ml.version.id},
   "index_patterns" : [
     ".ml-anomalies-*"
   ],
-  "settings" : {
-    "index" : {
-      "translog" : {
-        "durability" : "async"
-      },
-      "auto_expand_replicas" : "0-1",
-      "query" : {
-        "default_field" : "all_field_values"
-      },
-      "hidden": true
-    }
+  "template" : {
+    "settings" : {
+      "index" : {
+        "translog" : {
+          "durability" : "async"
+        },
+        "auto_expand_replicas" : "0-1",
+        "query" : {
+          "default_field" : "all_field_values"
+        },
+        "hidden": true
+      }
+    },
+    "mappings": ${xpack.ml.anomalydetection.results.mappings}
   },
-  "mappings": ${xpack.ml.anomalydetection.results.mappings}
+  "_meta" : {
+    "description": "index template for ML anomaly detection results indices",
+    "managed": true
+  }
 }

+ 16 - 12
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/anomalydetection/state_index_template.json

@@ -1,24 +1,28 @@
 {
-  "order" : 0,
+  "priority": 2147483647,
   "version" : ${xpack.ml.version.id},
   "index_patterns" : [
     ".ml-state*"
   ],
-  "settings" : {
-    "index" : {
-      "auto_expand_replicas" : "0-1",
-      "hidden": true
+  "template" : {
+    "settings" : {
+      "index" : {
+        "auto_expand_replicas" : "0-1",
+        "hidden": true
+      },
+      "index.lifecycle.name": "${xpack.ml.index.lifecycle.name}",
+      "index.lifecycle.rollover_alias": "${xpack.ml.index.lifecycle.rollover_alias}"
     },
-    "index.lifecycle.name": "${xpack.ml.index.lifecycle.name}",
-    "index.lifecycle.rollover_alias": "${xpack.ml.index.lifecycle.rollover_alias}"
-  },
-  "mappings" : {
-    "_doc": {
+    "mappings" : {
       "_meta": {
         "version": "${xpack.ml.version}"
       },
       "enabled": false
-    }
+    },
+    "aliases" : {}
   },
-  "aliases" : {}
+  "_meta" : {
+    "description": "index template for ML state indices",
+    "managed": true
+  }
 }

+ 17 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/notifications_index_legacy_template.json

@@ -0,0 +1,17 @@
+{
+  "order" : 0,
+  "version" : ${xpack.ml.version.id},
+  "index_patterns" : [
+    ".ml-notifications-000001"
+  ],
+  "settings" : {
+    "index" : {
+      "number_of_shards" : "1",
+      "auto_expand_replicas" : "0-1",
+      "hidden": true
+    }
+  },
+  "mappings" : {
+    "_doc": ${xpack.ml.notifications.mappings}
+  }
+}

+ 32 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/notifications_index_mappings.json

@@ -0,0 +1,32 @@
+{
+  "_meta" : {
+    "version" : "${xpack.ml.version}"
+  },
+  "dynamic" : "false",
+  "properties" : {
+    "job_id": {
+      "type": "keyword"
+    },
+    "level": {
+      "type": "keyword"
+    },
+    "message": {
+      "type": "text",
+      "fields": {
+        "raw": {
+          "type": "keyword",
+          "ignore_above": 1024
+        }
+      }
+    },
+    "timestamp": {
+      "type": "date"
+    },
+    "node_name": {
+      "type": "keyword"
+    },
+    "job_type": {
+      "type": "keyword"
+    }
+  }
+}

+ 13 - 40
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/notifications_index_template.json

@@ -1,48 +1,21 @@
 {
-  "order" : 0,
+  "priority" : 2147483647,
   "version" : ${xpack.ml.version.id},
   "index_patterns" : [
     ".ml-notifications-000001"
   ],
-  "settings" : {
-    "index" : {
-      "number_of_shards" : "1",
-      "auto_expand_replicas" : "0-1",
-      "hidden": true
-    }
-  },
-  "mappings" : {
-    "_doc": {
-      "_meta" : {
-        "version" : "${xpack.ml.version}"
-      },
-      "dynamic" : "false",
-      "properties" : {
-        "job_id": {
-          "type": "keyword"
-        },
-        "level": {
-          "type": "keyword"
-        },
-        "message": {
-          "type": "text",
-          "fields": {
-            "raw": {
-              "type": "keyword",
-              "ignore_above": 1024
-            }
-          }
-        },
-        "timestamp": {
-          "type": "date"
-        },
-        "node_name": {
-          "type": "keyword"
-        },
-        "job_type": {
-          "type": "keyword"
-        }
+  "template" : {
+    "settings" : {
+      "index" : {
+        "number_of_shards" : "1",
+        "auto_expand_replicas" : "0-1",
+        "hidden": true
       }
-    }
+    },
+    "mappings" : ${xpack.ml.notifications.mappings}
+  },
+  "_meta" : {
+    "description": "index template for ML notifications indices",
+    "managed": true
   }
 }

+ 137 - 139
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/stats_index_mappings.json

@@ -1,148 +1,146 @@
 {
-  "_doc": {
-    "_meta": {
-      "version" : "${xpack.ml.version}"
-    },
-    "dynamic": false,
-    "properties" : {
-      "iteration": {
-        "type": "integer"
-      },
-      "hyperparameters": {
-        "properties": {
-          "alpha": {
-            "type": "double"
-          },
-          "class_assignment_objective": {
-            "type": "keyword"
-          },
-          "downsample_factor": {
-            "type": "double"
-          },
-          "eta": {
-            "type": "double"
-          },
-          "eta_growth_rate_per_tree": {
-            "type": "double"
-          },
-          "feature_bag_fraction": {
-            "type": "double"
-          },
-          "gamma": {
-            "type": "double"
-          },
-          "lambda": {
-            "type": "double"
-          },
-          "max_attempts_to_add_tree": {
-            "type": "integer"
-          },
-          "max_optimization_rounds_per_hyperparameter": {
-            "type": "integer"
-          },
-          "max_trees": {
-            "type": "integer"
-          },
-          "num_folds": {
-            "type": "integer"
-          },
-          "num_splits_per_feature": {
-            "type": "integer"
-          },
-          "soft_tree_depth_limit": {
-            "type": "double"
-          },
-          "soft_tree_depth_tolerance": {
-            "type": "double"
-          }
+  "_meta": {
+    "version" : "${xpack.ml.version}"
+  },
+  "dynamic": false,
+  "properties" : {
+    "iteration": {
+      "type": "integer"
+    },
+    "hyperparameters": {
+      "properties": {
+        "alpha": {
+          "type": "double"
+        },
+        "class_assignment_objective": {
+          "type": "keyword"
+        },
+        "downsample_factor": {
+          "type": "double"
+        },
+        "eta": {
+          "type": "double"
+        },
+        "eta_growth_rate_per_tree": {
+          "type": "double"
+        },
+        "feature_bag_fraction": {
+          "type": "double"
+        },
+        "gamma": {
+          "type": "double"
+        },
+        "lambda": {
+          "type": "double"
+        },
+        "max_attempts_to_add_tree": {
+          "type": "integer"
+        },
+        "max_optimization_rounds_per_hyperparameter": {
+          "type": "integer"
+        },
+        "max_trees": {
+          "type": "integer"
+        },
+        "num_folds": {
+          "type": "integer"
+        },
+        "num_splits_per_feature": {
+          "type": "integer"
+        },
+        "soft_tree_depth_limit": {
+          "type": "double"
+        },
+        "soft_tree_depth_tolerance": {
+          "type": "double"
         }
-      },
-      "job_id" : {
-        "type" : "keyword"
-      },
-      "parameters": {
-        "properties": {
-          "compute_feature_influence": {
-            "type": "boolean"
-          },
-          "feature_influence_threshold": {
-            "type": "double"
-          },
-          "method": {
-            "type": "keyword"
-          },
-          "n_neighbors": {
-            "type": "integer"
-          },
-          "outlier_fraction": {
-            "type": "double"
-          },
-          "standardization_enabled": {
-            "type": "boolean"
-          }
+      }
+    },
+    "job_id" : {
+      "type" : "keyword"
+    },
+    "parameters": {
+      "properties": {
+        "compute_feature_influence": {
+          "type": "boolean"
+        },
+        "feature_influence_threshold": {
+          "type": "double"
+        },
+        "method": {
+          "type": "keyword"
+        },
+        "n_neighbors": {
+          "type": "integer"
+        },
+        "outlier_fraction": {
+          "type": "double"
+        },
+        "standardization_enabled": {
+          "type": "boolean"
         }
-      },
-      "peak_usage_bytes" : {
-        "type" : "long"
-      },
-      "model_id": {
-        "type": "keyword"
-      },
-      "node_id": {
-        "type": "keyword"
-      },
-      "inference_count": {
-        "type": "long"
-      },
-      "failure_count": {
-        "type": "long"
-      },
-      "cache_miss_count": {
-        "type": "long"
-      },
-      "missing_all_fields_count": {
-        "type": "long"
-      },
-      "skipped_docs_count": {
-        "type": "long"
-      },
-      "timestamp" : {
-        "type" : "date"
-      },
-      "timing_stats": {
-        "properties": {
-          "elapsed_time": {
-            "type": "long"
-          },
-          "iteration_time": {
-            "type": "long"
-          }
+      }
+    },
+    "peak_usage_bytes" : {
+      "type" : "long"
+    },
+    "model_id": {
+      "type": "keyword"
+    },
+    "node_id": {
+      "type": "keyword"
+    },
+    "inference_count": {
+      "type": "long"
+    },
+    "failure_count": {
+      "type": "long"
+    },
+    "cache_miss_count": {
+      "type": "long"
+    },
+    "missing_all_fields_count": {
+      "type": "long"
+    },
+    "skipped_docs_count": {
+      "type": "long"
+    },
+    "timestamp" : {
+      "type" : "date"
+    },
+    "timing_stats": {
+      "properties": {
+        "elapsed_time": {
+          "type": "long"
+        },
+        "iteration_time": {
+          "type": "long"
         }
-      },
-      "test_docs_count": {
-        "type": "long"
-      },
-      "training_docs_count": {
-        "type": "long"
-      },
-      "type" : {
-        "type" : "keyword"
-      },
-      "validation_loss": {
-        "properties": {
-          "fold_values": {
-            "properties": {
-              "fold": {
-                "type": "integer"
-              },
-              "values": {
-                "type": "double"
-              }
+      }
+    },
+    "test_docs_count": {
+      "type": "long"
+    },
+    "training_docs_count": {
+      "type": "long"
+    },
+    "type" : {
+      "type" : "keyword"
+    },
+    "validation_loss": {
+      "properties": {
+        "fold_values": {
+          "properties": {
+            "fold": {
+              "type": "integer"
+            },
+            "values": {
+              "type": "double"
             }
-          },
-          "loss_type": {
-            "type": "keyword"
           }
+        },
+        "loss_type": {
+          "type": "keyword"
         }
       }
     }

+ 16 - 10
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/stats_index_template.json

@@ -1,18 +1,24 @@
 {
-  "order" : 0,
+  "priority" : 2147483647,
   "version" : ${xpack.ml.version.id},
   "index_patterns" : [
     ".ml-stats-*"
   ],
-  "settings": {
-    "index" : {
-      "number_of_shards" : "1",
-      "auto_expand_replicas" : "0-1",
-      "hidden": true
+  "template": {
+    "settings": {
+      "index" : {
+        "number_of_shards" : "1",
+        "auto_expand_replicas" : "0-1",
+        "hidden": true
+      },
+      "index.lifecycle.name": "${xpack.ml.index.lifecycle.name}",
+      "index.lifecycle.rollover_alias": "${xpack.ml.index.lifecycle.rollover_alias}"
     },
-    "index.lifecycle.name": "${xpack.ml.index.lifecycle.name}",
-    "index.lifecycle.rollover_alias": "${xpack.ml.index.lifecycle.rollover_alias}"
+    "mappings" : ${xpack.ml.stats.mappings},
+    "aliases" : {}
   },
-  "mappings" : ${xpack.ml.stats.mappings},
-  "aliases" : {}
+  "_meta" : {
+    "description": "index template for ML stats indices",
+    "managed": true
+  }
 }

+ 38 - 10
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditorTests.java

@@ -8,6 +8,8 @@ package org.elasticsearch.xpack.core.common.notifications;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
 import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction;
 import org.elasticsearch.action.bulk.BulkAction;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -19,8 +21,10 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.client.IndicesAdminClient;
 import org.elasticsearch.client.OriginSettingClient;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -35,6 +39,7 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex;
 import org.elasticsearch.xpack.core.template.IndexTemplateConfig;
 import org.junit.After;
 import org.junit.Before;
@@ -43,6 +48,7 @@ import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 
@@ -92,7 +98,8 @@ public class AbstractAuditorTests extends ESTestCase {
     }
 
     public void testInfo() throws IOException {
-        AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = createTestAuditorWithTemplateInstalled(client);
+        AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor =
+            createTestAuditorWithTemplateInstalled(client, Version.CURRENT);
         auditor.info("foo", "Here is my info");
 
         verify(client).execute(eq(IndexAction.INSTANCE), indexRequestCaptor.capture(), any());
@@ -109,7 +116,8 @@ public class AbstractAuditorTests extends ESTestCase {
     }
 
     public void testWarning() throws IOException {
-        AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = createTestAuditorWithTemplateInstalled(client);
+        AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor =
+            createTestAuditorWithTemplateInstalled(client, Version.CURRENT);
         auditor.warning("bar", "Here is my warning");
 
         verify(client).execute(eq(IndexAction.INSTANCE), indexRequestCaptor.capture(), any());
@@ -126,7 +134,8 @@ public class AbstractAuditorTests extends ESTestCase {
     }
 
     public void testError() throws IOException {
-        AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = createTestAuditorWithTemplateInstalled(client);
+        AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor =
+            createTestAuditorWithTemplateInstalled(client, Version.CURRENT);
         auditor.error("foobar", "Here is my error");
 
         verify(client).execute(eq(IndexAction.INSTANCE), indexRequestCaptor.capture(), any());
@@ -145,7 +154,8 @@ public class AbstractAuditorTests extends ESTestCase {
     public void testAuditingBeforeTemplateInstalled() throws Exception {
         CountDownLatch writeSomeDocsBeforeTemplateLatch = new CountDownLatch(1);
         AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor =
-            createTestAuditorWithoutTemplate(client, writeSomeDocsBeforeTemplateLatch);
+            // TODO: Both this call and the called method can be simplified in versions that will never have to talk to 7.13
+            createTestAuditorWithoutTemplate(client, randomFrom(Version.CURRENT, Version.V_7_13_0), writeSomeDocsBeforeTemplateLatch);
 
         auditor.error("foobar", "Here is my error to queue");
         auditor.warning("foobar", "Here is my warning to queue");
@@ -171,7 +181,7 @@ public class AbstractAuditorTests extends ESTestCase {
     public void testMaxBufferSize() throws Exception {
         CountDownLatch writeSomeDocsBeforeTemplateLatch = new CountDownLatch(1);
         AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor =
-            createTestAuditorWithoutTemplate(client, writeSomeDocsBeforeTemplateLatch);
+            createTestAuditorWithoutTemplate(client, Version.CURRENT, writeSomeDocsBeforeTemplateLatch);
 
         int numThreads = 2;
         int numMessagesToWrite = (AbstractAuditor.MAX_BUFFER_SIZE / numThreads) + 10;
@@ -195,13 +205,18 @@ public class AbstractAuditorTests extends ESTestCase {
         return AbstractAuditMessageTests.TestAuditMessage.PARSER.apply(parser, null);
     }
 
-    private TestAuditor createTestAuditorWithTemplateInstalled(Client client) {
+    private TestAuditor createTestAuditorWithTemplateInstalled(Client client, Version minNodeVersion) {
         ImmutableOpenMap.Builder<String, IndexTemplateMetadata> templates = ImmutableOpenMap.builder(1);
         templates.put(TEST_INDEX, mock(IndexTemplateMetadata.class));
+        Map<String, ComposableIndexTemplate> templatesV2 = Collections.singletonMap(TEST_INDEX, mock(ComposableIndexTemplate.class));
         Metadata metadata = mock(Metadata.class);
         when(metadata.getTemplates()).thenReturn(templates.build());
+        when(metadata.templatesV2()).thenReturn(templatesV2);
+        DiscoveryNodes nodes = mock(DiscoveryNodes.class);
+        when(nodes.getMinNodeVersion()).thenReturn(minNodeVersion);
         ClusterState state = mock(ClusterState.class);
         when(state.getMetadata()).thenReturn(metadata);
+        when(state.nodes()).thenReturn(nodes);
         ClusterService clusterService = mock(ClusterService.class);
         when(clusterService.state()).thenReturn(state);
 
@@ -209,11 +224,16 @@ public class AbstractAuditorTests extends ESTestCase {
     }
 
     @SuppressWarnings("unchecked")
-    private TestAuditor createTestAuditorWithoutTemplate(Client client, CountDownLatch latch) {
+    private TestAuditor createTestAuditorWithoutTemplate(Client client, Version minNodeVersion, CountDownLatch latch) {
         if (Mockito.mockingDetails(client).isMock() == false) {
             throw new AssertionError("client should be a mock");
         }
 
+        ActionType<AcknowledgedResponse> expectedTemplateAction =
+            minNodeVersion.before(Version.CURRENT)
+                ? PutIndexTemplateAction.INSTANCE
+                : PutComposableIndexTemplateAction.INSTANCE;
+
         doAnswer(invocationOnMock -> {
             ActionListener<AcknowledgedResponse> listener =
                 (ActionListener<AcknowledgedResponse>)invocationOnMock.getArguments()[2];
@@ -232,7 +252,7 @@ public class AbstractAuditorTests extends ESTestCase {
             threadPool.generic().submit(onPutTemplate);
 
             return null;
-        }).when(client).execute(eq(PutIndexTemplateAction.INSTANCE), any(), any());
+        }).when(client).execute(eq(expectedTemplateAction), any(), any());
 
         IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
         AdminClient adminClient = mock(AdminClient.class);
@@ -242,8 +262,11 @@ public class AbstractAuditorTests extends ESTestCase {
         ImmutableOpenMap.Builder<String, IndexTemplateMetadata> templates = ImmutableOpenMap.builder(0);
         Metadata metadata = mock(Metadata.class);
         when(metadata.getTemplates()).thenReturn(templates.build());
+        DiscoveryNodes nodes = mock(DiscoveryNodes.class);
+        when(nodes.getMinNodeVersion()).thenReturn(minNodeVersion);
         ClusterState state = mock(ClusterState.class);
         when(state.getMetadata()).thenReturn(metadata);
+        when(state.nodes()).thenReturn(nodes);
         ClusterService clusterService = mock(ClusterService.class);
         when(clusterService.state()).thenReturn(state);
 
@@ -253,10 +276,15 @@ public class AbstractAuditorTests extends ESTestCase {
     public static class TestAuditor extends AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> {
 
         TestAuditor(Client client, String nodeName, ClusterService clusterService) {
-            super(new OriginSettingClient(client, TEST_ORIGIN), TEST_INDEX,
+            super(new OriginSettingClient(client, TEST_ORIGIN), TEST_INDEX, Version.CURRENT,
+                new IndexTemplateConfig(TEST_INDEX,
+                    "/org/elasticsearch/xpack/core/ml/notifications_index_legacy_template.json", Version.CURRENT.id, "xpack.ml.version",
+                    Map.of("xpack.ml.version.id", String.valueOf(Version.CURRENT.id),
+                        "xpack.ml.notifications.mappings", NotificationsIndex.mapping())),
                 new IndexTemplateConfig(TEST_INDEX,
                     "/org/elasticsearch/xpack/core/ml/notifications_index_template.json", Version.CURRENT.id, "xpack.ml.version",
-                    Collections.singletonMap("xpack.ml.version.id", String.valueOf(Version.CURRENT.id))),
+                    Map.of("xpack.ml.version.id", String.valueOf(Version.CURRENT.id),
+                        "xpack.ml.notifications.mappings", NotificationsIndex.mapping())),
                 nodeName, AbstractAuditMessageTests.TestAuditMessage::new, clusterService);
         }
     }

+ 124 - 32
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java

@@ -19,6 +19,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.client.AdminClient;
@@ -28,16 +29,21 @@ import org.elasticsearch.client.IndicesAdminClient;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.indices.TestIndexNameExpressionResolver;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.xpack.core.ml.inference.persistence.InferenceIndexConstants;
+import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex;
 import org.elasticsearch.xpack.core.template.IndexTemplateConfig;
 import org.junit.After;
 import org.junit.Before;
@@ -45,6 +51,8 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
 import org.mockito.stubbing.Answer;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -58,6 +66,7 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -110,6 +119,12 @@ public class MlIndexAndAliasTests extends ESTestCase {
         client = mock(Client.class);
         when(client.threadPool()).thenReturn(threadPool);
         when(client.admin()).thenReturn(adminClient);
+        doAnswer(invocationOnMock -> {
+            ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[2];
+            listener.onResponse(AcknowledgedResponse.TRUE);
+            return null;
+        }).when(client).execute(any(PutComposableIndexTemplateAction.class), any(PutComposableIndexTemplateAction.Request.class),
+            any(ActionListener.class));
 
         listener = mock(ActionListener.class);
 
@@ -122,35 +137,94 @@ public class MlIndexAndAliasTests extends ESTestCase {
         verifyNoMoreInteractions(indicesAdminClient, listener);
     }
 
-    public void testInstallIndexTemplateIfRequired_GivenTemplateExists() {
-        ClusterState clusterState = createClusterState(Collections.emptyMap(),
-            Collections.singletonMap(InferenceIndexConstants.LATEST_INDEX_NAME,
-                createIndexTemplateMetaData(InferenceIndexConstants.LATEST_INDEX_NAME,
-                    Collections.singletonList(InferenceIndexConstants.LATEST_INDEX_NAME))));
+    public void testInstallIndexTemplateIfRequired_GivenTemplateLegacyTemplateExistsAndMixedCluster() throws UnknownHostException {
+        // TODO: this test can be removed from branches that will never need to talk to 7.13
+        ClusterState clusterState = createClusterState(Version.V_7_13_0, Collections.emptyMap(),
+            Collections.singletonMap(NotificationsIndex.NOTIFICATIONS_INDEX,
+                createLegacyIndexTemplateMetaData(NotificationsIndex.NOTIFICATIONS_INDEX,
+                    Collections.singletonList(NotificationsIndex.NOTIFICATIONS_INDEX))),
+            Collections.emptyMap());
+
+        IndexTemplateConfig legacyNotificationsTemplate = new IndexTemplateConfig(NotificationsIndex.NOTIFICATIONS_INDEX,
+            "/org/elasticsearch/xpack/core/ml/notifications_index_legacy_template.json", Version.CURRENT.id, "xpack.ml.version",
+            Map.of("xpack.ml.version.id", String.valueOf(Version.CURRENT.id),
+                "xpack.ml.notifications.mappings", NotificationsIndex.mapping()));
+        IndexTemplateConfig notificationsTemplate = new IndexTemplateConfig(NotificationsIndex.NOTIFICATIONS_INDEX,
+            "/org/elasticsearch/xpack/core/ml/notifications_index_template.json", Version.CURRENT.id, "xpack.ml.version",
+            Map.of("xpack.ml.version.id", String.valueOf(Version.CURRENT.id),
+                "xpack.ml.notifications.mappings", NotificationsIndex.mapping()));
 
-        IndexTemplateConfig inferenceTemplate = new IndexTemplateConfig(InferenceIndexConstants.LATEST_INDEX_NAME,
-            "not_a_real_file.json", Version.CURRENT.id, "xpack.ml.version",
-            Collections.singletonMap("xpack.ml.version.id", String.valueOf(Version.CURRENT.id)));
+        // ML didn't use composable templates in 7.13 and the legacy template exists, so nothing needs to be done
+        MlIndexAndAlias.installIndexTemplateIfRequired(clusterState, client, Version.CURRENT, legacyNotificationsTemplate,
+            notificationsTemplate, TimeValue.timeValueMinutes(1), listener);
+        verify(listener).onResponse(true);
+        verifyNoMoreInteractions(client);
+    }
+
+    public void testInstallIndexTemplateIfRequired_GivenLegacyTemplateExistsAndModernCluster() throws UnknownHostException {
+        ClusterState clusterState = createClusterState(Version.CURRENT, Collections.emptyMap(),
+            Collections.singletonMap(NotificationsIndex.NOTIFICATIONS_INDEX,
+                createLegacyIndexTemplateMetaData(NotificationsIndex.NOTIFICATIONS_INDEX,
+                    Collections.singletonList(NotificationsIndex.NOTIFICATIONS_INDEX))),
+            Collections.emptyMap());
+
+        IndexTemplateConfig legacyNotificationsTemplate = new IndexTemplateConfig(NotificationsIndex.NOTIFICATIONS_INDEX,
+            "/org/elasticsearch/xpack/core/ml/notifications_index_legacy_template.json", Version.CURRENT.id, "xpack.ml.version",
+            Map.of("xpack.ml.version.id", String.valueOf(Version.CURRENT.id),
+                "xpack.ml.notifications.mappings", NotificationsIndex.mapping()));
+        IndexTemplateConfig notificationsTemplate = new IndexTemplateConfig(NotificationsIndex.NOTIFICATIONS_INDEX,
+            "/org/elasticsearch/xpack/core/ml/notifications_index_template.json", Version.CURRENT.id, "xpack.ml.version",
+            Map.of("xpack.ml.version.id", String.valueOf(Version.CURRENT.id),
+                "xpack.ml.notifications.mappings", NotificationsIndex.mapping()));
 
-        MlIndexAndAlias.installIndexTemplateIfRequired(clusterState, client, inferenceTemplate, listener);
+        MlIndexAndAlias.installIndexTemplateIfRequired(clusterState, client, Version.CURRENT, legacyNotificationsTemplate,
+            notificationsTemplate, TimeValue.timeValueMinutes(1), listener);
+        InOrder inOrder = inOrder(client, listener);
+        inOrder.verify(client).execute(same(PutComposableIndexTemplateAction.INSTANCE), any(), any());
+        inOrder.verify(listener).onResponse(true);
+    }
+
+    public void testInstallIndexTemplateIfRequired_GivenComposableTemplateExists() throws UnknownHostException {
+        ClusterState clusterState = createClusterState(Version.CURRENT, Collections.emptyMap(), Collections.emptyMap(),
+            Collections.singletonMap(NotificationsIndex.NOTIFICATIONS_INDEX,
+                createComposableIndexTemplateMetaData(NotificationsIndex.NOTIFICATIONS_INDEX,
+                    Collections.singletonList(NotificationsIndex.NOTIFICATIONS_INDEX))));
+
+        IndexTemplateConfig legacyNotificationsTemplate = new IndexTemplateConfig(NotificationsIndex.NOTIFICATIONS_INDEX,
+            "/org/elasticsearch/xpack/core/ml/notifications_index_legacy_template.json", Version.CURRENT.id, "xpack.ml.version",
+            Map.of("xpack.ml.version.id", String.valueOf(Version.CURRENT.id),
+                "xpack.ml.notifications.mappings", NotificationsIndex.mapping()));
+        IndexTemplateConfig notificationsTemplate = new IndexTemplateConfig(NotificationsIndex.NOTIFICATIONS_INDEX,
+            "/org/elasticsearch/xpack/core/ml/notifications_index_template.json", Version.CURRENT.id, "xpack.ml.version",
+            Map.of("xpack.ml.version.id", String.valueOf(Version.CURRENT.id),
+                "xpack.ml.notifications.mappings", NotificationsIndex.mapping()));
+
+        MlIndexAndAlias.installIndexTemplateIfRequired(clusterState, client, Version.CURRENT, legacyNotificationsTemplate,
+            notificationsTemplate, TimeValue.timeValueMinutes(1), listener);
         verify(listener).onResponse(true);
         verifyNoMoreInteractions(client);
     }
 
-    public void testInstallIndexTemplateIfRequired() {
+    public void testInstallIndexTemplateIfRequired() throws UnknownHostException {
         ClusterState clusterState = createClusterState(Collections.emptyMap());
 
-        IndexTemplateConfig notificationsTemplate = new IndexTemplateConfig(InferenceIndexConstants.LATEST_INDEX_NAME,
+        IndexTemplateConfig legacyNotificationsTemplate = new IndexTemplateConfig(NotificationsIndex.NOTIFICATIONS_INDEX,
+            "/org/elasticsearch/xpack/core/ml/notifications_index_legacy_template.json", Version.CURRENT.id, "xpack.ml.version",
+            Map.of("xpack.ml.version.id", String.valueOf(Version.CURRENT.id),
+                "xpack.ml.notifications.mappings", NotificationsIndex.mapping()));
+        IndexTemplateConfig notificationsTemplate = new IndexTemplateConfig(NotificationsIndex.NOTIFICATIONS_INDEX,
             "/org/elasticsearch/xpack/core/ml/notifications_index_template.json", Version.CURRENT.id, "xpack.ml.version",
-            Collections.singletonMap("xpack.ml.version.id", String.valueOf(Version.CURRENT.id)));
+            Map.of("xpack.ml.version.id", String.valueOf(Version.CURRENT.id),
+                "xpack.ml.notifications.mappings", NotificationsIndex.mapping()));
 
-        MlIndexAndAlias.installIndexTemplateIfRequired(clusterState, client, notificationsTemplate, listener);
-        InOrder inOrder = inOrder(indicesAdminClient, listener);
-        inOrder.verify(indicesAdminClient).putTemplate(any(), any());
+        MlIndexAndAlias.installIndexTemplateIfRequired(clusterState, client, Version.CURRENT, legacyNotificationsTemplate,
+            notificationsTemplate, TimeValue.timeValueMinutes(1), listener);
+        InOrder inOrder = inOrder(client, listener);
+        inOrder.verify(client).execute(same(PutComposableIndexTemplateAction.INSTANCE), any(), any());
         inOrder.verify(listener).onResponse(true);
     }
 
-    public void testCreateStateIndexAndAliasIfNecessary_CleanState() {
+    public void testCreateStateIndexAndAliasIfNecessary_CleanState() throws UnknownHostException {
         ClusterState clusterState = createClusterState(Collections.emptyMap());
         createIndexAndAliasIfNecessary(clusterState);
 
@@ -164,26 +238,28 @@ public class MlIndexAndAliasTests extends ESTestCase {
         assertThat(createRequest.aliases(), equalTo(Collections.singleton(new Alias(TEST_INDEX_ALIAS).isHidden(true))));
     }
 
-    private void assertNoClientInteractionsWhenWriteAliasAlreadyExists(String indexName) {
+    private void assertNoClientInteractionsWhenWriteAliasAlreadyExists(String indexName) throws UnknownHostException {
         ClusterState clusterState = createClusterState(Collections.singletonMap(indexName, createIndexMetadataWithAlias(indexName)));
         createIndexAndAliasIfNecessary(clusterState);
 
         verify(listener).onResponse(false);
     }
 
-    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtInitialStateIndex() {
+    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtInitialStateIndex() throws UnknownHostException {
         assertNoClientInteractionsWhenWriteAliasAlreadyExists(FIRST_CONCRETE_INDEX);
     }
 
-    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtSubsequentStateIndex() {
+    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtSubsequentStateIndex()
+        throws UnknownHostException {
         assertNoClientInteractionsWhenWriteAliasAlreadyExists("test-000007");
     }
 
-    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtDummyIndex() {
+    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtDummyIndex() throws UnknownHostException {
         assertNoClientInteractionsWhenWriteAliasAlreadyExists("dummy-index");
     }
 
-    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtLegacyStateIndex() {
+    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtLegacyStateIndex()
+        throws UnknownHostException {
         ClusterState clusterState =
             createClusterState(
                 Collections.singletonMap(LEGACY_INDEX_WITHOUT_SUFFIX, createIndexMetadataWithAlias(LEGACY_INDEX_WITHOUT_SUFFIX)));
@@ -208,7 +284,8 @@ public class MlIndexAndAliasTests extends ESTestCase {
                 AliasActions.remove().alias(TEST_INDEX_ALIAS).index(LEGACY_INDEX_WITHOUT_SUFFIX)));
     }
 
-    private void assertMlStateWriteAliasAddedToMostRecentMlStateIndex(List<String> existingIndexNames, String expectedWriteIndexName) {
+    private void assertMlStateWriteAliasAddedToMostRecentMlStateIndex(List<String> existingIndexNames, String expectedWriteIndexName)
+        throws UnknownHostException {
         ClusterState clusterState =
             createClusterState(
                 existingIndexNames.stream().collect(toMap(Function.identity(), MlIndexAndAliasTests::createIndexMetadata)));
@@ -225,22 +302,23 @@ public class MlIndexAndAliasTests extends ESTestCase {
             contains(AliasActions.add().alias(TEST_INDEX_ALIAS).index(expectedWriteIndexName).isHidden(true)));
     }
 
-    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButInitialStateIndexExists() {
+    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButInitialStateIndexExists() throws UnknownHostException {
         assertMlStateWriteAliasAddedToMostRecentMlStateIndex(
             Arrays.asList(FIRST_CONCRETE_INDEX), FIRST_CONCRETE_INDEX);
     }
 
-    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButSubsequentStateIndicesExist() {
+    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButSubsequentStateIndicesExist() throws UnknownHostException {
         assertMlStateWriteAliasAddedToMostRecentMlStateIndex(
             Arrays.asList("test-000003", "test-000040", "test-000500"), "test-000500");
     }
 
-    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButBothLegacyAndNewIndicesExist() {
+    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButBothLegacyAndNewIndicesExist()
+        throws UnknownHostException {
         assertMlStateWriteAliasAddedToMostRecentMlStateIndex(
             Arrays.asList(LEGACY_INDEX_WITHOUT_SUFFIX, "test-000003", "test-000040", "test-000500"), "test-000500");
     }
 
-    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButLegacyStateIndexExists() {
+    public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButLegacyStateIndexExists() throws UnknownHostException {
         ClusterState clusterState =
             createClusterState(Collections.singletonMap(LEGACY_INDEX_WITHOUT_SUFFIX, createIndexMetadata(LEGACY_INDEX_WITHOUT_SUFFIX)));
         createIndexAndAliasIfNecessary(clusterState);
@@ -295,15 +373,25 @@ public class MlIndexAndAliasTests extends ESTestCase {
         };
     }
 
-    private static ClusterState createClusterState(Map<String, IndexMetadata> indices) {
-        return createClusterState(indices, Collections.emptyMap());
+    private static ClusterState createClusterState(Map<String, IndexMetadata> indices) throws UnknownHostException {
+        return createClusterState(Version.CURRENT, indices, Collections.emptyMap(), Collections.emptyMap());
     }
 
-    private static ClusterState createClusterState(Map<String, IndexMetadata> indices, Map<String, IndexTemplateMetadata> templates) {
+    private static ClusterState createClusterState(Version minNodeVersion,
+                                                   Map<String, IndexMetadata> indices,
+                                                   Map<String, IndexTemplateMetadata> legacyTemplates,
+                                                   Map<String, ComposableIndexTemplate> composableTemplates) throws UnknownHostException {
+        InetAddress inetAddress1 = InetAddress.getByAddress(new byte[]{(byte) 192, (byte) 168, (byte) 0, (byte) 1});
+        InetAddress inetAddress2 = InetAddress.getByAddress(new byte[]{(byte) 192, (byte) 168, (byte) 0, (byte) 2});
         return ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(DiscoveryNodes.builder()
+                .add(new DiscoveryNode("foo", new TransportAddress(inetAddress1, 9201), Version.CURRENT))
+                .add(new DiscoveryNode("bar", new TransportAddress(inetAddress2, 9202), minNodeVersion))
+                .build())
             .metadata(Metadata.builder()
                 .indices(ImmutableOpenMap.<String, IndexMetadata>builder().putAll(indices).build())
-                .templates(ImmutableOpenMap.<String, IndexTemplateMetadata>builder().putAll(templates).build())
+                .templates(ImmutableOpenMap.<String, IndexTemplateMetadata>builder().putAll(legacyTemplates).build())
+                .indexTemplates(composableTemplates)
                 .build())
             .build();
     }
@@ -316,10 +404,14 @@ public class MlIndexAndAliasTests extends ESTestCase {
         return createIndexMetadata(indexName, true);
     }
 
-    private static IndexTemplateMetadata createIndexTemplateMetaData(String templateName, List<String> patterns) {
+    private static IndexTemplateMetadata createLegacyIndexTemplateMetaData(String templateName, List<String> patterns) {
         return IndexTemplateMetadata.builder(templateName).patterns(patterns).build();
     }
 
+    private static ComposableIndexTemplate createComposableIndexTemplateMetaData(String templateName, List<String> patterns) {
+        return new ComposableIndexTemplate.Builder().indexPatterns(patterns).build();
+    }
+
     private static IndexMetadata createIndexMetadata(String indexName, boolean withAlias) {
         Settings settings =
             Settings.builder()

+ 1 - 1
x-pack/plugin/identity-provider/src/main/java/org/elasticsearch/xpack/idp/saml/sp/SamlServiceProviderIndex.java

@@ -216,7 +216,7 @@ public class SamlServiceProviderIndex implements Closeable {
     }
 
     private boolean isTemplateUpToDate(ClusterState state) {
-        return TemplateUtils.checkTemplateExistsAndIsUpToDate(TEMPLATE_NAME, TEMPLATE_META_VERSION_KEY, state, logger);
+        return TemplateUtils.checkTemplateExistsAndIsUpToDate(TEMPLATE_NAME, TEMPLATE_META_VERSION_KEY, state, logger, null);
     }
 
     public void deleteDocument(DocumentVersion version, WriteRequest.RefreshPolicy refreshPolicy, ActionListener<DeleteResponse> listener) {

+ 2 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -1104,7 +1104,8 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
                 STATE_INDEX_PREFIX,
                 AnomalyDetectorsIndex.jobResultsIndexPrefix());
         for (String templateName : templateNames) {
-            allPresent = allPresent && TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(templateName, clusterState);
+            allPresent = allPresent && TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(templateName, clusterState,
+                MlIndexTemplateRegistry.COMPOSABLE_TEMPLATE_SWITCH_VERSION);
         }
 
         return allPresent;

+ 31 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java

@@ -29,6 +29,12 @@ import java.util.Map;
 
 public class MlIndexTemplateRegistry extends IndexTemplateRegistry {
 
+    /**
+     * The version that the ML index templates were switched from legacy templates to composable templates.
+     * TODO: Change to V_7_14_0 on backport
+     */
+    public static final Version COMPOSABLE_TEMPLATE_SWITCH_VERSION = Version.V_8_0_0;
+
     private static final String ROOT_RESOURCE_PATH = "/org/elasticsearch/xpack/core/ml/";
     private static final String ANOMALY_DETECTION_PATH = ROOT_RESOURCE_PATH + "anomalydetection/";
     private static final String VERSION_PATTERN = "xpack.ml.version";
@@ -40,9 +46,8 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry {
 
     private static final IndexTemplateConfig ANOMALY_DETECTION_STATE_TEMPLATE = stateTemplate();
 
-    public static final IndexTemplateConfig NOTIFICATIONS_TEMPLATE = new IndexTemplateConfig(NotificationsIndex.NOTIFICATIONS_INDEX,
-        ROOT_RESOURCE_PATH + "notifications_index_template.json", Version.CURRENT.id, VERSION_PATTERN,
-        Collections.singletonMap(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id)));
+    public static final IndexTemplateConfig NOTIFICATIONS_TEMPLATE = notificationsTemplate();
+    public static final IndexTemplateConfig NOTIFICATIONS_LEGACY_TEMPLATE = notificationsLegacyTemplate();
 
     private static final IndexTemplateConfig STATS_TEMPLATE = statsTemplate();
 
@@ -73,6 +78,28 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry {
             variables);
     }
 
+    private static IndexTemplateConfig notificationsTemplate() {
+        Map<String, String> variables = new HashMap<>();
+        variables.put(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id));
+        variables.put("xpack.ml.notifications.mappings", NotificationsIndex.mapping());
+
+        return new IndexTemplateConfig(NotificationsIndex.NOTIFICATIONS_INDEX,
+            ROOT_RESOURCE_PATH + "notifications_index_template.json",
+            Version.CURRENT.id, VERSION_PATTERN,
+            variables);
+    }
+
+    private static IndexTemplateConfig notificationsLegacyTemplate() {
+        Map<String, String> variables = new HashMap<>();
+        variables.put(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id));
+        variables.put("xpack.ml.notifications.mappings", NotificationsIndex.mapping());
+
+        return new IndexTemplateConfig(NotificationsIndex.NOTIFICATIONS_INDEX,
+            ROOT_RESOURCE_PATH + "notifications_index_legacy_template.json",
+            Version.CURRENT.id, VERSION_PATTERN,
+            variables);
+    }
+
     private static IndexTemplateConfig statsTemplate() {
         Map<String, String> variables = new HashMap<>();
         variables.put(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id));
@@ -104,7 +131,7 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry {
     }
 
     @Override
-    protected List<IndexTemplateConfig> getLegacyTemplateConfigs() {
+    protected List<IndexTemplateConfig> getComposableTemplateConfigs() {
         return templatesToUse;
     }
 

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

@@ -139,7 +139,7 @@ public class DataFrameAnalyticsManager {
         ActionListener<Boolean> createIndexListener = ActionListener.wrap(
             aBoolean -> ElasticsearchMappings.addDocMappingIfMissing(
                     MlStatsIndex.writeAlias(),
-                    MlStatsIndex::mapping,
+                    MlStatsIndex::wrappedMapping,
                     client,
                     clusterState,
                     masterNodeTimeout,

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java

@@ -241,7 +241,7 @@ public class TrainedModelStatsService {
             ActionListener.wrap(
             r -> ElasticsearchMappings.addDocMappingIfMissing(
                 MlStatsIndex.writeAlias(),
-                MlStatsIndex::mapping,
+                MlStatsIndex::wrappedMapping,
                 client,
                 clusterState,
                 MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT,

+ 2 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

@@ -493,13 +493,13 @@ public class AutodetectProcessManager implements ClusterStateListener {
         // Try adding the results doc mapping - this updates to the latest version if an old mapping is present
         ActionListener<Boolean> annotationsIndexUpdateHandler = ActionListener.wrap(
             ack -> ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobId),
-                AnomalyDetectorsIndex::resultsMapping, client, clusterState, masterNodeTimeout, resultsMappingUpdateHandler),
+                AnomalyDetectorsIndex::wrappedResultsMapping, client, clusterState, masterNodeTimeout, resultsMappingUpdateHandler),
             e -> {
                 // Due to a bug in 7.9.0 it's possible that the annotations index already has incorrect mappings
                 // and it would cause more harm than good to block jobs from opening in subsequent releases
                 logger.warn(new ParameterizedMessage("[{}] ML annotations index could not be updated with latest mappings", jobId), e);
                 ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobId),
-                    AnomalyDetectorsIndex::resultsMapping, client, clusterState, masterNodeTimeout, resultsMappingUpdateHandler);
+                    AnomalyDetectorsIndex::wrappedResultsMapping, client, clusterState, masterNodeTimeout, resultsMappingUpdateHandler);
             }
         );
 

+ 2 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java

@@ -199,7 +199,7 @@ public class SnapshotUpgradeTaskExecutor extends AbstractJobPersistentTasksExecu
         ActionListener<Boolean> annotationsIndexUpdateHandler = ActionListener.wrap(
             ack -> ElasticsearchMappings.addDocMappingIfMissing(
                 AnomalyDetectorsIndex.jobResultsAliasedName(jobId),
-                AnomalyDetectorsIndex::resultsMapping,
+                AnomalyDetectorsIndex::wrappedResultsMapping,
                 client,
                 clusterState,
                 MlTasks.PERSISTENT_TASK_MASTER_NODE_TIMEOUT,
@@ -210,7 +210,7 @@ public class SnapshotUpgradeTaskExecutor extends AbstractJobPersistentTasksExecu
                 logger.warn(new ParameterizedMessage("[{}] ML annotations index could not be updated with latest mappings", jobId), e);
                 ElasticsearchMappings.addDocMappingIfMissing(
                     AnomalyDetectorsIndex.jobResultsAliasedName(jobId),
-                    AnomalyDetectorsIndex::resultsMapping,
+                    AnomalyDetectorsIndex::wrappedResultsMapping,
                     client,
                     clusterState,
                     MlTasks.PERSISTENT_TASK_MASTER_NODE_TIMEOUT,

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java

@@ -215,7 +215,7 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx
         // This writes to the results index, which might need updating
         ElasticsearchMappings.addDocMappingIfMissing(
             AnomalyDetectorsIndex.jobResultsAliasedName(params.getJobId()),
-            AnomalyDetectorsIndex::resultsMapping,
+            AnomalyDetectorsIndex::wrappedResultsMapping,
             client,
             clusterState,
             PERSISTENT_TASK_MASTER_NODE_TIMEOUT,

+ 2 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java

@@ -31,6 +31,8 @@ abstract class AbstractMlAuditor<T extends AbstractAuditMessage> extends Abstrac
         super(
             new OriginSettingClient(client, ML_ORIGIN),
             NotificationsIndex.NOTIFICATIONS_INDEX,
+            MlIndexTemplateRegistry.COMPOSABLE_TEMPLATE_SWITCH_VERSION,
+            MlIndexTemplateRegistry.NOTIFICATIONS_LEGACY_TEMPLATE,
             MlIndexTemplateRegistry.NOTIFICATIONS_TEMPLATE,
             clusterService.getNodeName(),
             messageFactory,

+ 17 - 11
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java

@@ -8,7 +8,7 @@ package org.elasticsearch.xpack.ml;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
+import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.Client;
@@ -17,6 +17,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -41,6 +42,7 @@ import static org.elasticsearch.mock.orig.Mockito.when;
 import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -54,7 +56,7 @@ public class MlIndexTemplateRegistryTests extends ESTestCase {
     private ClusterService clusterService;
     private ThreadPool threadPool;
     private Client client;
-    private ArgumentCaptor<PutIndexTemplateRequest> putIndexTemplateRequestCaptor;
+    private ArgumentCaptor<PutComposableIndexTemplateAction.Request> putIndexTemplateRequestCaptor;
 
     @Before
     public void setUpMocks() {
@@ -75,7 +77,7 @@ public class MlIndexTemplateRegistryTests extends ESTestCase {
         xContentRegistry = new NamedXContentRegistry(CollectionUtils.appendToCopy(ClusterModule.getNamedXWriteables(),
                 new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse)));
 
-        putIndexTemplateRequestCaptor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
+        putIndexTemplateRequestCaptor = ArgumentCaptor.forClass(PutComposableIndexTemplateAction.Request.class);
     }
 
     public void testStateTemplate() {
@@ -84,14 +86,16 @@ public class MlIndexTemplateRegistryTests extends ESTestCase {
 
         registry.clusterChanged(createClusterChangedEvent(nodes));
 
-        verify(client.admin().indices(), times(4)).putTemplate(putIndexTemplateRequestCaptor.capture(), anyObject());
+        verify(client, times(4))
+            .execute(same(PutComposableIndexTemplateAction.INSTANCE), putIndexTemplateRequestCaptor.capture(), anyObject());
 
-        PutIndexTemplateRequest req = putIndexTemplateRequestCaptor.getAllValues().stream()
+        PutComposableIndexTemplateAction.Request req = putIndexTemplateRequestCaptor.getAllValues().stream()
             .filter(r -> r.name().equals(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX))
             .findFirst()
             .orElseThrow(() -> new AssertionError("expected the ml state index template to be put"));
-        assertThat(req.settings().get("index.lifecycle.name"), equalTo("ml-size-based-ilm-policy"));
-        assertThat(req.settings().get("index.lifecycle.rollover_alias"), equalTo(".ml-state-write"));
+        ComposableIndexTemplate indexTemplate = req.indexTemplate();
+        assertThat(indexTemplate.template().settings().get("index.lifecycle.name"), equalTo("ml-size-based-ilm-policy"));
+        assertThat(indexTemplate.template().settings().get("index.lifecycle.rollover_alias"), equalTo(".ml-state-write"));
     }
 
     public void testStatsTemplate() {
@@ -100,14 +104,16 @@ public class MlIndexTemplateRegistryTests extends ESTestCase {
 
         registry.clusterChanged(createClusterChangedEvent(nodes));
 
-        verify(client.admin().indices(), times(4)).putTemplate(putIndexTemplateRequestCaptor.capture(), anyObject());
+        verify(client, times(4))
+            .execute(same(PutComposableIndexTemplateAction.INSTANCE), putIndexTemplateRequestCaptor.capture(), anyObject());
 
-        PutIndexTemplateRequest req = putIndexTemplateRequestCaptor.getAllValues().stream()
+        PutComposableIndexTemplateAction.Request req = putIndexTemplateRequestCaptor.getAllValues().stream()
             .filter(r -> r.name().equals(MlStatsIndex.TEMPLATE_NAME))
             .findFirst()
             .orElseThrow(() -> new AssertionError("expected the ml stats index template to be put"));
-        assertThat(req.settings().get("index.lifecycle.name"), equalTo("ml-size-based-ilm-policy"));
-        assertThat(req.settings().get("index.lifecycle.rollover_alias"), equalTo(".ml-stats-write"));
+        ComposableIndexTemplate indexTemplate = req.indexTemplate();
+        assertThat(indexTemplate.template().settings().get("index.lifecycle.name"), equalTo("ml-size-based-ilm-policy"));
+        assertThat(indexTemplate.template().settings().get("index.lifecycle.rollover_alias"), equalTo(".ml-stats-write"));
     }
 
     @SuppressWarnings("unchecked")

+ 19 - 16
x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/AbstractXPackRestTest.java

@@ -21,9 +21,6 @@ import org.elasticsearch.test.SecuritySettingsSourceField;
 import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
 import org.elasticsearch.test.rest.yaml.ClientYamlTestResponse;
 import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
-import org.elasticsearch.xpack.core.ml.MlConfigIndex;
-import org.elasticsearch.xpack.core.ml.MlMetaIndex;
-import org.elasticsearch.xpack.core.ml.inference.persistence.InferenceIndexConstants;
 import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
@@ -34,7 +31,6 @@ import org.junit.After;
 import org.junit.Before;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -86,21 +82,28 @@ public class AbstractXPackRestTest extends ESClientYamlSuiteTestCase {
     /**
      * Waits for Machine Learning and Transform templates to be created by the {@link MetadataUpgrader}
      */
-    private void waitForTemplates() throws Exception {
+    private void waitForTemplates() {
         if (installTemplates()) {
-            List<String> templates = new ArrayList<>();
-            templates.addAll(
-                Arrays.asList(
-                    NotificationsIndex.NOTIFICATIONS_INDEX,
-                    AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
-                    AnomalyDetectorsIndex.jobResultsIndexPrefix(),
-                    TransformInternalIndexConstants.AUDIT_INDEX
-                ));
+            List<String> templates = Arrays.asList(
+                NotificationsIndex.NOTIFICATIONS_INDEX,
+                AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
+                AnomalyDetectorsIndex.jobResultsIndexPrefix()
+            );
 
             for (String template : templates) {
-                awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(),
-                        response -> true,
-                        () -> "Exception when waiting for [" + template + "] template to be created");
+                awaitCallApi("indices.exists_index_template", singletonMap("name", template), emptyList(),
+                    response -> true,
+                    () -> "Exception when waiting for [" + template + "] template to be created");
+            }
+
+            List<String> legacyTemplates = Collections.singletonList(
+                TransformInternalIndexConstants.AUDIT_INDEX
+            );
+
+            for (String legacyTemplate : legacyTemplates) {
+                awaitCallApi("indices.exists_template", singletonMap("name", legacyTemplate), emptyList(),
+                    response -> true,
+                    () -> "Exception when waiting for [" + legacyTemplate + "] legacy template to be created");
             }
         }
     }

+ 2 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/notifications/TransformAuditor.java

@@ -35,7 +35,7 @@ public class TransformAuditor extends AbstractAuditor<TransformAuditMessage> {
 
     public TransformAuditor(Client client, String nodeName, ClusterService clusterService) {
         super(new OriginSettingClient(client, TRANSFORM_ORIGIN), TransformInternalIndexConstants.AUDIT_INDEX,
-            TransformInternalIndexConstants.AUDIT_INDEX,
+            TransformInternalIndexConstants.AUDIT_INDEX, null,
             () -> {
                 try {
                     IndexTemplateMetadata templateMeta = TransformInternalIndex.getAuditIndexTemplateMetadata();
@@ -65,7 +65,7 @@ public class TransformAuditor extends AbstractAuditor<TransformAuditMessage> {
                     return null;
                 }
             },
-            nodeName, TransformAuditMessage::new, clusterService);
+            () -> null, nodeName, TransformAuditMessage::new, clusterService);
         clusterService.addListener(event -> {
             if (event.metadataChanged()) {
                 isResetMode = TransformMetadata.getTransformMetadata(event.state()).isResetMode();

+ 3 - 1
x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlConfigIndexMappingsFullClusterRestartIT.java

@@ -48,7 +48,9 @@ public class MlConfigIndexMappingsFullClusterRestartIT extends AbstractFullClust
         List<String> templatesToWaitFor = (isRunningAgainstOldCluster() && getOldClusterVersion().before(Version.V_7_12_0))
             ? XPackRestTestConstants.ML_POST_V660_TEMPLATES
             : XPackRestTestConstants.ML_POST_V7120_TEMPLATES;
-        XPackRestTestHelper.waitForTemplates(client(), templatesToWaitFor);
+        boolean clusterUnderstandsComposableTemplates =
+            isRunningAgainstOldCluster() == false || getOldClusterVersion().onOrAfter(Version.V_7_8_0);
+        XPackRestTestHelper.waitForTemplates(client(), templatesToWaitFor, clusterUnderstandsComposableTemplates);
     }
 
     public void testMlConfigIndexMappingsAfterMigration() throws Exception {

+ 3 - 1
x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java

@@ -60,7 +60,9 @@ public class MlMigrationFullClusterRestartIT extends AbstractFullClusterRestartT
         List<String> templatesToWaitFor = (isRunningAgainstOldCluster() && getOldClusterVersion().before(Version.V_7_12_0))
             ? XPackRestTestConstants.ML_POST_V660_TEMPLATES
             : XPackRestTestConstants.ML_POST_V7120_TEMPLATES;
-        XPackRestTestHelper.waitForTemplates(client(), templatesToWaitFor);
+        boolean clusterUnderstandsComposableTemplates =
+            isRunningAgainstOldCluster() == false || getOldClusterVersion().onOrAfter(Version.V_7_8_0);
+        XPackRestTestHelper.waitForTemplates(client(), templatesToWaitFor, clusterUnderstandsComposableTemplates);
     }
 
     private void createTestIndex() throws IOException {

+ 12 - 5
x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java

@@ -9,6 +9,7 @@ package org.elasticsearch.upgrades;
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
 import org.apache.lucene.util.TimeUnits;
+import org.elasticsearch.Version;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.common.settings.Settings;
@@ -33,14 +34,20 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 public class UpgradeClusterClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
 
     /**
-     * Waits for the Machine Learning templates to be created by {@link org.elasticsearch.plugins.MetadataUpgrader}
+     * Waits for the Machine Learning templates to be created by {@link org.elasticsearch.plugins.MetadataUpgrader}.
+     * Only do this on the old cluster.  Users won't necessarily wait for templates to be upgraded during rolling
+     * upgrades, so we cannot wait within the test framework, or we could miss production bugs.
      */
     @Before
     public void waitForTemplates() throws Exception {
-        try {
-            XPackRestTestHelper.waitForTemplates(client(), XPackRestTestConstants.ML_POST_V7120_TEMPLATES);
-        } catch (AssertionError e) {
-            throw new AssertionError("Failure in test setup: Failed to initialize ML index templates", e);
+        if (AbstractUpgradeTestCase.CLUSTER_TYPE == AbstractUpgradeTestCase.ClusterType.OLD) {
+            try {
+                boolean clusterUnderstandsComposableTemplates = AbstractUpgradeTestCase.UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_8_0);
+                XPackRestTestHelper.waitForTemplates(client(), XPackRestTestConstants.ML_POST_V7120_TEMPLATES,
+                    clusterUnderstandsComposableTemplates);
+            } catch (AssertionError e) {
+                throw new AssertionError("Failure in test setup: Failed to initialize ML index templates", e);
+            }
         }
     }
 

+ 84 - 19
x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/IndexMappingTemplateAsserter.java

@@ -18,12 +18,14 @@ import java.io.IOException;
 import java.util.AbstractMap;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -42,6 +44,7 @@ import static org.junit.Assert.fail;
  * These assertions are usually part of upgrade testing.
  */
 public class IndexMappingTemplateAsserter {
+
     /**
      * Assert that the mappings of the ml indices are the same as in the
      * templates. If different this is either a consequence of an unintended
@@ -52,9 +55,8 @@ public class IndexMappingTemplateAsserter {
      * effect of a different test running in the cluster.
      *
      * @param client The rest client
-     * @throws IOException On error
      */
-    public static void assertMlMappingsMatchTemplates(RestClient client) throws IOException {
+    public static void assertMlMappingsMatchTemplates(RestClient client) throws Exception {
         // Excluding those from stats index as some have been renamed and other removed.
         // These exceptions are necessary for Full Cluster Restart tests where the upgrade version is < 7.x
         Set<String> statsIndexException = new HashSet<>();
@@ -69,10 +71,10 @@ public class IndexMappingTemplateAsserter {
         Set<String> notificationsIndexExceptions = new HashSet<>();
         notificationsIndexExceptions.add("properties.message.fields.raw.ignore_above");
 
-        assertLegacyTemplateMatchesIndexMappings(client, ".ml-stats", ".ml-stats-000001", true, statsIndexException, false);
-        assertLegacyTemplateMatchesIndexMappings(client, ".ml-state", ".ml-state-000001", true, Collections.emptySet(), false);
+        assertComposableTemplateMatchesIndexMappings(client, ".ml-stats", ".ml-stats-000001", true, statsIndexException, false);
+        assertComposableTemplateMatchesIndexMappings(client, ".ml-state", ".ml-state-000001", true, Collections.emptySet(), false);
         // Depending on the order Full Cluster restart tests are run there may not be an notifications index yet
-        assertLegacyTemplateMatchesIndexMappings(client,
+        assertComposableTemplateMatchesIndexMappings(client,
             ".ml-notifications-000001", ".ml-notifications-000001", true, notificationsIndexExceptions, false);
         // .ml-annotations-6 does not use a template
         // .ml-anomalies-shared uses a template but will have dynamically updated mappings as new jobs are opened
@@ -87,8 +89,8 @@ public class IndexMappingTemplateAsserter {
     }
 
     /**
-     * Compares the mappings from the template and the index and asserts they
-     * are the same. The assertion error message details the differences in
+     * Compares the mappings from the legacy template and the index and asserts
+     * they are the same. The assertion error message details the differences in
      * the mappings.
      *
      * The Mappings, which are maps of maps, are flattened with the keys built
@@ -113,7 +115,6 @@ public class IndexMappingTemplateAsserter {
      * @param exceptions                    List of keys to ignore in the index mappings.
      *                                      Each key is a '.' separated path.
      * @param allowSystemIndexWarnings      Whether deprecation warnings for system index access should be allowed/expected.
-     * @throws IOException                  Yes
      */
     @SuppressWarnings("unchecked")
     public static void assertLegacyTemplateMatchesIndexMappings(RestClient client,
@@ -121,17 +122,87 @@ public class IndexMappingTemplateAsserter {
                                                                 String indexName,
                                                                 boolean notAnErrorIfIndexDoesNotExist,
                                                                 Set<String> exceptions,
-                                                                boolean allowSystemIndexWarnings) throws IOException {
+                                                                boolean allowSystemIndexWarnings) throws Exception {
+
+        AtomicReference<Response> templateResponse = new AtomicReference<>();
 
-        Request getTemplate = new Request("GET", "_template/" + templateName);
-        Response templateResponse = client.performRequest(getTemplate);
-        assertEquals("missing template [" + templateName + "]", 200, templateResponse.getStatusLine().getStatusCode());
+        ESRestTestCase.assertBusy(() -> {
+            Request getTemplate = new Request("GET", "_template/" + templateName);
+            templateResponse.set(client.performRequest(getTemplate));
+            assertEquals("missing template [" + templateName + "]", 200, templateResponse.get().getStatusLine().getStatusCode());
+        });
 
         Map<String, Object> templateMappings = (Map<String, Object>) XContentMapValues.extractValue(
-            ESRestTestCase.entityAsMap(templateResponse),
+            ESRestTestCase.entityAsMap(templateResponse.get()),
             templateName, "mappings");
         assertNotNull(templateMappings);
 
+        assertTemplateMatchesIndexMappingsCommon(client, templateName, templateMappings,
+            indexName, notAnErrorIfIndexDoesNotExist, exceptions, allowSystemIndexWarnings);
+    }
+
+    /**
+     * Compares the mappings from the composable template and the index and asserts
+     * they are the same. The assertion error message details the differences in
+     * the mappings.
+     *
+     * The Mappings, which are maps of maps, are flattened with the keys built
+     * from the keys of the sub-maps appended to the parent key.
+     * This makes diffing the 2 maps easier and diffs more comprehensible.
+     *
+     * The _meta field is not compared as it contains version numbers that
+     * change even when the mappings don't.
+     *
+     * Mistakes happen and some indices may be stuck with the incorrect mappings
+     * that cannot be fixed without re-index. In this case use the {@code exceptions}
+     * parameter to filter out fields in the index mapping that are not in the
+     * template. Each exception should be a '.' separated path to the value
+     * e.g. {@code properties.analysis.analysis_field.type}.
+     *
+     * @param client                        The rest client to use
+     * @param templateName                  The template
+     * @param indexName                     The index
+     * @param notAnErrorIfIndexDoesNotExist The index may or may not have been created from
+     *                                      the template. If {@code true} then the missing
+     *                                      index does not cause an error
+     * @param exceptions                    List of keys to ignore in the index mappings.
+     *                                      Each key is a '.' separated path.
+     * @param allowSystemIndexWarnings      Whether deprecation warnings for system index access should be allowed/expected.
+     */
+    @SuppressWarnings("unchecked")
+    public static void assertComposableTemplateMatchesIndexMappings(RestClient client,
+                                                                    String templateName,
+                                                                    String indexName,
+                                                                    boolean notAnErrorIfIndexDoesNotExist,
+                                                                    Set<String> exceptions,
+                                                                    boolean allowSystemIndexWarnings) throws Exception {
+
+        AtomicReference<Response> templateResponse = new AtomicReference<>();
+
+        ESRestTestCase.assertBusy(() -> {
+            Request getTemplate = new Request("GET", "_index_template/" + templateName);
+            templateResponse.set(client.performRequest(getTemplate));
+            assertEquals("missing template [" + templateName + "]", 200, templateResponse.get().getStatusLine().getStatusCode());
+        });
+
+        Map<String, Object> templateMappings = ((List<Map<String, Object>>) XContentMapValues.extractValue(
+            ESRestTestCase.entityAsMap(templateResponse.get()),
+            "index_templates", "index_template", "template", "mappings")).get(0);
+        assertNotNull(templateMappings);
+
+        assertTemplateMatchesIndexMappingsCommon(client, templateName, templateMappings, indexName, notAnErrorIfIndexDoesNotExist,
+            exceptions, allowSystemIndexWarnings);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static void assertTemplateMatchesIndexMappingsCommon(RestClient client,
+                                                                 String templateName,
+                                                                 Map<String, Object> templateMappings,
+                                                                 String indexName,
+                                                                 boolean notAnErrorIfIndexDoesNotExist,
+                                                                 Set<String> exceptions,
+                                                                 boolean allowSystemIndexWarnings) throws IOException {
+
         Request getIndexMapping = new Request("GET", indexName + "/_mapping");
         if (allowSystemIndexWarnings) {
             final String systemIndexWarning = "this request accesses system indices: [" + indexName + "], but in a future major version, " +
@@ -237,12 +308,6 @@ public class IndexMappingTemplateAsserter {
         }
     }
 
-    public static void assertLegacyTemplateMatchesIndexMappings(RestClient client,
-                                                                String templateName,
-                                                                String indexName) throws IOException {
-        assertLegacyTemplateMatchesIndexMappings(client, templateName, indexName, false, Collections.emptySet(), false);
-    }
-
     private static boolean areBooleanObjectsAndEqual(Object a, Object b) {
         Boolean left;
         Boolean right;

+ 32 - 7
x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java

@@ -13,6 +13,7 @@ import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,7 +39,9 @@ public final class XPackRestTestHelper {
      * @param expectedTemplates  Names of the templates to wait for
      * @throws InterruptedException If the wait is interrupted
      */
-    public static void waitForTemplates(RestClient client, List<String> expectedTemplates) throws Exception {
+    @SuppressWarnings("unchecked")
+    public static void waitForTemplates(RestClient client, List<String> expectedTemplates, boolean clusterUnderstandsComposableTemplates)
+        throws Exception {
         AtomicReference<Version> masterNodeVersion = new AtomicReference<>();
 
         assertBusy(() -> {
@@ -56,26 +59,48 @@ public final class XPackRestTestHelper {
             fail("No master elected");
         });
 
+        // TODO: legacy support can be removed once all X-Pack plugins use only composable
+        //       templates in the oldest version we test upgrades from
         assertBusy(() -> {
-            final Request request = new Request("GET", "_template");
-            request.addParameter("error_trace", "true");
-
-            String string = EntityUtils.toString(client.performRequest(request).getEntity());
-            Map<String, Object> response = XContentHelper.convertToMap(JsonXContent.jsonXContent, string, false);
+            Map<String, Object> response;
+            if (clusterUnderstandsComposableTemplates) {
+                final Request request = new Request("GET", "_index_template");
+                request.addParameter("error_trace", "true");
 
+                String string = EntityUtils.toString(client.performRequest(request).getEntity());
+                List<Map<String, Object>> templateList = (List<Map<String, Object>>) XContentHelper.convertToMap(JsonXContent.jsonXContent,
+                    string, false).get("index_templates");
+                response = templateList.stream().collect(Collectors.toMap(m -> (String) m.get("name"), m -> m.get("index_template")));
+            } else {
+                response = Collections.emptyMap();
+            }
             final Set<String> templates = new TreeSet<>(response.keySet());
 
+            final Request legacyRequest = new Request("GET", "_template");
+            legacyRequest.addParameter("error_trace", "true");
+
+            String string = EntityUtils.toString(client.performRequest(legacyRequest).getEntity());
+            Map<String, Object> legacyResponse = XContentHelper.convertToMap(JsonXContent.jsonXContent, string, false);
+
+            final Set<String> legacyTemplates = new TreeSet<>(legacyResponse.keySet());
+
             final List<String> missingTemplates = expectedTemplates.stream()
                 .filter(each -> templates.contains(each) == false)
+                .filter(each -> legacyTemplates.contains(each) == false)
                 .collect(Collectors.toList());
 
             // While it's possible to use a Hamcrest matcher for this, the failure is much less legible.
             if (missingTemplates.isEmpty() == false) {
-                fail("Some expected templates are missing: " + missingTemplates + ". The templates that exist are: " + templates + "");
+                fail("Some expected templates are missing: " + missingTemplates
+                    + ". The composable templates that exist are: " + templates
+                    + ". The legacy templates that exist are: " + legacyTemplates);
             }
 
             expectedTemplates.forEach(template -> {
                 Map<?, ?> templateDefinition = (Map<?, ?>) response.get(template);
+                if (templateDefinition == null) {
+                    templateDefinition = (Map<?, ?>) legacyResponse.get(template);
+                }
                 assertThat(
                     "Template [" + template + "] has unexpected version",
                     Version.fromId((Integer) templateDefinition.get("version")),