Browse Source

Merge pull request ESQL-1386 from elastic/main

🤖 ESQL: Merge upstream
elasticsearchmachine 2 years ago
parent
commit
dde592c82c
33 changed files with 1778 additions and 182 deletions
  1. 6 0
      docs/changelog/97355.yaml
  2. 5 0
      docs/changelog/97380.yaml
  3. 5 0
      docs/changelog/97387.yaml
  4. 5 0
      docs/changelog/97401.yaml
  5. 1 1
      libs/tdigest/src/test/java/org/elasticsearch/tdigest/TDigestTests.java
  6. 22 3
      modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java
  7. 4 0
      modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_logs_ecs_mappings.yml
  8. 77 0
      modules/mapper-extras/src/yamlRestTest/resources/rest-api-spec/test/match_only_text/10_basic.yml
  9. 16 1
      server/src/main/java/org/elasticsearch/index/IndexMode.java
  10. 6 1
      server/src/main/java/org/elasticsearch/index/mapper/MappingParser.java
  11. 9 5
      server/src/main/java/org/elasticsearch/search/profile/query/CollectorResult.java
  12. 84 16
      server/src/main/java/org/elasticsearch/search/profile/query/ProfileCollectorManager.java
  13. 1 1
      server/src/test/java/org/elasticsearch/search/dfs/DfsPhaseTests.java
  14. 161 32
      server/src/test/java/org/elasticsearch/search/profile/query/ProfileCollectorManagerTests.java
  15. 0 1
      x-pack/plugin/core/src/main/resources/ecs-dynamic-mappings.json
  16. 4 0
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/component-template/profiling-events.json
  17. 4 0
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/component-template/profiling-executables.json
  18. 4 0
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/component-template/profiling-hosts.json
  19. 4 0
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/component-template/profiling-metrics.json
  20. 4 0
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/component-template/profiling-stackframes.json
  21. 4 0
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/component-template/profiling-stacktraces.json
  22. 4 0
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/component-template/profiling-symbols.json
  23. 18 1
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/index-template/profiling-returnpads-private.json
  24. 4 0
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/index-template/profiling-sq-executables.json
  25. 4 0
      x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/index-template/profiling-sq-leafframes.json
  26. 218 0
      x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/AbstractProfilingPersistenceManager.java
  27. 241 0
      x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingDataStreamManager.java
  28. 283 99
      x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingIndexManager.java
  29. 36 10
      x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingIndexTemplateRegistry.java
  30. 6 1
      x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingPlugin.java
  31. 354 0
      x-pack/plugin/profiler/src/test/java/org/elasticsearch/xpack/profiler/ProfilingDataStreamManagerTests.java
  32. 181 10
      x-pack/plugin/profiler/src/test/java/org/elasticsearch/xpack/profiler/ProfilingIndexManagerTests.java
  33. 3 0
      x-pack/plugin/vector-tile/src/main/java/org/elasticsearch/xpack/vectortile/rest/RestVectorTileAction.java

+ 6 - 0
docs/changelog/97355.yaml

@@ -0,0 +1,6 @@
+pr: 97355
+summary: Fix mapping parsing logic to determine synthetic source is active
+area: "Mapping"
+type: bug
+issues:
+ - 97320

+ 5 - 0
docs/changelog/97380.yaml

@@ -0,0 +1,5 @@
+pr: 97380
+summary: "[Profiling] Add initial support for upgrades"
+area: Application
+type: enhancement
+issues: []

+ 5 - 0
docs/changelog/97387.yaml

@@ -0,0 +1,5 @@
+pr: 97387
+summary: '`ProfileCollectorManager` to support child profile collectors'
+area: Search
+type: enhancement
+issues: []

+ 5 - 0
docs/changelog/97401.yaml

@@ -0,0 +1,5 @@
+pr: 97401
+summary: Accept timestamp as object at root level
+area: Data streams
+type: bug
+issues: []

+ 1 - 1
libs/tdigest/src/test/java/org/elasticsearch/tdigest/TDigestTests.java

@@ -337,7 +337,7 @@ public abstract class TDigestTests extends ESTestCase {
 
         // for this value of the compression, the tree shouldn't have merged any node
         assertEquals(digest.centroids().size(), values.size());
-        for (double q : new double[] { 0, 1e-10, r.nextDouble(), 0.5, 1 - 1e-10, 1 }) {
+        for (double q : new double[] { 0, 1e-10, 0.5, 1 - 1e-10, 1 }) {
             double q1 = Dist.quantile(q, values);
             double q2 = digest.quantile(q);
             assertEquals(String.valueOf(q), q1, q2, q1);

+ 22 - 3
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

@@ -625,15 +625,34 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
                 .put(DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING.getKey(), ByteSizeValue.ofMb(5))
         );
 
+        // rollover to assert the second generation is configured with the new setting values (note that the first index _might_ pick up
+        // the new settings as well if the data stream lifecycle runs often enough - every second in tests - and the index has not yet been
+        // forcemerged)
+
+        indexDocs(dataStreamName, 1);
+
+        // let's allow one rollover to go through
         assertBusy(() -> {
-            GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(firstGenerationIndex).includeDefaults(true);
+            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
+            GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
+                .actionGet();
+            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
+            assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
+            List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
+            assertThat(backingIndices.size(), equalTo(3));
+        });
+
+        String secondGenerationIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1L);
+        // check the 2nd generation index picked up the new setting values
+        assertBusy(() -> {
+            GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(secondGenerationIndex).includeDefaults(true);
             GetSettingsResponse getSettingsResponse = client().execute(GetSettingsAction.INSTANCE, getSettingsRequest).actionGet();
             assertThat(
-                getSettingsResponse.getSetting(firstGenerationIndex, MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey()),
+                getSettingsResponse.getSetting(secondGenerationIndex, MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey()),
                 is("5")
             );
             assertThat(
-                getSettingsResponse.getSetting(firstGenerationIndex, MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey()),
+                getSettingsResponse.getSetting(secondGenerationIndex, MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey()),
                 is(ByteSizeValue.ofMb(5).getStringRep())
             );
         });

+ 4 - 0
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_logs_ecs_mappings.yml

@@ -197,6 +197,7 @@ Test general mockup ECS mappings:
           {
             "start_timestamp": "not a date",
             "start-timestamp": "not a date",
+            "timestamp.us": 1688550340718000,
             "test": "mockup-ecs-log",
             "registry": {
               "data": {
@@ -234,6 +235,7 @@ Test general mockup ECS mappings:
                 "path": "/path/to/my/file",
                 "target_path": "/path/to/my/file"
               },
+              "code_signature.timestamp": "2023-07-05",
               "registry.data.strings": ["C:\\rta\\red_ttp\\bin\\myapp.exe"]
             },
             "error": {
@@ -376,6 +378,8 @@ Test general mockup ECS mappings:
   - match: { .$idx0name.mappings.properties.start_timestamp.type: "date" }
   # testing the default mapping of string input fields to keyword if not matching any pattern
   - match: { .$idx0name.mappings.properties.start-timestamp.type: "keyword" }
+  - match: { .$idx0name.mappings.properties.timestamp.properties.us.type: "long" }
+  - match: { .$idx0name.mappings.properties.parent.properties.code_signature.properties.timestamp.type: "date" }
   - match: { .$idx0name.mappings.properties.vulnerability.properties.score.properties.base.type: "float" }
   - match: { .$idx0name.mappings.properties.vulnerability.properties.score.properties.temporal.type: "float" }
   - match: { .$idx0name.mappings.properties.vulnerability.properties.score.properties.version.type: "keyword" }

+ 77 - 0
modules/mapper-extras/src/yamlRestTest/resources/rest-api-spec/test/match_only_text/10_basic.yml

@@ -274,3 +274,80 @@ setup:
   - match: { hits.total.value: 1 }
   - match: { hits.hits.0._source.foo: "The Apache Software Foundation manages many projects including Lucene" }
   - match: { hits.hits.0.highlight.foo.0: "The Apache Software Foundation manages <em>many</em> projects including Lucene" }
+
+---
+synthetic_source:
+  - skip:
+      version: " - 8.3.99"
+      reason: synthetic source introduced in 8.4.0
+
+  - do:
+      indices.create:
+        index: synthetic_source_test
+        body:
+          mappings:
+            _source:
+              mode: synthetic
+            properties:
+              foo:
+                type: match_only_text
+
+  - do:
+      index:
+        index: synthetic_source_test
+        id:    "1"
+        refresh: true
+        body:
+          foo: "Apache Lucene powers Elasticsearch"
+
+  - do:
+      search:
+        index: synthetic_source_test
+  - match: { "hits.total.value": 1 }
+  - match:
+      hits.hits.0._source:
+        foo: "Apache Lucene powers Elasticsearch"
+
+---
+tsdb:
+  - skip:
+      version: " - 8.9.99"
+      reason: bug fixed in 8.10.0
+
+  - do:
+      indices.create:
+        index: tsdb_test
+        body:
+          settings:
+            index:
+              mode: time_series
+              routing_path: [ dimension ]
+              time_series:
+                start_time: 2000-01-01T00:00:00Z
+                end_time: 2099-12-31T23:59:59Z
+          mappings:
+            properties:
+              dimension:
+                type: keyword
+                time_series_dimension: true
+              foo:
+                type: match_only_text
+
+  - do:
+      index:
+        index: tsdb_test
+        refresh: true
+        body:
+          "@timestamp": "2000-01-01T00:00:00Z"
+          dimension: "a"
+          foo: "Apache Lucene powers Elasticsearch"
+
+  - do:
+      search:
+        index: tsdb_test
+  - match: { "hits.total.value": 1 }
+  - match:
+      hits.hits.0._source:
+        "@timestamp" : "2000-01-01T00:00:00.000Z"
+        "dimension" : "a"
+        foo: "Apache Lucene powers Elasticsearch"

+ 16 - 1
server/src/main/java/org/elasticsearch/index/IndexMode.java

@@ -114,6 +114,11 @@ public enum IndexMode {
 
         @Override
         public void validateSourceFieldMapper(SourceFieldMapper sourceFieldMapper) {}
+
+        @Override
+        public boolean isSyntheticSourceEnabled() {
+            return false;
+        }
     },
     TIME_SERIES("time_series") {
         @Override
@@ -207,6 +212,11 @@ public enum IndexMode {
                 throw new IllegalArgumentException("time series indices only support synthetic source");
             }
         }
+
+        @Override
+        public boolean isSyntheticSourceEnabled() {
+            return true;
+        }
     };
 
     protected static String tsdbMode() {
@@ -300,7 +310,7 @@ public enum IndexMode {
 
     /**
      * @return the time range based on the provided index metadata and index mode implementation.
-     *         Otherwise <code>null</code> is returned.
+     * Otherwise <code>null</code> is returned.
      */
     @Nullable
     public abstract TimestampBounds getTimestampBound(IndexMetadata indexMetadata);
@@ -327,6 +337,11 @@ public enum IndexMode {
      */
     public abstract void validateSourceFieldMapper(SourceFieldMapper sourceFieldMapper);
 
+    /**
+     * @return whether synthetic source is the only allowed source mode.
+     */
+    public abstract boolean isSyntheticSourceEnabled();
+
     /**
      * Parse a string into an {@link IndexMode}.
      */

+ 6 - 1
server/src/main/java/org/elasticsearch/index/mapper/MappingParser.java

@@ -11,6 +11,7 @@ package org.elasticsearch.index.mapper;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.xcontent.XContentType;
 
 import java.util.Collections;
@@ -103,7 +104,7 @@ public final class MappingParser {
 
         Map<Class<? extends MetadataFieldMapper>, MetadataFieldMapper> metadataMappers = metadataMappersSupplier.get();
         Map<String, Object> meta = null;
-        boolean isSourceSynthetic = false;
+        boolean isSourceSynthetic = mappingParserContext.getIndexSettings().getMode().isSyntheticSourceEnabled();
 
         Iterator<Map.Entry<String, Object>> iterator = mapping.entrySet().iterator();
         while (iterator.hasNext()) {
@@ -124,6 +125,10 @@ public final class MappingParser {
                 metadataMappers.put(metadataFieldMapper.getClass(), metadataFieldMapper);
                 assert fieldNodeMap.isEmpty();
                 if (metadataFieldMapper instanceof SourceFieldMapper sfm) {
+                    // Validation in other places should have failed first
+                    assert sfm.isSynthetic()
+                        || (sfm.isSynthetic() == false && mappingParserContext.getIndexSettings().getMode() != IndexMode.TIME_SERIES)
+                        : "synthetic source can't be disabled in a time series index";
                     isSourceSynthetic = sfm.isSynthetic();
                 }
             }

+ 9 - 5
server/src/main/java/org/elasticsearch/search/profile/query/CollectorResult.java

@@ -64,10 +64,14 @@ public class CollectorResult extends ProfilerCollectorResult implements ToXConte
         out.writeString(getName());
         out.writeString(getReason());
         out.writeLong(getTime());
-        out.writeList(getCollectorResults());
+        out.writeList(getChildrenResults());
     }
 
-    public List<CollectorResult> getCollectorResults() {
+    /**
+     * Exposes a list of children collector results. Same as {@link ProfilerCollectorResult#getProfiledChildren()} with each
+     * item in the list being cast to a {@link CollectorResult}
+     */
+    public List<CollectorResult> getChildrenResults() {
         return super.getProfiledChildren().stream().map(profilerCollectorResult -> (CollectorResult) profilerCollectorResult).toList();
     }
 
@@ -80,12 +84,12 @@ public class CollectorResult extends ProfilerCollectorResult implements ToXConte
         return getName().equals(other.getName())
             && getReason().equals(other.getReason())
             && getTime() == other.getTime()
-            && getCollectorResults().equals(other.getCollectorResults());
+            && getChildrenResults().equals(other.getChildrenResults());
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(getName(), getReason(), getTime(), getCollectorResults());
+        return Objects.hash(getName(), getReason(), getTime(), getChildrenResults());
     }
 
     @Override
@@ -105,7 +109,7 @@ public class CollectorResult extends ProfilerCollectorResult implements ToXConte
 
         if (getProfiledChildren().isEmpty() == false) {
             builder = builder.startArray(CHILDREN.getPreferredName());
-            for (CollectorResult child : getCollectorResults()) {
+            for (CollectorResult child : getChildrenResults()) {
                 builder = child.toXContent(builder, params);
             }
             builder = builder.endArray();

+ 84 - 16
server/src/main/java/org/elasticsearch/search/profile/query/ProfileCollectorManager.java

@@ -8,54 +8,122 @@
 
 package org.elasticsearch.search.profile.query;
 
+import org.apache.lucene.sandbox.search.ProfilerCollector;
 import org.apache.lucene.search.Collector;
 import org.apache.lucene.search.CollectorManager;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * A {@link CollectorManager} that takes another CollectorManager as input and wraps all Collectors generated by it
  * in an {@link InternalProfileCollector}. It delegates all the profiling to the generated collectors via {@link #getCollectorTree()}
- * and joins them up when its {@link #reduce} method is called. The profile result can
+ * and joins the different collector trees together when its {@link #reduce} method is called.
+ * Supports optionally providing sub-collector managers for top docs as well as aggs collection, so that each
+ * {@link InternalProfileCollector} created is provided with the corresponding sub-collectors that are children of the top-level collector.
+ * @param <T> the return type of the wrapped collector manager, which the reduce method returns.
  */
 public final class ProfileCollectorManager<T> implements CollectorManager<InternalProfileCollector, T> {
 
-    private final CollectorManager<Collector, T> collectorManager;
+    private final CollectorManager<? extends Collector, T> collectorManager;
     private final String reason;
+    private final ProfileCollectorManager<?> topDocsSubCollectorManager;
+    private final ProfileCollectorManager<?> aggsSubCollectorManager;
+    // this is a bit of a hack: it allows us to retrieve the last collector that newCollector has returned for sub-collector managers,
+    // so that we can provide them to InternalProfileCollector's constructor as children. This is fine as newCollector does not get called
+    // concurrently, but rather in advance before parallelizing the collection
+    private InternalProfileCollector profileCollector;
+
     private CollectorResult collectorTree;
 
-    @SuppressWarnings("unchecked")
     public ProfileCollectorManager(CollectorManager<? extends Collector, T> collectorManager, String reason) {
-        this.collectorManager = (CollectorManager<Collector, T>) collectorManager;
+        this(collectorManager, reason, null, null);
+    }
+
+    public ProfileCollectorManager(
+        CollectorManager<? extends Collector, T> collectorManager,
+        String reason,
+        ProfileCollectorManager<?> topDocsSubCollectorManager,
+        ProfileCollectorManager<?> aggsSubCollectorManager
+    ) {
+        this.collectorManager = collectorManager;
         this.reason = reason;
+        assert assertSubCollectorManagers() : "top docs manager is null while aggs manager isn't";
+        this.topDocsSubCollectorManager = topDocsSubCollectorManager;
+        this.aggsSubCollectorManager = aggsSubCollectorManager;
+    }
+
+    private boolean assertSubCollectorManagers() {
+        if (aggsSubCollectorManager != null) {
+            return topDocsSubCollectorManager != null;
+        }
+        return true;
     }
 
     @Override
     public InternalProfileCollector newCollector() throws IOException {
-        return new InternalProfileCollector(collectorManager.newCollector(), reason);
+        Collector collector = collectorManager.newCollector();
+        if (aggsSubCollectorManager == null && topDocsSubCollectorManager == null) {
+            profileCollector = new InternalProfileCollector(collector, reason);
+        } else if (aggsSubCollectorManager == null) {
+            assert topDocsSubCollectorManager.profileCollector != null;
+            profileCollector = new InternalProfileCollector(collector, reason, topDocsSubCollectorManager.profileCollector);
+        } else {
+            assert topDocsSubCollectorManager.profileCollector != null && aggsSubCollectorManager.profileCollector != null;
+            profileCollector = new InternalProfileCollector(
+                collector,
+                reason,
+                topDocsSubCollectorManager.profileCollector,
+                aggsSubCollectorManager.profileCollector
+            );
+        }
+        return profileCollector;
     }
 
+    @Override
     public T reduce(Collection<InternalProfileCollector> profileCollectors) throws IOException {
         assert profileCollectors.size() > 0 : "at least one collector expected";
-        List<Collector> unwrapped = profileCollectors.stream()
-            .map(InternalProfileCollector::getWrappedCollector)
-            .collect(Collectors.toList());
-        T returnValue = collectorManager.reduce(unwrapped);
-
-        List<CollectorResult> resultsPerProfiler = profileCollectors.stream()
-            .map(ipc -> ipc.getCollectorTree())
-            .collect(Collectors.toList());
+        List<Collector> unwrapped = profileCollectors.stream().map(InternalProfileCollector::getWrappedCollector).toList();
+        @SuppressWarnings("unchecked")
+        CollectorManager<Collector, T> cm = (CollectorManager<Collector, T>) collectorManager;
+        T returnValue = cm.reduce(unwrapped);
 
+        List<CollectorResult> resultsPerProfiler = profileCollectors.stream().map(InternalProfileCollector::getCollectorTree).toList();
         long totalTime = resultsPerProfiler.stream().map(CollectorResult::getTime).reduce(0L, Long::sum);
         String collectorName = resultsPerProfiler.get(0).getName();
-        this.collectorTree = new CollectorResult(collectorName, reason, totalTime, Collections.emptyList());
+        assert profileCollectors.stream().map(ProfilerCollector::getReason).allMatch(reason::equals);
+        assert profileCollectors.stream().map(ProfilerCollector::getName).allMatch(collectorName::equals);
+        assert assertChildrenSize(resultsPerProfiler);
+
+        List<CollectorResult> childrenResults = new ArrayList<>();
+        // for the children collector managers, we rely on the chain on reduce calls to make their collector results available
+        if (topDocsSubCollectorManager != null) {
+            childrenResults.add(topDocsSubCollectorManager.getCollectorTree());
+        }
+        if (aggsSubCollectorManager != null) {
+            childrenResults.add(aggsSubCollectorManager.getCollectorTree());
+        }
+        this.collectorTree = new CollectorResult(collectorName, reason, totalTime, childrenResults);
+
         return returnValue;
     }
 
+    private boolean assertChildrenSize(List<CollectorResult> resultsPerProfiler) {
+        int expectedSize = 0;
+        if (topDocsSubCollectorManager != null) {
+            expectedSize++;
+        }
+        if (aggsSubCollectorManager != null) {
+            expectedSize++;
+        }
+        final int expectedChildrenSize = expectedSize;
+        return resultsPerProfiler.stream()
+            .map(collectorResult -> collectorResult.getChildrenResults().size())
+            .allMatch(integer -> integer == expectedChildrenSize);
+    }
+
     public CollectorResult getCollectorTree() {
         if (this.collectorTree == null) {
             throw new IllegalStateException("A collectorTree hasn't been set yet. Call reduce() before attempting to retrieve it");

+ 1 - 1
server/src/test/java/org/elasticsearch/search/dfs/DfsPhaseTests.java

@@ -101,7 +101,7 @@ public class DfsPhaseTests extends ESTestCase {
             assertEquals("SimpleTopScoreDocCollector", (collectorResult.getName()));
             assertEquals("search_top_hits", (collectorResult.getReason()));
             assertTrue(collectorResult.getTime() > 0);
-            List<CollectorResult> children = collectorResult.getCollectorResults();
+            List<CollectorResult> children = collectorResult.getChildrenResults();
             if (children.size() > 0) {
                 long totalTime = 0L;
                 for (CollectorResult child : children) {

+ 161 - 32
server/src/test/java/org/elasticsearch/search/profile/query/ProfileCollectorManagerTests.java

@@ -11,13 +11,19 @@ package org.elasticsearch.search.profile.query;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.sandbox.search.ProfilerCollectorResult;
+import org.apache.lucene.search.Collector;
 import org.apache.lucene.search.CollectorManager;
+import org.apache.lucene.search.FilterCollector;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.MultiCollectorManager;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.TopScoreDocCollector;
-import org.apache.lucene.search.similarities.BM25Similarity;
+import org.apache.lucene.search.TotalHitCountCollectorManager;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.tests.index.RandomIndexWriter;
 import org.apache.lucene.tests.search.DummyTotalHitCountCollector;
@@ -27,11 +33,15 @@ import org.elasticsearch.test.ESTestCase;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 public class ProfileCollectorManagerTests extends ESTestCase {
 
+    private Directory directory;
+    private int numDocs;
+    private DirectoryReader reader;
+    private IndexSearcher searcher;
+
     private static class TestCollector extends DummyTotalHitCountCollector {
 
         private final int id;
@@ -41,6 +51,30 @@ public class ProfileCollectorManagerTests extends ESTestCase {
         }
     }
 
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        directory = newDirectory();
+        try (RandomIndexWriter writer = new RandomIndexWriter(random(), directory, newIndexWriterConfig())) {
+            numDocs = randomIntBetween(900, 1000);
+            for (int i = 0; i < numDocs; i++) {
+                Document doc = new Document();
+                doc.add(new StringField("field1", "value", Field.Store.NO));
+                writer.addDocument(doc);
+            }
+            writer.flush();
+        }
+        reader = DirectoryReader.open(directory);
+        searcher = newSearcher(reader);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        reader.close();
+        directory.close();
+    }
+
     /**
      * This test checks that each new collector is a different instance on each call and that
      * the call to reduce() is forwarded to the wrapped collector manager.
@@ -61,25 +95,28 @@ public class ProfileCollectorManagerTests extends ESTestCase {
                 reduceCalled.set(true);
                 return counter;
             }
-        }, CollectorResult.REASON_SEARCH_TOP_HITS);
+        }, "test_reason");
         int runs = randomIntBetween(5, 10);
         List<InternalProfileCollector> collectors = new ArrayList<>();
         for (int i = 0; i < runs; i++) {
             collectors.add(pcm.newCollector());
             assertEquals(i, ((TestCollector) collectors.get(i).getWrappedCollector()).id);
         }
+
+        long totalTime = 0;
+        LeafReaderContext leafReaderContext = reader.leaves().get(0);
+        for (InternalProfileCollector collector : collectors) {
+            LeafCollector leafCollector = collector.getLeafCollector(leafReaderContext);
+            leafCollector.collect(0);
+            totalTime += collector.getTime();
+        }
         Integer returnValue = pcm.reduce(collectors);
         assertEquals(runs, returnValue.intValue());
         assertTrue(reduceCalled.get());
-    }
-
-    public void testReduceEmpty() {
-        ProfileCollectorManager<TopDocs> pcm = new ProfileCollectorManager<>(
-            TopScoreDocCollector.createSharedManager(10, null, 1000),
-            CollectorResult.REASON_SEARCH_TOP_HITS
-        );
-        AssertionError ae = expectThrows(AssertionError.class, () -> pcm.reduce(Collections.emptyList()));
-        assertEquals("at least one collector expected", ae.getMessage());
+        assertEquals(totalTime, pcm.getCollectorTree().getTime());
+        assertEquals("test_reason", pcm.getCollectorTree().getReason());
+        assertEquals("TestCollector", pcm.getCollectorTree().getName());
+        assertEquals(0, pcm.getCollectorTree().getProfiledChildren().size());
     }
 
     /**
@@ -88,35 +125,127 @@ public class ProfileCollectorManagerTests extends ESTestCase {
      * result from calling the collector tree contains profile results for each slice.
      */
     public void testManagerWithSearcher() throws IOException {
-        Directory directory = newDirectory();
-        try (RandomIndexWriter writer = new RandomIndexWriter(random(), directory, newIndexWriterConfig())) {
-            int numDocs = randomIntBetween(900, 1000);
-            for (int i = 0; i < numDocs; i++) {
-                Document doc = new Document();
-                doc.add(new StringField("field1", "value", Field.Store.NO));
-                writer.addDocument(doc);
-            }
-            writer.flush();
-            IndexReader reader = writer.getReader();
-            IndexSearcher searcher = newSearcher(reader);
-            searcher.setSimilarity(new BM25Similarity());
-
+        {
             CollectorManager<TopScoreDocCollector, TopDocs> topDocsManager = TopScoreDocCollector.createSharedManager(10, null, 1000);
             TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), topDocsManager);
             assertEquals(numDocs, topDocs.totalHits.value);
-
+        }
+        {
+            CollectorManager<TopScoreDocCollector, TopDocs> topDocsManager = TopScoreDocCollector.createSharedManager(10, null, 1000);
             String profileReason = "profiler_reason";
             ProfileCollectorManager<TopDocs> profileCollectorManager = new ProfileCollectorManager<>(topDocsManager, profileReason);
-
-            searcher.search(new MatchAllDocsQuery(), profileCollectorManager);
-
+            TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), profileCollectorManager);
+            assertEquals(numDocs, topDocs.totalHits.value);
             CollectorResult result = profileCollectorManager.getCollectorTree();
             assertEquals("profiler_reason", result.getReason());
             assertEquals("SimpleTopScoreDocCollector", result.getName());
             assertTrue(result.getTime() > 0);
+        }
+    }
 
-            reader.close();
+    public void testManagerWithChildren() throws IOException {
+        {
+            CollectorManager<TopScoreDocCollector, TopDocs> topDocsManager = TopScoreDocCollector.createSharedManager(10, null, 1000);
+            TotalHitCountCollectorManager totalHitCountCollectorManager = new TotalHitCountCollectorManager();
+            MultiCollectorManager multiCollectorManager = new MultiCollectorManager(topDocsManager, totalHitCountCollectorManager);
+            Object[] results = searcher.search(new MatchAllDocsQuery(), multiCollectorManager);
+            assertEquals(numDocs, ((TopDocs) results[0]).totalHits.value);
+            assertEquals(numDocs, (int) results[1]);
+        }
+        {
+            ProfileCollectorManager<TopDocs> topDocsManager = new ProfileCollectorManager<>(
+                TopScoreDocCollector.createSharedManager(10, null, 1000),
+                "top_docs"
+            );
+            ProfileCollectorManager<Integer> aggsManager = new ProfileCollectorManager<>(
+                new TotalHitCountCollectorManager(),
+                "total_hit_count"
+            );
+            MultiCollectorManager multiCollectorManager = new MultiCollectorManager(topDocsManager, aggsManager);
+            ProfileCollectorManager<Object[]> profileCollectorManager = new ProfileCollectorManager<>(
+                multiCollectorManager,
+                "multi_collector",
+                topDocsManager,
+                aggsManager
+            );
+            Object[] results = searcher.search(new MatchAllDocsQuery(), profileCollectorManager);
+            assertEquals(numDocs, ((TopDocs) results[0]).totalHits.value);
+            assertEquals(numDocs, (int) results[1]);
+            CollectorResult result = profileCollectorManager.getCollectorTree();
+            assertEquals("multi_collector", result.getReason());
+            assertEquals("MultiCollector", result.getName());
+            assertTrue(result.getTime() > 0);
+            assertEquals(2, result.getProfiledChildren().size());
+            ProfilerCollectorResult topDocsCollectorResult = result.getProfiledChildren().get(0);
+            assertEquals("top_docs", topDocsCollectorResult.getReason());
+            assertEquals("SimpleTopScoreDocCollector", topDocsCollectorResult.getName());
+            assertTrue(topDocsCollectorResult.getTime() > 0);
+            ProfilerCollectorResult aggsCollectorResult = result.getProfiledChildren().get(1);
+            assertEquals("total_hit_count", aggsCollectorResult.getReason());
+            assertEquals("TotalHitCountCollector", aggsCollectorResult.getName());
+            assertTrue(aggsCollectorResult.getTime() > 0);
+        }
+    }
+
+    public void testManagerWithSingleChild() throws IOException {
+        {
+            CollectorManager<DummyTotalHitCountCollector, Integer> manager = DummyTotalHitCountCollector.createManager();
+            CollectorManagerWrapper collectorManagerWrapper = new CollectorManagerWrapper(manager);
+            Integer hitCount = searcher.search(new MatchAllDocsQuery(), collectorManagerWrapper);
+            assertEquals(numDocs, hitCount.longValue());
+        }
+        {
+            CollectorManager<DummyTotalHitCountCollector, Integer> manager = DummyTotalHitCountCollector.createManager();
+            ProfileCollectorManager<Integer> profileTopDocsManager = new ProfileCollectorManager<>(manager, "dummy_total_hit_count");
+            CollectorManagerWrapper collectorManagerWrapper = new CollectorManagerWrapper(profileTopDocsManager);
+            ProfileCollectorManager<Integer> profileCollectorManager = new ProfileCollectorManager<>(
+                collectorManagerWrapper,
+                "collector_wrapper",
+                profileTopDocsManager,
+                null
+            );
+            Integer hitCount = searcher.search(new MatchAllDocsQuery(), profileCollectorManager);
+            assertEquals(numDocs, hitCount.longValue());
+            CollectorResult result = profileCollectorManager.getCollectorTree();
+            assertEquals("collector_wrapper", result.getReason());
+            assertEquals("CollectorWrapper", result.getName());
+            assertTrue(result.getTime() > 0);
+            assertEquals(1, result.getProfiledChildren().size());
+            ProfilerCollectorResult topDocsCollectorResult = result.getProfiledChildren().get(0);
+            assertEquals("dummy_total_hit_count", topDocsCollectorResult.getReason());
+            assertEquals("DummyTotalHitCountCollector", topDocsCollectorResult.getName());
+            assertTrue(topDocsCollectorResult.getTime() > 0);
+        }
+    }
+
+    private static class CollectorManagerWrapper implements CollectorManager<CollectorWrapper, Integer> {
+
+        private final CollectorManager<?, Integer> collectorManager;
+
+        CollectorManagerWrapper(CollectorManager<?, Integer> collectorManager) {
+            this.collectorManager = collectorManager;
+        }
+
+        @Override
+        public CollectorWrapper newCollector() throws IOException {
+            return new CollectorWrapper(collectorManager.newCollector());
+        }
+
+        @Override
+        public Integer reduce(Collection<CollectorWrapper> collectors) throws IOException {
+            List<Collector> collectorList = collectors.stream().map(collectorWrapper -> collectorWrapper.collector).toList();
+            @SuppressWarnings("unchecked")
+            CollectorManager<Collector, Integer> manager = (CollectorManager<Collector, Integer>) collectorManager;
+            return manager.reduce(collectorList);
+        }
+    }
+
+    private static class CollectorWrapper extends FilterCollector {
+        private final Collector collector;
+
+        CollectorWrapper(Collector collector) {
+            super(collector);
+            this.collector = collector;
         }
-        directory.close();
     }
 }

+ 0 - 1
x-pack/plugin/core/src/main/resources/ecs-dynamic-mappings.json

@@ -124,7 +124,6 @@
               "type": "date"
             },
             "path_match": [
-              "timestamp",
               "*.timestamp",
               "*_timestamp",
               "*.not_after",

+ 4 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/component-template/profiling-events.json

@@ -24,6 +24,10 @@
       "_source": {
         "enabled": false
       },
+      "_meta": {
+        "index-template-version": ${xpack.profiling.template.version},
+        "index-version": ${xpack.profiling.index.events.version}
+      },
       "dynamic": false,
       "properties": {
         "ecs.version": {

+ 4 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/component-template/profiling-executables.json

@@ -13,6 +13,10 @@
       "_source": {
         "mode": "synthetic"
       },
+      "_meta": {
+        "index-template-version": ${xpack.profiling.template.version},
+        "index-version": ${xpack.profiling.index.executables.version}
+      },
       "dynamic": false,
       "properties": {
         "ecs.version": {

+ 4 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/component-template/profiling-hosts.json

@@ -19,6 +19,10 @@
       "_source": {
         "enabled": true
       },
+      "_meta": {
+        "index-template-version": ${xpack.profiling.template.version},
+        "index-version": ${xpack.profiling.index.hosts.version}
+      },
       "dynamic": false,
       "properties": {
         "ecs.version": {

+ 4 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/component-template/profiling-metrics.json

@@ -19,6 +19,10 @@
       "_source": {
         "enabled": false
       },
+      "_meta": {
+        "index-template-version": ${xpack.profiling.template.version},
+        "index-version": ${xpack.profiling.index.metrics.version}
+      },
       /*
        We intentionally allow dynamic mappings for metrics. Which metrics are added is guarded by
        the collector and we want to allow adding new metrics over time. As this is a datastream,

+ 4 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/component-template/profiling-stackframes.json

@@ -21,6 +21,10 @@
       "_source": {
         "enabled": true
       },
+      "_meta": {
+        "index-template-version": ${xpack.profiling.template.version},
+        "index-version": ${xpack.profiling.index.stackframes.version}
+      },
       "dynamic": false,
       "properties": {
         "ecs.version": {

+ 4 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/component-template/profiling-stacktraces.json

@@ -19,6 +19,10 @@
       "_source": {
         "mode": "synthetic"
       },
+      "_meta": {
+        "index-template-version": ${xpack.profiling.template.version},
+        "index-version": ${xpack.profiling.index.stacktraces.version}
+      },
       "dynamic": false,
       "properties": {
         "ecs.version": {

+ 4 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/component-template/profiling-symbols.json

@@ -17,6 +17,10 @@
       "_source": {
         "enabled": true
       },
+      "_meta": {
+        "index-template-version": ${xpack.profiling.template.version},
+        "index-version": ${xpack.profiling.index.symbols.version}
+      },
       "dynamic": false,
       "properties": {
         "ecs.version": {

+ 18 - 1
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/index-template/profiling-returnpads-private.json

@@ -19,6 +19,10 @@
       "_source": {
         "enabled": true
       },
+      "_meta": {
+        "index-template-version": ${xpack.profiling.template.version},
+        "index-version": ${xpack.profiling.index.returnpads.private.version}
+      },
       "dynamic": false,
       "properties": {
         "ecs.version": {
@@ -34,7 +38,20 @@
           "store": false
         },
         "Symbfile.file.id": {
-          /* 'binary' type fields don't allow using 'index: true'. */
+          /*
+           'binary' type fields don't allow using 'index: true'.
+           The value is stored in its binary form (byte array).
+           Not readable or writable via JSON (CBOR and SMILE only).
+           */
+          "type": "keyword",
+          "index": true,
+          "doc_values": false,
+          "store": false
+        },
+        "Symbfile.file.id_str": {
+          /*
+           The value is stored as a base64-encoded string to allow JSON queries.
+           */
           "type": "keyword",
           "index": true,
           "doc_values": false,

+ 4 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/index-template/profiling-sq-executables.json

@@ -15,6 +15,10 @@
       "_source": {
         "mode": "synthetic"
       },
+      "_meta": {
+        "index-template-version": ${xpack.profiling.template.version},
+        "index-version": ${xpack.profiling.index.sq.executables.version}
+      },
       "dynamic": false,
       "properties": {
         "ecs.version": {

+ 4 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/profiler/index-template/profiling-sq-leafframes.json

@@ -15,6 +15,10 @@
       "_source": {
         "mode": "synthetic"
       },
+      "_meta": {
+        "index-template-version": ${xpack.profiling.template.version},
+        "index-version": ${xpack.profiling.index.sq.leafframes.version}
+      },
       "dynamic": false,
       "properties": {
         "ecs.version": {

+ 218 - 0
x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/AbstractProfilingPersistenceManager.java

@@ -0,0 +1,218 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.profiler;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.support.RefCountingRunnable;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.cluster.health.ClusterIndexHealth;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.gateway.GatewayService;
+import org.elasticsearch.index.Index;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class AbstractProfilingPersistenceManager<T extends AbstractProfilingPersistenceManager.ProfilingIndexAbstraction>
+    implements
+        ClusterStateListener,
+        Closeable {
+    protected final Logger logger = LogManager.getLogger(getClass());
+
+    private final AtomicBoolean inProgress = new AtomicBoolean(false);
+
+    private final ClusterService clusterService;
+    private volatile boolean templatesEnabled;
+
+    public AbstractProfilingPersistenceManager(ClusterService clusterService) {
+        this.clusterService = clusterService;
+    }
+
+    public void initialize() {
+        clusterService.addListener(this);
+    }
+
+    @Override
+    public void close() {
+        clusterService.removeListener(this);
+    }
+
+    public void setTemplatesEnabled(boolean templatesEnabled) {
+        this.templatesEnabled = templatesEnabled;
+    }
+
+    @Override
+    public final void clusterChanged(ClusterChangedEvent event) {
+        if (templatesEnabled == false) {
+            return;
+        }
+        // wait for the cluster state to be recovered
+        if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
+            return;
+        }
+
+        // If this node is not a master node, exit.
+        if (event.state().nodes().isLocalNodeElectedMaster() == false) {
+            return;
+        }
+
+        if (event.state().nodes().getMaxNodeVersion().after(event.state().nodes().getSmallestNonClientNodeVersion())) {
+            logger.debug("Skipping up-to-date check as cluster has mixed versions");
+            return;
+        }
+
+        if (isAllResourcesCreated(event) == false) {
+            logger.trace("Skipping index creation; not all required resources are present yet");
+            return;
+        }
+
+        if (inProgress.compareAndSet(false, true) == false) {
+            logger.trace("Skipping index creation as changes are already in progress");
+            return;
+        }
+
+        // Only release the lock once all upgrade attempts have succeeded or failed.
+        try (var refs = new RefCountingRunnable(() -> inProgress.set(false))) {
+            ClusterState clusterState = event.state();
+            for (T index : getManagedIndices()) {
+                Status status = getStatus(clusterState, index);
+                if (status.actionable) {
+                    onStatus(clusterState, status, index, ActionListener.releasing(refs.acquire()));
+                }
+            }
+        }
+    }
+
+    protected boolean isAllResourcesCreated(ClusterChangedEvent event) {
+        return ProfilingIndexTemplateRegistry.isAllResourcesCreated(event.state());
+    }
+
+    /**
+     * Extracts the appropriate index metadata for a given index from the cluster state.
+     *
+     * @param state Current cluster state. Never <code>null</code>.
+     * @param index An index for which to retrieve index metadata. Never <code>null</code>.
+     * @return The corresponding index metadata or <code>null</code> if there are none.
+     */
+    protected abstract IndexMetadata indexMetadata(ClusterState state, T index);
+
+    /**
+     * @return An iterable of all indices that are managed by this instance.
+     */
+    protected abstract Iterable<T> getManagedIndices();
+
+    /**
+     * Handler that takes appropriate action for a certain index status.
+     *
+     * @param clusterState The current cluster state. Never <code>null</code>.
+     * @param status Status of the current index.
+     * @param index The current index.
+     * @param listener Listener to be called on completion / errors.
+     */
+    protected abstract void onStatus(ClusterState clusterState, Status status, T index, ActionListener<? super ActionResponse> listener);
+
+    private Status getStatus(ClusterState state, T index) {
+        IndexMetadata metadata = indexMetadata(state, index);
+        if (metadata == null) {
+            return Status.NEEDS_CREATION;
+        }
+        if (metadata.getState() == IndexMetadata.State.CLOSE) {
+            logger.warn(
+                "Index [{}] is closed. This is likely to prevent Universal Profiling from functioning correctly",
+                metadata.getIndex()
+            );
+            return Status.CLOSED;
+        }
+        final IndexRoutingTable routingTable = state.getRoutingTable().index(metadata.getIndex());
+        ClusterHealthStatus indexHealth = new ClusterIndexHealth(metadata, routingTable).getStatus();
+        if (indexHealth == ClusterHealthStatus.RED) {
+            logger.debug("Index [{}] health status is RED, any pending mapping upgrades will wait until this changes", metadata.getIndex());
+            return Status.UNHEALTHY;
+        }
+        MappingMetadata mapping = metadata.mapping();
+        if (mapping != null) {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> meta = (Map<String, Object>) mapping.sourceAsMap().get("_meta");
+            int currentIndexVersion;
+            int currentTemplateVersion;
+            if (meta == null) {
+                logger.debug("Missing _meta field in mapping of index [{}], assuming initial version.", metadata.getIndex());
+                currentIndexVersion = 1;
+                currentTemplateVersion = 1;
+            } else {
+                // we are extra defensive and treat any unexpected values as an unhealthy index which we won't touch.
+                currentIndexVersion = getVersionField(metadata.getIndex(), meta, "index-version");
+                currentTemplateVersion = getVersionField(metadata.getIndex(), meta, "index-template-version");
+                if (currentIndexVersion == -1 || currentTemplateVersion == -1) {
+                    return Status.UNHEALTHY;
+                }
+            }
+            if (index.getVersion() > currentIndexVersion) {
+                return Status.NEEDS_VERSION_BUMP;
+            } else if (ProfilingIndexTemplateRegistry.INDEX_TEMPLATE_VERSION > currentTemplateVersion) {
+                // TODO 8.10+: Check if there are any pending migrations. If none are pending we can consider the index up to date.
+                return Status.NEEDS_MAPPINGS_UPDATE;
+            } else {
+                return Status.UP_TO_DATE;
+            }
+        } else {
+            logger.warn("No mapping found for existing index [{}]. Index cannot be migrated.", metadata.getIndex());
+            return Status.UNHEALTHY;
+        }
+    }
+
+    private int getVersionField(Index index, Map<String, Object> meta, String fieldName) {
+        Object value = meta.get(fieldName);
+        if (value instanceof Integer) {
+            return (int) value;
+        }
+        if (value == null) {
+            logger.warn("Metadata version field [{}] of index [{}] is empty.", fieldName, index);
+            return -1;
+        }
+        logger.warn("Metadata version field [{}] of index [{}] is [{}] (expected an integer).", fieldName, index, value);
+        return -1;
+    }
+
+    enum Status {
+        CLOSED(false),
+        UNHEALTHY(false),
+        NEEDS_CREATION(true),
+        NEEDS_VERSION_BUMP(true),
+        UP_TO_DATE(false),
+        NEEDS_MAPPINGS_UPDATE(true);
+
+        /**
+         * Whether a status is for informational purposes only or whether it should be acted upon and may change cluster state.
+         */
+        private final boolean actionable;
+
+        Status(boolean actionable) {
+            this.actionable = actionable;
+        }
+    }
+
+    /**
+     * An index that is used by Universal Profiling.
+     */
+    interface ProfilingIndexAbstraction {
+        String getName();
+
+        int getVersion();
+    }
+}

+ 241 - 0
x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingDataStreamManager.java

@@ -0,0 +1,241 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.profiler;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
+import org.elasticsearch.action.datastreams.CreateDataStreamAction;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.ClientHelper;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+
+import static org.elasticsearch.core.Strings.format;
+import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
+
+/**
+ * Creates all data streams that are required for using Elastic Universal Profiling.
+ */
+public class ProfilingDataStreamManager extends AbstractProfilingPersistenceManager<ProfilingDataStreamManager.ProfilingDataStream> {
+    public static final List<ProfilingDataStream> PROFILING_DATASTREAMS;
+
+    static {
+        List<ProfilingDataStream> dataStreams = new ArrayList<>(
+            EventsIndex.indexNames()
+                .stream()
+                .map(n -> ProfilingDataStream.of(n, ProfilingIndexTemplateRegistry.PROFILING_EVENTS_VERSION))
+                .toList()
+        );
+        dataStreams.add(ProfilingDataStream.of("profiling-metrics", ProfilingIndexTemplateRegistry.PROFILING_METRICS_VERSION));
+        dataStreams.add(ProfilingDataStream.of("profiling-hosts", ProfilingIndexTemplateRegistry.PROFILING_HOSTS_VERSION));
+        PROFILING_DATASTREAMS = Collections.unmodifiableList(dataStreams);
+    }
+
+    private final ThreadPool threadPool;
+    private final Client client;
+
+    public ProfilingDataStreamManager(ThreadPool threadPool, Client client, ClusterService clusterService) {
+        super(clusterService);
+        this.threadPool = threadPool;
+        this.client = client;
+    }
+
+    @Override
+    protected void onStatus(
+        ClusterState clusterState,
+        Status status,
+        ProfilingDataStream index,
+        ActionListener<? super ActionResponse> listener
+    ) {
+        switch (status) {
+            case NEEDS_CREATION -> createDataStream(index, listener);
+            case NEEDS_VERSION_BUMP -> rolloverDataStream(index, listener);
+            default -> {
+                logger.debug("Skipping status change [{}] for data stream [{}].", status, index);
+                // ensure that listener is notified we're done
+                listener.onResponse(null);
+            }
+        }
+    }
+
+    protected IndexMetadata indexMetadata(ClusterState state, ProfilingDataStream dataStream) {
+        Map<String, DataStream> dataStreams = state.metadata().dataStreams();
+        if (dataStreams == null) {
+            return null;
+        }
+        DataStream ds = dataStreams.get(dataStream.getName());
+        if (ds == null) {
+            return null;
+        }
+        Index writeIndex = ds.getWriteIndex();
+        if (writeIndex == null) {
+            return null;
+        }
+        return state.metadata().index(writeIndex);
+    }
+
+    @Override
+    protected Iterable<ProfilingDataStream> getManagedIndices() {
+        return PROFILING_DATASTREAMS;
+    }
+
+    private void onDataStreamFailure(ProfilingDataStream dataStream, Exception ex) {
+        logger.error(() -> format("error for data stream [%s] for [%s]", dataStream, ClientHelper.PROFILING_ORIGIN), ex);
+    }
+
+    private void rolloverDataStream(final ProfilingDataStream dataStream, ActionListener<? super ActionResponse> listener) {
+        logger.debug("rolling over data stream [{}].", dataStream);
+        final Executor executor = threadPool.generic();
+        executor.execute(() -> {
+            RolloverRequest request = new RolloverRequest(dataStream.getName(), null);
+            request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
+            executeAsyncWithOrigin(
+                client.threadPool().getThreadContext(),
+                ClientHelper.PROFILING_ORIGIN,
+                request,
+                new ActionListener<RolloverResponse>() {
+                    @Override
+                    public void onResponse(RolloverResponse response) {
+                        if (response.isAcknowledged() == false) {
+                            logger.error(
+                                "error rolling over data stream [{}] for [{}], request was not acknowledged",
+                                dataStream,
+                                ClientHelper.PROFILING_ORIGIN
+                            );
+                        } else if (response.isShardsAcknowledged() == false) {
+                            logger.warn(
+                                "rolling over data stream [{}] for [{}], shards were not acknowledged",
+                                dataStream,
+                                ClientHelper.PROFILING_ORIGIN
+                            );
+                        } else if (response.isRolledOver() == false) {
+                            logger.warn("could not rollover data stream [{}] for [{}].", dataStream, ClientHelper.PROFILING_ORIGIN);
+                        } else {
+                            logger.debug(
+                                "rolled over data stream [{}] from [{}] to index [{}] for [{}].",
+                                dataStream,
+                                response.getOldIndex(),
+                                response.getNewIndex(),
+                                ClientHelper.PROFILING_ORIGIN
+                            );
+                        }
+                        listener.onResponse(response);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        onDataStreamFailure(dataStream, e);
+                        listener.onFailure(e);
+                    }
+                },
+                (req, l) -> client.admin().indices().rolloverIndex(req, l)
+            );
+        });
+    }
+
+    private void createDataStream(ProfilingDataStream dataStream, final ActionListener<? super ActionResponse> listener) {
+        final Executor executor = threadPool.generic();
+        executor.execute(() -> {
+            CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(dataStream.getName());
+            request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
+            executeAsyncWithOrigin(
+                client.threadPool().getThreadContext(),
+                ClientHelper.PROFILING_ORIGIN,
+                request,
+                new ActionListener<AcknowledgedResponse>() {
+                    @Override
+                    public void onResponse(AcknowledgedResponse response) {
+                        if (response.isAcknowledged() == false) {
+                            logger.error(
+                                "error adding data stream [{}] for [{}], request was not acknowledged",
+                                dataStream,
+                                ClientHelper.PROFILING_ORIGIN
+                            );
+                        }
+                        listener.onResponse(response);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        onDataStreamFailure(dataStream, e);
+                        listener.onFailure(e);
+                    }
+                },
+                (req, l) -> client.execute(CreateDataStreamAction.INSTANCE, req, l)
+            );
+        });
+    }
+
+    /**
+     * A datastream that is used by Universal Profiling.
+     */
+    static class ProfilingDataStream implements AbstractProfilingPersistenceManager.ProfilingIndexAbstraction {
+        private final String name;
+        private final int version;
+
+        public static ProfilingDataStream of(String name, int version) {
+            return new ProfilingDataStream(name, version);
+        }
+
+        private ProfilingDataStream(String name, int version) {
+            this.name = name;
+            this.version = version;
+        }
+
+        public ProfilingDataStream withVersion(int version) {
+            return new ProfilingDataStream(name, version);
+        }
+
+        @Override
+        public String getName() {
+            return name;
+        }
+
+        @Override
+        public int getVersion() {
+            return version;
+        }
+
+        @Override
+        public String toString() {
+            return getName();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ProfilingDataStream that = (ProfilingDataStream) o;
+            return version == that.version && Objects.equals(name, that.name);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(name, version);
+        }
+    }
+}

+ 283 - 99
x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingIndexManager.java

@@ -7,31 +7,30 @@
 
 package org.elasticsearch.xpack.profiler;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
-import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
 
-import java.io.Closeable;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Objects;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
 
 import static org.elasticsearch.core.Strings.format;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@@ -39,107 +38,202 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 /**
  * Creates all indices that are required for using Elastic Universal Profiling.
  */
-public class ProfilingIndexManager implements ClusterStateListener, Closeable {
-    private static final Logger logger = LogManager.getLogger(ProfilingIndexManager.class);
+public class ProfilingIndexManager extends AbstractProfilingPersistenceManager<ProfilingIndexManager.ProfilingIndex> {
     // For testing
     public static final List<ProfilingIndex> PROFILING_INDICES = List.of(
-        ProfilingIndex.regular("profiling-returnpads-private"),
-        ProfilingIndex.regular("profiling-sq-executables"),
-        ProfilingIndex.regular("profiling-sq-leafframes"),
-        ProfilingIndex.regular("profiling-symbols-private"),
-        ProfilingIndex.kv("profiling-executables"),
-        ProfilingIndex.kv("profiling-stackframes"),
-        ProfilingIndex.kv("profiling-stacktraces"),
-        ProfilingIndex.kv("profiling-symbols-global")
+        ProfilingIndex.regular(
+            "profiling-returnpads-private",
+            ProfilingIndexTemplateRegistry.PROFILING_RETURNPADS_PRIVATE_VERSION,
+            OnVersionBump.KEEP_OLD
+        ),
+        ProfilingIndex.regular(
+            "profiling-sq-executables",
+            ProfilingIndexTemplateRegistry.PROFILING_SQ_EXECUTABLES_VERSION,
+            OnVersionBump.DELETE_OLD
+        ),
+        ProfilingIndex.regular(
+            "profiling-sq-leafframes",
+            ProfilingIndexTemplateRegistry.PROFILING_SQ_LEAFFRAMES_VERSION,
+            OnVersionBump.DELETE_OLD
+        ),
+        ProfilingIndex.regular(
+            "profiling-symbols-private",
+            ProfilingIndexTemplateRegistry.PROFILING_SYMBOLS_VERSION,
+            OnVersionBump.KEEP_OLD
+        ),
+        ProfilingIndex.kv("profiling-executables", ProfilingIndexTemplateRegistry.PROFILING_EXECUTABLES_VERSION),
+        ProfilingIndex.kv("profiling-stackframes", ProfilingIndexTemplateRegistry.PROFILING_STACKFRAMES_VERSION),
+        ProfilingIndex.kv("profiling-stacktraces", ProfilingIndexTemplateRegistry.PROFILING_STACKTRACES_VERSION),
+        ProfilingIndex.kv("profiling-symbols-global", ProfilingIndexTemplateRegistry.PROFILING_SYMBOLS_VERSION)
     );
 
     private final ThreadPool threadPool;
     private final Client client;
-    private final ClusterService clusterService;
-    private final ConcurrentMap<String, AtomicBoolean> creationInProgressPerIndex = new ConcurrentHashMap<>();
-    private volatile boolean templatesEnabled;
 
     public ProfilingIndexManager(ThreadPool threadPool, Client client, ClusterService clusterService) {
+        super(clusterService);
         this.threadPool = threadPool;
         this.client = client;
-        this.clusterService = clusterService;
-    }
-
-    public void initialize() {
-        clusterService.addListener(this);
     }
 
     @Override
-    public void close() {
-        clusterService.removeListener(this);
-    }
-
-    public void setTemplatesEnabled(boolean templatesEnabled) {
-        this.templatesEnabled = templatesEnabled;
+    protected void onStatus(
+        ClusterState clusterState,
+        Status status,
+        ProfilingIndex index,
+        ActionListener<? super ActionResponse> listener
+    ) {
+        switch (status) {
+            case NEEDS_CREATION -> createIndex(clusterState, index, listener);
+            case NEEDS_VERSION_BUMP -> bumpVersion(clusterState, index, listener);
+            default -> {
+                logger.debug("Skipping status change [{}] for index [{}].", status, index);
+                // ensure that listener is notified we're done
+                listener.onResponse(null);
+            }
+        }
     }
 
     @Override
-    public void clusterChanged(ClusterChangedEvent event) {
-        if (templatesEnabled == false) {
-            return;
+    protected IndexMetadata indexMetadata(ClusterState state, ProfilingIndex index) {
+        Map<String, IndexMetadata> indicesMetadata = state.metadata().indices();
+        if (indicesMetadata == null) {
+            return null;
         }
-        // wait for the cluster state to be recovered
-        if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
-            return;
+        IndexMetadata metadata = indicesMetadata.get(index.toString());
+        // prioritize the most recent generation from the current version
+        if (metadata == null && index.isKvIndex()) {
+            metadata = indicesMetadata.entrySet()
+                .stream()
+                .filter(e -> index.isMatchWithoutGeneration(e.getKey()))
+                // use the most recent index to make sure we use the most recent version info from the _meta field
+                .max(Comparator.comparingLong(e -> e.getValue().getCreationDate()))
+                .map(Map.Entry::getValue)
+                .orElse(null);
         }
 
-        // If this node is not a master node, exit.
-        if (event.state().nodes().isLocalNodeElectedMaster() == false) {
-            return;
+        // attempt to find an index from an earlier generation
+        if (metadata == null) {
+            metadata = indicesMetadata.entrySet()
+                .stream()
+                .filter(e -> index.isMatchWithoutVersion(e.getKey()))
+                // use the most recent index to make sure we use the most recent version info from the _meta field
+                .max(Comparator.comparingLong(e -> e.getValue().getCreationDate()))
+                .map(Map.Entry::getValue)
+                .orElse(null);
         }
 
-        if (event.state().nodes().getMaxNodeVersion().after(event.state().nodes().getSmallestNonClientNodeVersion())) {
-            logger.debug("Skipping up-to-date check as cluster has mixed versions");
-            return;
-        }
+        return metadata;
+    }
 
-        // ensure that all resources are present that we need to create indices
-        if (isAllResourcesCreated(event) == false) {
-            logger.trace("Skipping index creation; not all required resources are present yet");
-            return;
+    private void bumpVersion(ClusterState state, ProfilingIndex index, ActionListener<? super ActionResponse> listener) {
+        if (index.getOnVersionBump() == OnVersionBump.DELETE_OLD) {
+            Map<String, IndexMetadata> indicesMetadata = state.metadata().indices();
+            List<String> priorIndexVersions = indicesMetadata.keySet()
+                .stream()
+                // ignore the current index and look only for old versions
+                .filter(Predicate.not(index::isFullMatch))
+                .filter(index::isMatchWithoutVersion)
+                .toList();
+            if (priorIndexVersions.isEmpty() == false) {
+                logger.debug("deleting indices [{}] on index version bump for [{}].", priorIndexVersions, index.getAlias());
+                deleteIndices(
+                    priorIndexVersions.toArray(new String[0]),
+                    // the cluster state that we are operating on is a snapshot and won't reflect that the alias has just gone.
+                    // Therefore, we use putIndex here which does not check for the existence of an alias
+                    ActionListener.wrap(r -> putIndex(index.getName(), index.getAlias(), listener), listener::onFailure)
+                );
+            } else {
+                createIndex(state, index, listener);
+            }
+        } else {
+            createIndex(state, index, listener);
         }
+    }
 
-        addIndicesIfMissing(event.state());
+    @Override
+    protected Iterable<ProfilingIndex> getManagedIndices() {
+        return PROFILING_INDICES;
     }
 
-    protected boolean isAllResourcesCreated(ClusterChangedEvent event) {
-        return ProfilingIndexTemplateRegistry.isAllResourcesCreated(event.state());
+    private void onCreateIndexFailure(String index, Exception ex) {
+        logger.error(() -> format("error adding index [%s] for [%s]", index, ClientHelper.PROFILING_ORIGIN), ex);
     }
 
-    private void addIndicesIfMissing(ClusterState state) {
-        Map<String, IndexMetadata> indicesMetadata = state.metadata().indices();
-        for (ProfilingIndex profilingIndex : PROFILING_INDICES) {
-            String index = profilingIndex.toString();
-            final AtomicBoolean creationInProgress = creationInProgressPerIndex.computeIfAbsent(index, key -> new AtomicBoolean(false));
-            if (creationInProgress.compareAndSet(false, true)) {
-                // Do a quick (exact) check first
-                boolean indexNeedsToBeCreated = indicesMetadata == null || indicesMetadata.get(index) == null;
-                // for K/V indices we must not create the index if a newer generation exists
-                if (indexNeedsToBeCreated && profilingIndex.isKvIndex()) {
-                    indexNeedsToBeCreated = indicesMetadata != null
-                        && indicesMetadata.keySet().stream().anyMatch(profilingIndex::isMatchWithoutGeneration) == false;
-                }
-                if (indexNeedsToBeCreated) {
-                    logger.debug("adding index [{}], because it doesn't exist", index);
-                    putIndex(index, profilingIndex.getAlias(), creationInProgress);
-                } else {
-                    logger.trace("not adding index [{}], because it already exists", index);
-                    creationInProgress.set(false);
-                }
-            }
+    private void createIndex(final ClusterState state, final ProfilingIndex index, final ActionListener<? super ActionResponse> listener) {
+        if (state.metadata().hasAlias(index.getAlias())) {
+            // there is an existing index from a prior version. Use the rollover API to move the write alias atomically. This has the
+            // following implications:
+            //
+            // * A new index will be created according to the currently installed version of the matching index template.
+            // * The write alias will point to that index.
+            // * The prior index will continue to be managed by ILM but will advance to the next phase after rollover. As
+            // rollover blocks phase transitions, the prior index may move a bit sooner than expected to the warm tier
+            // after version bumps; still all conditions need to be met, it's just that due to the earlier rollover, the
+            // condition will be reached sooner than without a version bump.
+            rolloverIndex(index.getName(), index.getAlias(), listener);
+        } else {
+            // newly create index
+            putIndex(index.getName(), index.getAlias(), listener);
         }
     }
 
-    private void onPutIndexFailure(String index, Exception ex) {
-        logger.error(() -> format("error adding index [%s] for [%s]", index, ClientHelper.PROFILING_ORIGIN), ex);
+    private void rolloverIndex(final String newIndex, final String alias, ActionListener<? super ActionResponse> listener) {
+        logger.debug("rolling over to index [{}] for alias [{}].", newIndex, alias);
+        final Executor executor = threadPool.generic();
+        executor.execute(() -> {
+            RolloverRequest request = new RolloverRequest(alias, newIndex);
+            request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
+            executeAsyncWithOrigin(
+                client.threadPool().getThreadContext(),
+                ClientHelper.PROFILING_ORIGIN,
+                request,
+                new ActionListener<RolloverResponse>() {
+                    @Override
+                    public void onResponse(RolloverResponse response) {
+                        if (response.isAcknowledged() == false) {
+                            logger.error(
+                                "error rolling over index [{}] for [{}], request was not acknowledged",
+                                newIndex,
+                                ClientHelper.PROFILING_ORIGIN
+                            );
+                        } else if (response.isShardsAcknowledged() == false) {
+                            logger.warn(
+                                "rolling over index [{}] for [{}], shards were not acknowledged",
+                                newIndex,
+                                ClientHelper.PROFILING_ORIGIN
+                            );
+                        } else if (response.isRolledOver() == false) {
+                            logger.warn(
+                                "could not rollover alias [{}] to index [{}] for [{}].",
+                                alias,
+                                newIndex,
+                                ClientHelper.PROFILING_ORIGIN
+                            );
+                        } else {
+                            logger.debug(
+                                "rolled over alias [{}] from [{}] to index [{}] for [{}].",
+                                alias,
+                                response.getOldIndex(),
+                                response.getNewIndex(),
+                                ClientHelper.PROFILING_ORIGIN
+                            );
+                        }
+                        listener.onResponse(response);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        onCreateIndexFailure(newIndex, e);
+                        listener.onFailure(e);
+                    }
+                },
+                (req, l) -> client.admin().indices().rolloverIndex(req, l)
+            );
+        });
     }
 
-    private void putIndex(final String index, final String alias, final AtomicBoolean creationCheck) {
+    private void putIndex(final String index, final String alias, final ActionListener<? super ActionResponse> listener) {
         final Executor executor = threadPool.generic();
         executor.execute(() -> {
             CreateIndexRequest request = new CreateIndexRequest(index);
@@ -148,8 +242,8 @@ public class ProfilingIndexManager implements ClusterStateListener, Closeable {
                     Map<String, Object> sourceAsMap = Map.of("aliases", Map.of(alias, Map.of("is_write_index", true)));
                     request.source(sourceAsMap, LoggingDeprecationHandler.INSTANCE);
                 } catch (Exception ex) {
-                    creationCheck.set(false);
-                    onPutIndexFailure(index, ex);
+                    onCreateIndexFailure(index, ex);
+                    listener.onFailure(ex);
                     return;
                 }
             }
@@ -161,7 +255,6 @@ public class ProfilingIndexManager implements ClusterStateListener, Closeable {
                 new ActionListener<CreateIndexResponse>() {
                     @Override
                     public void onResponse(CreateIndexResponse response) {
-                        creationCheck.set(false);
                         if (response.isAcknowledged() == false) {
                             logger.error(
                                 "error adding index [{}] for [{}], request was not acknowledged",
@@ -171,64 +264,155 @@ public class ProfilingIndexManager implements ClusterStateListener, Closeable {
                         } else if (response.isShardsAcknowledged() == false) {
                             logger.warn("adding index [{}] for [{}], shards were not acknowledged", index, ClientHelper.PROFILING_ORIGIN);
                         }
+                        listener.onResponse(response);
                     }
 
                     @Override
                     public void onFailure(Exception e) {
-                        creationCheck.set(false);
-                        onPutIndexFailure(index, e);
+                        onCreateIndexFailure(index, e);
+                        listener.onFailure(e);
                     }
                 },
-                (req, listener) -> client.admin().indices().create(req, listener)
+                (req, l) -> client.admin().indices().create(req, l)
             );
         });
     }
 
+    private void onDeleteIndexFailure(String[] indices, Exception ex) {
+        logger.error(() -> format("error deleting indices [%s] for [%s]", indices, ClientHelper.PROFILING_ORIGIN), ex);
+    }
+
+    private void deleteIndices(final String[] indices, final ActionListener<AcknowledgedResponse> listener) {
+        final Executor executor = threadPool.generic();
+        executor.execute(() -> {
+            DeleteIndexRequest request = new DeleteIndexRequest(indices);
+            request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
+            executeAsyncWithOrigin(
+                client.threadPool().getThreadContext(),
+                ClientHelper.PROFILING_ORIGIN,
+                request,
+                new ActionListener<AcknowledgedResponse>() {
+                    @Override
+                    public void onResponse(AcknowledgedResponse response) {
+                        if (response.isAcknowledged() == false) {
+                            logger.error(
+                                "error deleting indices [{}] for [{}], request was not acknowledged",
+                                indices,
+                                ClientHelper.PROFILING_ORIGIN
+                            );
+                        }
+                        listener.onResponse(response);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        onDeleteIndexFailure(indices, e);
+                        listener.onFailure(e);
+                    }
+                },
+                (req, l) -> client.admin().indices().delete(req, l)
+            );
+        });
+    }
+
+    enum OnVersionBump {
+        DELETE_OLD,
+        KEEP_OLD
+    }
+
     /**
      * An index that is used by Universal Profiling.
      */
-    static class ProfilingIndex {
-        private final String name;
+    static class ProfilingIndex implements ProfilingIndexAbstraction {
+        private final String namePrefix;
+        private final int version;
         private final String generation;
+        private final OnVersionBump onVersionBump;
 
-        public static ProfilingIndex regular(String name) {
-            return new ProfilingIndex(name, null);
+        public static ProfilingIndex regular(String name, int version, OnVersionBump onVersionBump) {
+            return new ProfilingIndex(name, version, null, onVersionBump);
         }
 
-        public static ProfilingIndex kv(String name) {
-            return new ProfilingIndex(name, "000001");
+        public static ProfilingIndex kv(String name, int version) {
+            // K/V indices will age automatically as per the ILM policy, and we won't force-upgrade them on version bumps
+            return new ProfilingIndex(name, version, "000001", OnVersionBump.KEEP_OLD);
         }
 
-        private ProfilingIndex(String namePrefix, String generation) {
-            this.name = namePrefix;
+        private ProfilingIndex(String namePrefix, int version, String generation, OnVersionBump onVersionBump) {
+            this.namePrefix = namePrefix;
+            this.version = version;
             this.generation = generation;
+            this.onVersionBump = onVersionBump;
+        }
+
+        public ProfilingIndex withVersion(int version) {
+            return new ProfilingIndex(namePrefix, version, generation, onVersionBump);
+        }
+
+        public ProfilingIndex withGeneration(String generation) {
+            return new ProfilingIndex(namePrefix, version, generation, onVersionBump);
+        }
+
+        public boolean isMatchWithoutVersion(String indexName) {
+            return indexName.startsWith("." + namePrefix);
         }
 
         public boolean isMatchWithoutGeneration(String indexName) {
             return indexName.startsWith(indexPrefix());
         }
 
+        public boolean isFullMatch(String indexName) {
+            return toString().equals(indexName);
+        }
+
         public boolean isKvIndex() {
             return generation != null;
         }
 
         public String getAlias() {
-            return name;
+            return namePrefix;
+        }
+
+        @Override
+        public String getName() {
+            return isKvIndex() ? String.format(Locale.ROOT, "%s-%s", indexPrefix(), generation) : indexPrefix();
+        }
+
+        public int getVersion() {
+            return version;
+        }
+
+        public OnVersionBump getOnVersionBump() {
+            return onVersionBump;
         }
 
         private String indexPrefix() {
-            return String.format(Locale.ROOT, ".%s-v%03d", name, ProfilingIndexTemplateRegistry.INDEX_TEMPLATE_VERSION);
+            return String.format(Locale.ROOT, ".%s-v%03d", namePrefix, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder sb = new StringBuilder();
-            sb.append(indexPrefix());
-            if (generation != null) {
-                sb.append("-");
-                sb.append(generation);
+            return getName();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
             }
-            return sb.toString();
+            ProfilingIndex index = (ProfilingIndex) o;
+            return version == index.version
+                && Objects.equals(namePrefix, index.namePrefix)
+                && Objects.equals(generation, index.generation)
+                && onVersionBump == index.onVersionBump;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(namePrefix, version, generation, onVersionBump);
         }
     }
 }

+ 36 - 10
x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingIndexTemplateRegistry.java

@@ -42,6 +42,18 @@ public class ProfilingIndexTemplateRegistry extends IndexTemplateRegistry {
     // version 1: initial
     public static final int INDEX_TEMPLATE_VERSION = 1;
 
+    // history for individual indices / index templates. Only bump these for breaking changes that require to create a new index
+    public static final int PROFILING_EVENTS_VERSION = 1;
+    public static final int PROFILING_EXECUTABLES_VERSION = 1;
+    public static final int PROFILING_METRICS_VERSION = 1;
+    public static final int PROFILING_HOSTS_VERSION = 1;
+    public static final int PROFILING_STACKFRAMES_VERSION = 1;
+    public static final int PROFILING_STACKTRACES_VERSION = 1;
+    public static final int PROFILING_SYMBOLS_VERSION = 1;
+    public static final int PROFILING_RETURNPADS_PRIVATE_VERSION = 1;
+    public static final int PROFILING_SQ_EXECUTABLES_VERSION = 1;
+    public static final int PROFILING_SQ_LEAFFRAMES_VERSION = 1;
+
     public static final String PROFILING_TEMPLATE_VERSION_VARIABLE = "xpack.profiling.template.version";
 
     private volatile boolean templatesEnabled;
@@ -96,13 +108,15 @@ public class ProfilingIndexTemplateRegistry extends IndexTemplateRegistry {
                 "profiling-events",
                 "/org/elasticsearch/xpack/profiler/component-template/profiling-events.json",
                 INDEX_TEMPLATE_VERSION,
-                PROFILING_TEMPLATE_VERSION_VARIABLE
+                PROFILING_TEMPLATE_VERSION_VARIABLE,
+                indexVersion("events", PROFILING_EVENTS_VERSION)
             ),
             new IndexTemplateConfig(
                 "profiling-executables",
                 "/org/elasticsearch/xpack/profiler/component-template/profiling-executables.json",
                 INDEX_TEMPLATE_VERSION,
-                PROFILING_TEMPLATE_VERSION_VARIABLE
+                PROFILING_TEMPLATE_VERSION_VARIABLE,
+                indexVersion("executables", PROFILING_EXECUTABLES_VERSION)
             ),
             new IndexTemplateConfig(
                 "profiling-ilm",
@@ -120,31 +134,36 @@ public class ProfilingIndexTemplateRegistry extends IndexTemplateRegistry {
                 "profiling-metrics",
                 "/org/elasticsearch/xpack/profiler/component-template/profiling-metrics.json",
                 INDEX_TEMPLATE_VERSION,
-                PROFILING_TEMPLATE_VERSION_VARIABLE
+                PROFILING_TEMPLATE_VERSION_VARIABLE,
+                indexVersion("metrics", PROFILING_METRICS_VERSION)
             ),
             new IndexTemplateConfig(
                 "profiling-hosts",
                 "/org/elasticsearch/xpack/profiler/component-template/profiling-hosts.json",
                 INDEX_TEMPLATE_VERSION,
-                PROFILING_TEMPLATE_VERSION_VARIABLE
+                PROFILING_TEMPLATE_VERSION_VARIABLE,
+                indexVersion("hosts", PROFILING_HOSTS_VERSION)
             ),
             new IndexTemplateConfig(
                 "profiling-stackframes",
                 "/org/elasticsearch/xpack/profiler/component-template/profiling-stackframes.json",
                 INDEX_TEMPLATE_VERSION,
-                PROFILING_TEMPLATE_VERSION_VARIABLE
+                PROFILING_TEMPLATE_VERSION_VARIABLE,
+                indexVersion("stackframes", PROFILING_STACKFRAMES_VERSION)
             ),
             new IndexTemplateConfig(
                 "profiling-stacktraces",
                 "/org/elasticsearch/xpack/profiler/component-template/profiling-stacktraces.json",
                 INDEX_TEMPLATE_VERSION,
-                PROFILING_TEMPLATE_VERSION_VARIABLE
+                PROFILING_TEMPLATE_VERSION_VARIABLE,
+                indexVersion("stacktraces", PROFILING_STACKTRACES_VERSION)
             ),
             new IndexTemplateConfig(
                 "profiling-symbols",
                 "/org/elasticsearch/xpack/profiler/component-template/profiling-symbols.json",
                 INDEX_TEMPLATE_VERSION,
-                PROFILING_TEMPLATE_VERSION_VARIABLE
+                PROFILING_TEMPLATE_VERSION_VARIABLE,
+                indexVersion("symbols", PROFILING_SYMBOLS_VERSION)
             )
         )) {
             try {
@@ -159,6 +178,10 @@ public class ProfilingIndexTemplateRegistry extends IndexTemplateRegistry {
         COMPONENT_TEMPLATE_CONFIGS = Collections.unmodifiableMap(componentTemplates);
     }
 
+    private static Map<String, String> indexVersion(String index, int version) {
+        return Map.of(String.format(Locale.ROOT, "xpack.profiling.index.%s.version", index), String.valueOf(version));
+    }
+
     @Override
     protected Map<String, ComponentTemplate> getComponentTemplateConfigs() {
         return templatesEnabled ? COMPONENT_TEMPLATE_CONFIGS : Collections.emptyMap();
@@ -206,19 +229,22 @@ public class ProfilingIndexTemplateRegistry extends IndexTemplateRegistry {
             "profiling-returnpads-private",
             "/org/elasticsearch/xpack/profiler/index-template/profiling-returnpads-private.json",
             INDEX_TEMPLATE_VERSION,
-            PROFILING_TEMPLATE_VERSION_VARIABLE
+            PROFILING_TEMPLATE_VERSION_VARIABLE,
+            indexVersion("returnpads.private", PROFILING_RETURNPADS_PRIVATE_VERSION)
         ),
         new IndexTemplateConfig(
             "profiling-sq-executables",
             "/org/elasticsearch/xpack/profiler/index-template/profiling-sq-executables.json",
             INDEX_TEMPLATE_VERSION,
-            PROFILING_TEMPLATE_VERSION_VARIABLE
+            PROFILING_TEMPLATE_VERSION_VARIABLE,
+            indexVersion("sq.executables", PROFILING_SQ_EXECUTABLES_VERSION)
         ),
         new IndexTemplateConfig(
             "profiling-sq-leafframes",
             "/org/elasticsearch/xpack/profiler/index-template/profiling-sq-leafframes.json",
             INDEX_TEMPLATE_VERSION,
-            PROFILING_TEMPLATE_VERSION_VARIABLE
+            PROFILING_TEMPLATE_VERSION_VARIABLE,
+            indexVersion("sq.leafframes", PROFILING_SQ_LEAFFRAMES_VERSION)
         ),
         new IndexTemplateConfig(
             "profiling-symbols-global",

+ 6 - 1
x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingPlugin.java

@@ -62,6 +62,7 @@ public class ProfilingPlugin extends Plugin implements ActionPlugin {
     private final SetOnce<ProfilingIndexTemplateRegistry> registry = new SetOnce<>();
 
     private final SetOnce<ProfilingIndexManager> indexManager = new SetOnce<>();
+    private final SetOnce<ProfilingDataStreamManager> dataStreamManager = new SetOnce<>();
 
     public ProfilingPlugin(Settings settings) {
         this.settings = settings;
@@ -88,13 +89,15 @@ public class ProfilingPlugin extends Plugin implements ActionPlugin {
         logger.info("Profiling is {}", enabled ? "enabled" : "disabled");
         registry.set(new ProfilingIndexTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry));
         indexManager.set(new ProfilingIndexManager(threadPool, client, clusterService));
+        dataStreamManager.set(new ProfilingDataStreamManager(threadPool, client, clusterService));
         // set initial value
         updateTemplatesEnabled(PROFILING_TEMPLATES_ENABLED.get(settings));
         clusterService.getClusterSettings().addSettingsUpdateConsumer(PROFILING_TEMPLATES_ENABLED, this::updateTemplatesEnabled);
         if (enabled) {
             registry.get().initialize();
             indexManager.get().initialize();
-            return List.of(registry.get(), indexManager.get());
+            dataStreamManager.get().initialize();
+            return List.of(registry.get(), indexManager.get(), dataStreamManager.get());
         } else {
             return Collections.emptyList();
         }
@@ -106,6 +109,7 @@ public class ProfilingPlugin extends Plugin implements ActionPlugin {
         }
         registry.get().setTemplatesEnabled(newValue);
         indexManager.get().setTemplatesEnabled(newValue);
+        dataStreamManager.get().setTemplatesEnabled(newValue);
     }
 
     @Override
@@ -162,5 +166,6 @@ public class ProfilingPlugin extends Plugin implements ActionPlugin {
     public void close() {
         registry.get().close();
         indexManager.get().close();
+        dataStreamManager.get().close();
     }
 }

+ 354 - 0
x-pack/plugin/profiler/src/test/java/org/elasticsearch/xpack/profiler/ProfilingDataStreamManagerTests.java

@@ -0,0 +1,354 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.profiler;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
+import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.action.datastreams.CreateDataStreamAction;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlocks;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.index.IndexVersion;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.ClusterServiceUtils;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class ProfilingDataStreamManagerTests extends ESTestCase {
+    private final AtomicBoolean templatesCreated = new AtomicBoolean();
+    private ProfilingDataStreamManager datastreamManager;
+    private ClusterService clusterService;
+    private ThreadPool threadPool;
+    private VerifyingClient client;
+
+    @Before
+    public void createRegistryAndClient() {
+        templatesCreated.set(false);
+        threadPool = new TestThreadPool(this.getClass().getName());
+        client = new VerifyingClient(threadPool);
+        clusterService = ClusterServiceUtils.createClusterService(threadPool);
+        datastreamManager = new ProfilingDataStreamManager(threadPool, client, clusterService) {
+            @Override
+            protected boolean isAllResourcesCreated(ClusterChangedEvent event) {
+                return templatesCreated.get();
+            }
+        };
+        datastreamManager.setTemplatesEnabled(true);
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        threadPool.shutdownNow();
+    }
+
+    public void testThatMissingMasterNodeDoesNothing() {
+        DiscoveryNode localNode = DiscoveryNodeUtils.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").add(localNode).build();
+
+        client.setVerifier((a, r, l) -> {
+            fail("if the master is missing nothing should happen");
+            return null;
+        });
+
+        ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), nodes);
+        datastreamManager.clusterChanged(event);
+    }
+
+    public void testThatMissingTemplatesDoesNothing() {
+        DiscoveryNode node = DiscoveryNodeUtils.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+
+        client.setVerifier((a, r, l) -> {
+            fail("if any templates are missing nothing should happen");
+            return null;
+        });
+
+        ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), nodes);
+        datastreamManager.clusterChanged(event);
+    }
+
+    public void testThatNonExistingDataStreamsAreAddedImmediately() throws Exception {
+        DiscoveryNode node = DiscoveryNodeUtils.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+        templatesCreated.set(true);
+
+        ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), nodes);
+
+        AtomicInteger calledTimes = new AtomicInteger(0);
+
+        client.setVerifier((action, request, listener) -> verifyDataStreamInstalled(calledTimes, action, request, listener));
+        datastreamManager.clusterChanged(event);
+        assertBusy(() -> assertThat(calledTimes.get(), equalTo(ProfilingDataStreamManager.PROFILING_DATASTREAMS.size())));
+
+        calledTimes.set(0);
+    }
+
+    public void testThatRedIndexIsNotTouched() throws Exception {
+        DiscoveryNode node = DiscoveryNodeUtils.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+        templatesCreated.set(true);
+
+        // This data stream is a rollover candidate
+        ProfilingDataStreamManager.ProfilingDataStream existingDataStream = randomFrom(ProfilingDataStreamManager.PROFILING_DATASTREAMS);
+        ClusterChangedEvent event = createClusterChangedEvent(
+            List.of(existingDataStream.withVersion(0)),
+            nodes,
+            IndexMetadata.State.OPEN,
+            false
+        );
+
+        AtomicInteger calledTimes = new AtomicInteger(0);
+
+        client.setVerifier((action, request, listener) -> verifyDataStreamInstalled(calledTimes, action, request, listener));
+        datastreamManager.clusterChanged(event);
+        // should not touch the red data stream (write index has unassigned shards); only the remaining ones are created
+        assertBusy(() -> assertThat(calledTimes.get(), equalTo(ProfilingDataStreamManager.PROFILING_DATASTREAMS.size() - 1)));
+
+        calledTimes.set(0);
+    }
+
+    public void testThatClosedIndexIsNotTouched() throws Exception {
+        DiscoveryNode node = DiscoveryNodeUtils.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+        templatesCreated.set(true);
+
+        // This data stream is a rollover candidate
+        ProfilingDataStreamManager.ProfilingDataStream existingDataStream = randomFrom(ProfilingDataStreamManager.PROFILING_DATASTREAMS);
+        ClusterChangedEvent event = createClusterChangedEvent(
+            List.of(existingDataStream.withVersion(0)),
+            nodes,
+            IndexMetadata.State.CLOSE,
+            true
+        );
+
+        AtomicInteger calledTimes = new AtomicInteger(0);
+
+        client.setVerifier((action, request, listener) -> verifyDataStreamInstalled(calledTimes, action, request, listener));
+        datastreamManager.clusterChanged(event);
+        // should not touch the closed data stream; only the remaining ones are created
+        assertBusy(() -> assertThat(calledTimes.get(), equalTo(ProfilingDataStreamManager.PROFILING_DATASTREAMS.size() - 1)));
+
+        calledTimes.set(0);
+    }
+
+    public void testThatExistingIndicesAreNotCreatedTwice() throws Exception {
+        DiscoveryNode node = DiscoveryNodeUtils.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+        templatesCreated.set(true);
+
+        ProfilingDataStreamManager.ProfilingDataStream existingDataStream = randomFrom(ProfilingDataStreamManager.PROFILING_DATASTREAMS);
+        ClusterChangedEvent event = createClusterChangedEvent(List.of(existingDataStream), nodes);
+
+        AtomicInteger calledTimes = new AtomicInteger(0);
+
+        client.setVerifier((action, request, listener) -> verifyDataStreamInstalled(calledTimes, action, request, listener));
+        datastreamManager.clusterChanged(event);
+        // should not create the existing index
+        assertBusy(() -> assertThat(calledTimes.get(), equalTo(ProfilingDataStreamManager.PROFILING_DATASTREAMS.size() - 1)));
+
+        calledTimes.set(0);
+    }
+
+    public void testThatDataStreamIsRolledOver() throws Exception {
+        DiscoveryNode node = DiscoveryNodeUtils.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+        templatesCreated.set(true);
+
+        ProfilingDataStreamManager.ProfilingDataStream dataStreamToRollover = randomFrom(ProfilingDataStreamManager.PROFILING_DATASTREAMS);
+        List<ProfilingDataStreamManager.ProfilingDataStream> existingDataStreams = new ArrayList<>(
+            ProfilingDataStreamManager.PROFILING_DATASTREAMS
+        );
+        existingDataStreams.remove(dataStreamToRollover);
+        existingDataStreams.add(dataStreamToRollover.withVersion(0));
+
+        ClusterChangedEvent event = createClusterChangedEvent(existingDataStreams, nodes);
+
+        AtomicInteger calledTimes = new AtomicInteger(0);
+
+        client.setVerifier((action, request, listener) -> verifyDataStreamRolledOver(calledTimes, action, request, listener));
+        datastreamManager.clusterChanged(event);
+        assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
+
+        calledTimes.set(0);
+    }
+
+    private ActionResponse verifyDataStreamInstalled(
+        AtomicInteger calledTimes,
+        ActionType<?> action,
+        ActionRequest request,
+        ActionListener<?> listener
+    ) {
+        if (action instanceof CreateDataStreamAction) {
+            calledTimes.incrementAndGet();
+            assertThat(action, instanceOf(CreateDataStreamAction.class));
+            assertThat(request, instanceOf(CreateDataStreamAction.Request.class));
+            assertNotNull(listener);
+            return AcknowledgedResponse.TRUE;
+        } else {
+            fail("client called with unexpected request:" + request.toString());
+            return null;
+        }
+    }
+
+    private ActionResponse verifyDataStreamRolledOver(
+        AtomicInteger calledTimes,
+        ActionType<?> action,
+        ActionRequest request,
+        ActionListener<?> listener
+    ) {
+        if (action instanceof RolloverAction) {
+            calledTimes.incrementAndGet();
+            assertThat(action, instanceOf(RolloverAction.class));
+            assertThat(request, instanceOf(RolloverRequest.class));
+            assertNotNull(listener);
+            return AcknowledgedResponse.TRUE;
+        } else {
+            fail("client called with unexpected request:" + request.toString());
+            return null;
+        }
+    }
+
+    private ClusterChangedEvent createClusterChangedEvent(
+        Iterable<ProfilingDataStreamManager.ProfilingDataStream> existingDataStreams,
+        DiscoveryNodes nodes
+    ) {
+        return createClusterChangedEvent(existingDataStreams, nodes, IndexMetadata.State.OPEN, true);
+    }
+
+    private ClusterChangedEvent createClusterChangedEvent(
+        Iterable<ProfilingDataStreamManager.ProfilingDataStream> existingDataStreams,
+        DiscoveryNodes nodes,
+        IndexMetadata.State state,
+        boolean allShardsAssigned
+    ) {
+        ClusterState cs = createClusterState(Settings.EMPTY, existingDataStreams, nodes, state, allShardsAssigned);
+        ClusterChangedEvent realEvent = new ClusterChangedEvent(
+            "created-from-test",
+            cs,
+            ClusterState.builder(new ClusterName("test")).build()
+        );
+        ClusterChangedEvent event = spy(realEvent);
+        when(event.localNodeMaster()).thenReturn(nodes.isLocalNodeElectedMaster());
+
+        return event;
+    }
+
+    private ClusterState createClusterState(
+        Settings nodeSettings,
+        Iterable<ProfilingDataStreamManager.ProfilingDataStream> existingDataStreams,
+        DiscoveryNodes nodes,
+        IndexMetadata.State state,
+        boolean allShardsAssigned
+    ) {
+        Metadata.Builder metadataBuilder = Metadata.builder();
+        RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
+        Map<String, IndexMetadata> indices = new HashMap<>();
+        for (ProfilingDataStreamManager.ProfilingDataStream existingDataStream : existingDataStreams) {
+            String writeIndexName = String.format(Locale.ROOT, ".ds-%s", existingDataStream.getName());
+            Index writeIndex = new Index(writeIndexName, writeIndexName);
+            DataStream ds = new DataStream(
+                existingDataStream.getName(),
+                List.of(writeIndex),
+                1,
+                Map.of(),
+                false,
+                false,
+                false,
+                false,
+                IndexMode.STANDARD
+            );
+            metadataBuilder.put(ds);
+            IndexMetadata.Builder builder = new IndexMetadata.Builder(writeIndexName);
+            builder.state(state);
+            builder.settings(indexSettings(IndexVersion.current(), 1, 1).put(IndexMetadata.SETTING_INDEX_UUID, writeIndex.getUUID()));
+            builder.putMapping(
+                new MappingMetadata(
+                    MapperService.SINGLE_MAPPING_NAME,
+                    Map.of(
+                        "_meta",
+                        Map.of(
+                            "index-version",
+                            existingDataStream.getVersion(),
+                            "index-template-version",
+                            ProfilingIndexTemplateRegistry.INDEX_TEMPLATE_VERSION
+                        )
+                    )
+                )
+            );
+            builder.numberOfReplicas(0);
+            builder.numberOfShards(1);
+            IndexMetadata indexMetadata = builder.build();
+
+            indices.put(writeIndexName, indexMetadata);
+            ShardRouting shardRouting = ShardRouting.newUnassigned(
+                new ShardId(writeIndex, 0),
+                true,
+                RecoverySource.ExistingStoreRecoverySource.INSTANCE,
+                new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""),
+                ShardRouting.Role.DEFAULT
+            );
+            if (allShardsAssigned) {
+                shardRouting = shardRouting.initialize("node0", null, 0).moveToStarted(0);
+            }
+            routingTableBuilder.add(
+                IndexRoutingTable.builder(writeIndex)
+                    .addIndexShard(IndexShardRoutingTable.builder(shardRouting.shardId()).addShard(shardRouting))
+            );
+        }
+
+        return ClusterState.builder(new ClusterName("test"))
+            .metadata(metadataBuilder.indices(indices).transientSettings(nodeSettings).build())
+            .blocks(new ClusterBlocks.Builder().build())
+            .nodes(nodes)
+            .routingTable(routingTableBuilder)
+            .build();
+    }
+}

+ 181 - 10
x-pack/plugin/profiler/src/test/java/org/elasticsearch/xpack/profiler/ProfilingIndexManagerTests.java

@@ -14,19 +14,31 @@ import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexVersion;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
@@ -34,6 +46,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;
 import org.junit.Before;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -43,7 +56,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
@@ -118,12 +130,60 @@ public class ProfilingIndexManagerTests extends ESTestCase {
         calledTimes.set(0);
     }
 
+    public void testThatRedIndexIsNotTouched() throws Exception {
+        DiscoveryNode node = DiscoveryNodeUtils.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+        templatesCreated.set(true);
+
+        // This index is an upgrade candidate
+        ProfilingIndexManager.ProfilingIndex existingIndex = randomFrom(ProfilingIndexManager.PROFILING_INDICES);
+        ClusterChangedEvent event = createClusterChangedEvent(
+            List.of(existingIndex.withVersion(0)),
+            nodes,
+            IndexMetadata.State.OPEN,
+            false
+        );
+
+        AtomicInteger calledTimes = new AtomicInteger(0);
+
+        client.setVerifier((action, request, listener) -> verifyIndexInstalled(calledTimes, action, request, listener));
+        indexManager.clusterChanged(event);
+        // should not touch the red index (shards unassigned); only the remaining indices are created
+        assertBusy(() -> assertThat(calledTimes.get(), equalTo(ProfilingIndexManager.PROFILING_INDICES.size() - 1)));
+
+        calledTimes.set(0);
+    }
+
+    public void testThatClosedIndexIsNotTouched() throws Exception {
+        DiscoveryNode node = DiscoveryNodeUtils.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+        templatesCreated.set(true);
+
+        // This index is an upgrade candidate
+        ProfilingIndexManager.ProfilingIndex existingIndex = randomFrom(ProfilingIndexManager.PROFILING_INDICES);
+        ClusterChangedEvent event = createClusterChangedEvent(
+            List.of(existingIndex.withVersion(0)),
+            nodes,
+            IndexMetadata.State.CLOSE,
+            true
+        );
+
+        AtomicInteger calledTimes = new AtomicInteger(0);
+
+        client.setVerifier((action, request, listener) -> verifyIndexInstalled(calledTimes, action, request, listener));
+        indexManager.clusterChanged(event);
+        // should not touch the closed index; only the remaining indices are created
+        assertBusy(() -> assertThat(calledTimes.get(), equalTo(ProfilingIndexManager.PROFILING_INDICES.size() - 1)));
+
+        calledTimes.set(0);
+    }
+
     public void testThatExistingIndicesAreNotCreatedTwice() throws Exception {
         DiscoveryNode node = DiscoveryNodeUtils.create("node");
         DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
         templatesCreated.set(true);
 
-        String existingIndex = randomFrom(ProfilingIndexManager.PROFILING_INDICES).toString();
+        ProfilingIndexManager.ProfilingIndex existingIndex = randomFrom(ProfilingIndexManager.PROFILING_INDICES);
         ClusterChangedEvent event = createClusterChangedEvent(List.of(existingIndex), nodes);
 
         AtomicInteger calledTimes = new AtomicInteger(0);
@@ -136,6 +196,37 @@ public class ProfilingIndexManagerTests extends ESTestCase {
         calledTimes.set(0);
     }
 
+    public void testUpgradesOldIndex() throws Exception {
+        DiscoveryNode node = DiscoveryNodeUtils.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+        templatesCreated.set(true);
+
+        ProfilingIndexManager.ProfilingIndex indexWithDeleteOnVersionBump = randomFrom(
+            ProfilingIndexManager.PROFILING_INDICES.stream()
+                .filter(p -> p.getOnVersionBump().equals(ProfilingIndexManager.OnVersionBump.DELETE_OLD))
+                .toList()
+        );
+        ProfilingIndexManager.ProfilingIndex oldIndex = indexWithDeleteOnVersionBump.withVersion(0);
+        List<ProfilingIndexManager.ProfilingIndex> existingIndices = new ArrayList<>(ProfilingIndexManager.PROFILING_INDICES);
+        // only the old index must exist
+        existingIndices.remove(indexWithDeleteOnVersionBump);
+        existingIndices.add(oldIndex);
+
+        ClusterChangedEvent event = createClusterChangedEvent(existingIndices, nodes);
+
+        AtomicInteger indicesCreated = new AtomicInteger(0);
+        AtomicInteger indicesDeleted = new AtomicInteger(0);
+
+        client.setVerifier((action, request, listener) -> verifyIndexUpgraded(indicesCreated, indicesDeleted, action, request, listener));
+        indexManager.clusterChanged(event);
+        // should delete one old index and create a new one
+        assertBusy(() -> assertThat(indicesCreated.get(), equalTo(1)));
+        assertBusy(() -> assertThat(indicesDeleted.get(), equalTo(1)));
+
+        indicesCreated.set(0);
+        indicesDeleted.set(0);
+    }
+
     private ActionResponse verifyIndexInstalled(
         AtomicInteger calledTimes,
         ActionType<?> action,
@@ -154,8 +245,45 @@ public class ProfilingIndexManagerTests extends ESTestCase {
         }
     }
 
-    private ClusterChangedEvent createClusterChangedEvent(Iterable<String> existingIndices, DiscoveryNodes nodes) {
-        ClusterState cs = createClusterState(Settings.EMPTY, existingIndices, nodes);
+    private ActionResponse verifyIndexUpgraded(
+        AtomicInteger indicesCreated,
+        AtomicInteger indicesDeleted,
+        ActionType<?> action,
+        ActionRequest request,
+        ActionListener<?> listener
+    ) {
+        if (action instanceof CreateIndexAction) {
+            indicesCreated.incrementAndGet();
+            assertThat(action, instanceOf(CreateIndexAction.class));
+            assertThat(request, instanceOf(CreateIndexRequest.class));
+            assertNotNull(listener);
+            return new CreateIndexResponse(true, true, ((CreateIndexRequest) request).index());
+        } else if (action instanceof DeleteIndexAction) {
+            indicesDeleted.incrementAndGet();
+            assertThat(action, instanceOf(DeleteIndexAction.class));
+            assertThat(request, instanceOf(DeleteIndexRequest.class));
+            assertNotNull(listener);
+            return AcknowledgedResponse.TRUE;
+        } else {
+            fail("client called with unexpected request:" + request.toString());
+            return null;
+        }
+    }
+
+    private ClusterChangedEvent createClusterChangedEvent(
+        Iterable<ProfilingIndexManager.ProfilingIndex> existingIndices,
+        DiscoveryNodes nodes
+    ) {
+        return createClusterChangedEvent(existingIndices, nodes, IndexMetadata.State.OPEN, true);
+    }
+
+    private ClusterChangedEvent createClusterChangedEvent(
+        Iterable<ProfilingIndexManager.ProfilingIndex> existingIndices,
+        DiscoveryNodes nodes,
+        IndexMetadata.State state,
+        boolean allShardsAssigned
+    ) {
+        ClusterState cs = createClusterState(Settings.EMPTY, existingIndices, nodes, state, allShardsAssigned);
         ClusterChangedEvent realEvent = new ClusterChangedEvent(
             "created-from-test",
             cs,
@@ -167,18 +295,61 @@ public class ProfilingIndexManagerTests extends ESTestCase {
         return event;
     }
 
-    private ClusterState createClusterState(Settings nodeSettings, Iterable<String> existingIndices, DiscoveryNodes nodes) {
+    private ClusterState createClusterState(
+        Settings nodeSettings,
+        Iterable<ProfilingIndexManager.ProfilingIndex> existingIndices,
+        DiscoveryNodes nodes,
+        IndexMetadata.State state,
+        boolean allShardsAssigned
+    ) {
+        RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
         Map<String, IndexMetadata> indices = new HashMap<>();
-        for (String index : existingIndices) {
-            IndexMetadata mockMetadata = mock(IndexMetadata.class);
-            when(mockMetadata.getIndex()).thenReturn(new Index(index, index));
-            when(mockMetadata.getCompatibilityVersion()).thenReturn(IndexVersion.current());
-            indices.put(index, mockMetadata);
+        for (ProfilingIndexManager.ProfilingIndex profilingIndex : existingIndices) {
+            String indexName = profilingIndex.getName();
+            Index index = new Index(indexName, indexName);
+            IndexMetadata.Builder builder = new IndexMetadata.Builder(indexName);
+            builder.state(state);
+            builder.settings(indexSettings(IndexVersion.current(), 1, 1).put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()));
+            builder.putMapping(
+                new MappingMetadata(
+                    MapperService.SINGLE_MAPPING_NAME,
+                    Map.of(
+                        "_meta",
+                        Map.of(
+                            "index-version",
+                            profilingIndex.getVersion(),
+                            "index-template-version",
+                            ProfilingIndexTemplateRegistry.INDEX_TEMPLATE_VERSION
+                        )
+                    )
+                )
+            );
+            builder.numberOfReplicas(0);
+            builder.numberOfShards(1);
+            IndexMetadata indexMetadata = builder.build();
+
+            indices.put(indexName, indexMetadata);
+            ShardRouting shardRouting = ShardRouting.newUnassigned(
+                new ShardId(index, 0),
+                true,
+                RecoverySource.ExistingStoreRecoverySource.INSTANCE,
+                new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""),
+                ShardRouting.Role.DEFAULT
+            );
+            if (allShardsAssigned) {
+                shardRouting = shardRouting.initialize("node0", null, 0).moveToStarted(0);
+            }
+            routingTableBuilder.add(
+                IndexRoutingTable.builder(index)
+                    .addIndexShard(IndexShardRoutingTable.builder(shardRouting.shardId()).addShard(shardRouting))
+            );
         }
+
         return ClusterState.builder(new ClusterName("test"))
             .metadata(Metadata.builder().indices(indices).transientSettings(nodeSettings).build())
             .blocks(new ClusterBlocks.Builder().build())
             .nodes(nodes)
+            .routingTable(routingTableBuilder)
             .build();
     }
 }

+ 3 - 0
x-pack/plugin/vector-tile/src/main/java/org/elasticsearch/xpack/vectortile/rest/RestVectorTileAction.java

@@ -28,6 +28,8 @@ import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.rest.Scope;
+import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestCancellableNodeClient;
 import org.elasticsearch.rest.action.RestResponseListener;
 import org.elasticsearch.search.SearchHit;
@@ -62,6 +64,7 @@ import static org.elasticsearch.transport.RemoteClusterAware.buildRemoteIndexNam
 /**
  * Main class handling a call to the _mvt API.
  */
+@ServerlessScope(Scope.PUBLIC)
 public class RestVectorTileAction extends BaseRestHandler {
 
     private static final String META_LAYER = "meta";