Browse Source

Allow regular data streams to be migrated to tsdb data streams. (#83843)

A regular data stream can be migrated to a tsdb data stream if in template that created the data stream, the `index_mode` field is set to `time_series` and the data stream's `index_mode` property is either not specified or set to `standard`. Then on the next rollover the data stream is migrated to be a tsdb data stream.

When that happens the data stream's `index_mode` property is set to `time_series` and the new backing index's `index.mode` index setting is also set to `time_series`.

Closes #83520
Martijn van Groningen 3 years ago
parent
commit
ae7defa9f9

+ 131 - 0
modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.datastreams;
 
 import org.elasticsearch.client.Request;
+import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.common.time.DateFormatter;
 import org.elasticsearch.common.time.FormatNames;
 import org.elasticsearch.test.rest.ESRestTestCase;
@@ -15,16 +16,19 @@ import org.elasticsearch.test.rest.yaml.ObjectPath;
 
 import java.io.IOException;
 import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.Map;
 
 import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
 import static org.hamcrest.Matchers.aMapWithSize;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 public class TsdbDataStreamRestIT extends ESRestTestCase {
 
@@ -84,6 +88,57 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
             }
         }""";
 
+    private static final String NON_TSDB_TEMPLATE = """
+        {
+            "index_patterns": ["k8s*"],
+            "template": {
+                "settings":{
+                    "index": {
+                        "number_of_replicas": 0,
+                        "number_of_shards": 2
+                    }
+                },
+                "mappings":{
+                    "properties": {
+                        "@timestamp" : {
+                            "type": "date"
+                        },
+                        "metricset": {
+                            "type": "keyword"
+                        },
+                        "k8s": {
+                            "properties": {
+                                "pod": {
+                                    "properties": {
+                                        "uid": {
+                                            "type": "keyword"
+                                        },
+                                        "name": {
+                                            "type": "keyword"
+                                        },
+                                        "ip": {
+                                            "type": "ip"
+                                        },
+                                        "network": {
+                                            "properties": {
+                                                "tx": {
+                                                    "type": "long"
+                                                },
+                                                "rx": {
+                                                    "type": "long"
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            },
+            "data_stream": {}
+        }""";
+
     private static final String DOC = """
         {
             "@timestamp": "$time",
@@ -235,6 +290,82 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
         }
     }
 
+    public void testMigrateRegularDataStreamToTsdbDataStream() throws Exception {
+        // Create a non tsdb template
+        var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
+        putComposableIndexTemplateRequest.setJsonEntity(NON_TSDB_TEMPLATE);
+        assertOK(client().performRequest(putComposableIndexTemplateRequest));
+
+        // Index a few docs and sometimes rollover
+        int numRollovers = 4;
+        int numDocs = 32;
+        var currentTime = Instant.now();
+        var currentMinus30Days = currentTime.minus(30, ChronoUnit.DAYS);
+        for (int i = 0; i < numRollovers; i++) {
+            for (int j = 0; j < numDocs; j++) {
+                var indexRequest = new Request("POST", "/k8s/_doc");
+                var time = Instant.ofEpochMilli(randomLongBetween(currentMinus30Days.toEpochMilli(), currentTime.toEpochMilli()));
+                indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time)));
+                var response = client().performRequest(indexRequest);
+                assertOK(response);
+                var responseBody = entityAsMap(response);
+                // i rollovers and +1 offset:
+                assertThat((String) responseBody.get("_index"), backingIndexEqualTo("k8s", i + 1));
+            }
+            var rolloverRequest = new Request("POST", "/k8s/_rollover");
+            var rolloverResponse = client().performRequest(rolloverRequest);
+            assertOK(rolloverResponse);
+            var rolloverResponseBody = entityAsMap(rolloverResponse);
+            assertThat(rolloverResponseBody.get("rolled_over"), is(true));
+        }
+
+        var getDataStreamsRequest = new Request("GET", "/_data_stream");
+        var getDataStreamResponse = client().performRequest(getDataStreamsRequest);
+        assertOK(getDataStreamResponse);
+        var dataStreams = entityAsMap(getDataStreamResponse);
+        assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
+        assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(5));
+        for (int i = 0; i < 5; i++) {
+            String backingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices." + i + ".index_name");
+            assertThat(backingIndex, backingIndexEqualTo("k8s", i + 1));
+            var indices = getIndex(backingIndex);
+            var escapedBackingIndex = backingIndex.replace(".", "\\.");
+            assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
+            assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), nullValue());
+            assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"), nullValue());
+            assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"), nullValue());
+        }
+
+        // Update template
+        putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
+        putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE);
+        assertOK(client().performRequest(putComposableIndexTemplateRequest));
+
+        var rolloverRequest = new Request("POST", "/k8s/_rollover");
+        var rolloverResponse = client().performRequest(rolloverRequest);
+        assertOK(rolloverResponse);
+        var rolloverResponseBody = entityAsMap(rolloverResponse);
+        assertThat(rolloverResponseBody.get("rolled_over"), is(true));
+        var newIndex = (String) rolloverResponseBody.get("new_index");
+        assertThat(newIndex, backingIndexEqualTo("k8s", 6));
+
+        // Ingest documents that will land in the new tsdb backing index:
+        for (int i = 0; i < numDocs; i++) {
+            var indexRequest = new Request("POST", "/k8s/_doc");
+            indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(currentTime)));
+            var response = client().performRequest(indexRequest);
+            assertOK(response);
+            var responseBody = entityAsMap(response);
+            assertThat((String) responseBody.get("_index"), backingIndexEqualTo("k8s", 6));
+        }
+
+        // Fail if documents target older non tsdb backing index:
+        var indexRequest = new Request("POST", "/k8s/_doc");
+        indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(currentMinus30Days)));
+        var e = expectThrows(ResponseException.class, () -> client().performRequest(indexRequest));
+        assertThat(e.getMessage(), containsString("is outside of ranges of currently writable indices"));
+    }
+
     private static Map<?, ?> getIndex(String indexName) throws IOException {
         var getIndexRequest = new Request("GET", "/" + indexName + "?human");
         var response = client().performRequest(getIndexRequest);

+ 10 - 2
modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java

@@ -36,8 +36,16 @@ public class DataStreamIndexSettingsProvider implements IndexSettingProvider {
     ) {
         if (dataStreamName != null) {
             DataStream dataStream = metadata.dataStreams().get(dataStreamName);
+            // First backing index is created and then data stream is rolled over (in a single cluster state update).
+            // So at this point we can't check index_mode==time_series,
+            // so checking that index_mode==null|standard and templateIndexMode == TIME_SERIES
+            boolean migrating = dataStream != null
+                && (dataStream.getIndexMode() == null || dataStream.getIndexMode() == IndexMode.STANDARD)
+                && templateIndexMode == IndexMode.TIME_SERIES;
             IndexMode indexMode;
-            if (dataStream != null) {
+            if (migrating) {
+                indexMode = IndexMode.TIME_SERIES;
+            } else if (dataStream != null) {
                 indexMode = dataStream.getIndexMode();
             } else {
                 indexMode = templateIndexMode;
@@ -50,7 +58,7 @@ public class DataStreamIndexSettingsProvider implements IndexSettingProvider {
                     TimeValue lookAheadTime = IndexSettings.LOOK_AHEAD_TIME.get(allSettings);
                     final Instant start;
                     final Instant end;
-                    if (dataStream == null) {
+                    if (dataStream == null || migrating) {
                         start = resolvedAt.minusMillis(lookAheadTime.getMillis());
                         end = resolvedAt.plusMillis(lookAheadTime.getMillis());
                     } else {

+ 182 - 1
modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java

@@ -44,6 +44,7 @@ import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.notNullValue;
 
 public class MetadataDataStreamRolloverServiceTests extends ESTestCase {
 
@@ -63,7 +64,7 @@ public class MetadataDataStreamRolloverServiceTests extends ESTestCase {
             IndexMode.TIME_SERIES
         );
         ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of(dataStream.getName() + "*"))
-            .template(new Template(Settings.builder().put("index.mode", "time_series").build(), null, null))
+            .template(new Template(Settings.builder().put("index.routing_path", "uid").build(), null, null))
             .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, IndexMode.TIME_SERIES))
             .build();
         Metadata.Builder builder = Metadata.builder();
@@ -75,6 +76,7 @@ public class MetadataDataStreamRolloverServiceTests extends ESTestCase {
                         .put("index.hidden", true)
                         .put(SETTING_INDEX_UUID, dataStream.getWriteIndex().getUUID())
                         .put("index.mode", "time_series")
+                        .put("index.routing_path", "uid")
                         .put("index.time_series.start_time", FORMATTER.format(now.minus(4, ChronoUnit.HOURS)))
                         .put("index.time_series.end_time", FORMATTER.format(now.minus(2, ChronoUnit.HOURS)))
                 )
@@ -144,4 +146,183 @@ public class MetadataDataStreamRolloverServiceTests extends ESTestCase {
         }
     }
 
+    public void testRolloverAndMigrateDataStream() throws Exception {
+        Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
+        String dataStreamName = "logs-my-app";
+        IndexMode dsIndexMode = randomBoolean() ? null : IndexMode.STANDARD;
+        final DataStream dataStream = new DataStream(
+            dataStreamName,
+            new DataStream.TimestampField("@timestamp"),
+            List.of(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 1, now.toEpochMilli()), "uuid")),
+            1,
+            null,
+            false,
+            false,
+            false,
+            false,
+            dsIndexMode
+        );
+        ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of(dataStream.getName() + "*"))
+            .template(new Template(Settings.builder().put("index.routing_path", "uid").build(), null, null))
+            .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, IndexMode.TIME_SERIES))
+            .build();
+        Metadata.Builder builder = Metadata.builder();
+        builder.put("template", template);
+        Settings.Builder indexSettings = ESTestCase.settings(Version.CURRENT)
+            .put("index.hidden", true)
+            .put(SETTING_INDEX_UUID, dataStream.getWriteIndex().getUUID());
+        if (dsIndexMode != null) {
+            indexSettings.put("index.mode", dsIndexMode.getName());
+        }
+        builder.put(
+            IndexMetadata.builder(dataStream.getWriteIndex().getName()).settings(indexSettings).numberOfShards(1).numberOfReplicas(0)
+        );
+        builder.put(dataStream);
+        final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build();
+
+        ThreadPool testThreadPool = new TestThreadPool(getTestName());
+        try {
+            MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService(
+                dataStream,
+                testThreadPool,
+                Set.of(new DataStreamIndexSettingsProvider()),
+                xContentRegistry()
+            );
+            MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
+            List<Condition<?>> metConditions = Collections.singletonList(condition);
+            CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
+
+            MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState(
+                clusterState,
+                dataStream.getName(),
+                null,
+                createIndexRequest,
+                metConditions,
+                now,
+                randomBoolean(),
+                false
+            );
+
+            String sourceIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration());
+            String newIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1);
+            assertEquals(sourceIndexName, rolloverResult.sourceIndexName());
+            assertEquals(newIndexName, rolloverResult.rolloverIndexName());
+            Metadata rolloverMetadata = rolloverResult.clusterState().metadata();
+            assertEquals(dataStream.getIndices().size() + 1, rolloverMetadata.indices().size());
+
+            // Assert data stream's index_mode has been changed to time_series.
+            assertThat(rolloverMetadata.dataStreams().get(dataStreamName), notNullValue());
+            assertThat(rolloverMetadata.dataStreams().get(dataStreamName).getIndexMode(), equalTo(IndexMode.TIME_SERIES));
+
+            // Nothing changed for the original backing index:
+            IndexMetadata im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(0));
+            assertThat(IndexSettings.MODE.get(im.getSettings()), equalTo(IndexMode.STANDARD));
+            assertThat(IndexSettings.TIME_SERIES_START_TIME.exists(im.getSettings()), is(false));
+            assertThat(IndexSettings.TIME_SERIES_END_TIME.exists(im.getSettings()), is(false));
+            // New backing index is a tsdb index:
+            im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(1));
+            assertThat(IndexSettings.MODE.get(im.getSettings()), equalTo(IndexMode.TIME_SERIES));
+            Instant startTime = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings());
+            Instant endTime = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
+            assertThat(startTime.isBefore(endTime), is(true));
+            assertThat(startTime, equalTo(now.minus(2, ChronoUnit.HOURS)));
+            assertThat(endTime, equalTo(now.plus(2, ChronoUnit.HOURS)));
+        } finally {
+            testThreadPool.shutdown();
+        }
+    }
+
+    public void testChangingIndexModeFromTimeSeriesToSomethingElseNoEffectOnExistingDataStreams() throws Exception {
+        Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
+        String dataStreamName = "logs-my-app";
+        final DataStream dataStream = new DataStream(
+            dataStreamName,
+            new DataStream.TimestampField("@timestamp"),
+            List.of(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 1, now.toEpochMilli()), "uuid")),
+            1,
+            null,
+            false,
+            false,
+            false,
+            false,
+            IndexMode.TIME_SERIES
+        );
+        ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of(dataStream.getName() + "*"))
+            .template(new Template(Settings.builder().put("index.routing_path", "uid").build(), null, null))
+            .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, randomBoolean() ? IndexMode.STANDARD : null))
+            .build();
+        Metadata.Builder builder = Metadata.builder();
+        builder.put("template", template);
+        builder.put(
+            IndexMetadata.builder(dataStream.getWriteIndex().getName())
+                .settings(
+                    ESTestCase.settings(Version.CURRENT)
+                        .put("index.hidden", true)
+                        .put(SETTING_INDEX_UUID, dataStream.getWriteIndex().getUUID())
+                        .put("index.mode", "time_series")
+                        .put("index.routing_path", "uid")
+                        .put("index.time_series.start_time", FORMATTER.format(now.minus(4, ChronoUnit.HOURS)))
+                        .put("index.time_series.end_time", FORMATTER.format(now.minus(2, ChronoUnit.HOURS)))
+                )
+                .numberOfShards(1)
+                .numberOfReplicas(0)
+        );
+        builder.put(dataStream);
+        final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build();
+
+        ThreadPool testThreadPool = new TestThreadPool(getTestName());
+        try {
+            MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService(
+                dataStream,
+                testThreadPool,
+                Set.of(new DataStreamIndexSettingsProvider()),
+                xContentRegistry()
+            );
+            MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
+            List<Condition<?>> metConditions = Collections.singletonList(condition);
+            CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
+
+            MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState(
+                clusterState,
+                dataStream.getName(),
+                null,
+                createIndexRequest,
+                metConditions,
+                now,
+                randomBoolean(),
+                false
+            );
+
+            String sourceIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration());
+            String newIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1);
+            assertEquals(sourceIndexName, rolloverResult.sourceIndexName());
+            assertEquals(newIndexName, rolloverResult.rolloverIndexName());
+            Metadata rolloverMetadata = rolloverResult.clusterState().metadata();
+            assertEquals(dataStream.getIndices().size() + 1, rolloverMetadata.indices().size());
+
+            // Assert data stream's index_mode remains time_series.
+            assertThat(rolloverMetadata.dataStreams().get(dataStreamName), notNullValue());
+            assertThat(rolloverMetadata.dataStreams().get(dataStreamName).getIndexMode(), equalTo(IndexMode.TIME_SERIES));
+
+            // Nothing changed for the original tsdb backing index:
+            IndexMetadata im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(0));
+            assertThat(IndexSettings.MODE.exists(im.getSettings()), is(true));
+            Instant startTime = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings());
+            Instant endTime = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
+            assertThat(startTime.isBefore(endTime), is(true));
+            assertThat(startTime, equalTo(now.minus(4, ChronoUnit.HOURS)));
+            assertThat(endTime, equalTo(now.minus(2, ChronoUnit.HOURS)));
+            // New backing index is also a tsdb index:
+            im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(1));
+            assertThat(IndexSettings.MODE.get(im.getSettings()), equalTo(IndexMode.TIME_SERIES));
+            startTime = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings());
+            endTime = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
+            assertThat(startTime.isBefore(endTime), is(true));
+            assertThat(startTime, equalTo(now.minus(2, ChronoUnit.HOURS)));
+            assertThat(endTime, equalTo(now.plus(2, ChronoUnit.HOURS)));
+        } finally {
+            testThreadPool.shutdown();
+        }
+    }
+
 }

+ 3 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

@@ -278,7 +278,9 @@ public class MetadataRolloverService {
             currentState,
             createIndexClusterStateRequest,
             silent,
-            (builder, indexMetadata) -> builder.put(ds.rollover(indexMetadata.getIndex(), newGeneration))
+            (builder, indexMetadata) -> builder.put(
+                ds.rollover(indexMetadata.getIndex(), newGeneration, templateV2.getDataStreamTemplate().getIndexMode())
+            )
         );
 
         RolloverInfo rolloverInfo = new RolloverInfo(dataStreamName, metConditions, threadPool.absoluteTimeInMillis());

+ 35 - 14
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -171,8 +171,14 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             Index index = indices.get(i);
             IndexMetadata im = metadata.index(index);
 
-            // TODO: make start and end time fields in IndexMetadata class.
+            // TODO: make index_mode, start and end time fields in IndexMetadata class.
             // (this to avoid the overhead that occurs when reading a setting)
+            if (IndexSettings.MODE.get(im.getSettings()) != IndexMode.TIME_SERIES) {
+                // Not a tsdb backing index, so skip.
+                // (This can happen is this is a migrated tsdb data stream)
+                continue;
+            }
+
             Instant start = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings());
             Instant end = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
             // Check should be in sync with DataStreamTimestampFieldMapper#validateTimestamp(...) method
@@ -192,12 +198,19 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     public void validate(Function<String, IndexMetadata> imSupplier) {
         if (indexMode == IndexMode.TIME_SERIES) {
             // Get a sorted overview of each backing index with there start and end time range:
-            var startAndEndTimes = indices.stream().map(index -> imSupplier.apply(index.getName())).map(im -> {
-                Instant start = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings());
-                Instant end = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
-                assert end.isAfter(start); // This is also validated by TIME_SERIES_END_TIME setting.
-                return new Tuple<>(im.getIndex().getName(), new Tuple<>(start, end));
-            })
+            var startAndEndTimes = indices.stream()
+                .map(index -> imSupplier.apply(index.getName()))
+                .filter(
+                    // Migrated tsdb data streams have non tsdb backing indices:
+                    im -> IndexSettings.TIME_SERIES_START_TIME.exists(im.getSettings())
+                        && IndexSettings.TIME_SERIES_END_TIME.exists(im.getSettings())
+                )
+                .map(im -> {
+                    Instant start = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings());
+                    Instant end = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
+                    assert end.isAfter(start); // This is also validated by TIME_SERIES_END_TIME setting.
+                    return new Tuple<>(im.getIndex().getName(), new Tuple<>(start, end));
+                })
                 .sorted(Comparator.comparing(entry -> entry.v2().v1())) // Sort by start time
                 .collect(Collectors.toList());
 
@@ -265,21 +278,29 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
      * Performs a rollover on a {@code DataStream} instance and returns a new instance containing
      * the updated list of backing indices and incremented generation.
      *
-     * @param writeIndex new write index
-     * @param generation new generation
+     * @param writeIndex            new write index
+     * @param generation            new generation
+     * @param indexModeFromTemplate the index mode as is defined in the template that created this data stream
      *
      * @return new {@code DataStream} instance with the rollover operation applied
      */
-    public DataStream rollover(Index writeIndex, long generation) {
+    public DataStream rollover(Index writeIndex, long generation, IndexMode indexModeFromTemplate) {
         ensureNotReplicated();
 
-        return unsafeRollover(writeIndex, generation);
+        return unsafeRollover(writeIndex, generation, indexModeFromTemplate);
     }
 
     /**
-     * Like {@link #rollover(Index, long)}, but does no validation, use with care only.
+     * Like {@link #rollover(Index, long, IndexMode)}, but does no validation, use with care only.
      */
-    public DataStream unsafeRollover(Index writeIndex, long generation) {
+    public DataStream unsafeRollover(Index writeIndex, long generation, IndexMode indexModeFromTemplate) {
+        IndexMode indexMode = this.indexMode;
+        // This allows for migrating a data stream to be a tsdb data stream:
+        // (only if index_mode=null|standard then allow it to be set to time_series)
+        if ((indexMode == null || indexMode == IndexMode.STANDARD) && indexModeFromTemplate == IndexMode.TIME_SERIES) {
+            indexMode = IndexMode.TIME_SERIES;
+        }
+
         List<Index> backingIndices = new ArrayList<>(indices);
         backingIndices.add(writeIndex);
         return new DataStream(
@@ -298,7 +319,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
 
     /**
      * Performs a dummy rollover on a {@code DataStream} instance and returns the tuple of the next write index name and next generation
-     * that this {@code DataStream} should roll over to using {@link #rollover(Index, long)}.
+     * that this {@code DataStream} should roll over to using {@link #rollover(Index, long, IndexMode)}.
      *
      * @param clusterMetadata Cluster metadata
      *

+ 59 - 2
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.test.AbstractSerializingTestCase;
@@ -60,7 +61,7 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
     public void testRollover() {
         DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
         Tuple<String, Long> newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA);
-        final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2());
+        final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null);
         assertThat(rolledDs.getName(), equalTo(ds.getName()));
         assertThat(rolledDs.getTimeStampField(), equalTo(ds.getTimeStampField()));
         assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
@@ -86,13 +87,69 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
         }
 
         final Tuple<String, Long> newCoordinates = ds.nextWriteIndexAndGeneration(builder.build());
-        final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2());
+        final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null);
         assertThat(rolledDs.getName(), equalTo(ds.getName()));
         assertThat(rolledDs.getTimeStampField(), equalTo(ds.getTimeStampField()));
         assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + numConflictingIndices + 1));
         assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
         assertTrue(rolledDs.getIndices().containsAll(ds.getIndices()));
         assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex()));
+        assertThat(rolledDs.getIndexMode(), equalTo(ds.getIndexMode()));
+    }
+
+    public void testRolloverIndexMode() {
+        IndexMode indexMode = randomBoolean() ? IndexMode.STANDARD : null;
+        DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
+        // Unsure index_mode=null
+        ds = new DataStream(
+            ds.getName(),
+            ds.getTimeStampField(),
+            ds.getIndices(),
+            ds.getGeneration(),
+            ds.getMetadata(),
+            ds.isHidden(),
+            ds.isReplicated(),
+            ds.isSystem(),
+            ds.isAllowCustomRouting(),
+            indexMode
+        );
+        var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA);
+
+        var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), IndexMode.TIME_SERIES);
+        assertThat(rolledDs.getName(), equalTo(ds.getName()));
+        assertThat(rolledDs.getTimeStampField(), equalTo(ds.getTimeStampField()));
+        assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
+        assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
+        assertTrue(rolledDs.getIndices().containsAll(ds.getIndices()));
+        assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex()));
+        assertThat(rolledDs.getIndexMode(), equalTo(IndexMode.TIME_SERIES));
+    }
+
+    public void testRolloverIndexMode_keepIndexMode() {
+        DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
+        ds = new DataStream(
+            ds.getName(),
+            ds.getTimeStampField(),
+            ds.getIndices(),
+            ds.getGeneration(),
+            ds.getMetadata(),
+            ds.isHidden(),
+            ds.isReplicated(),
+            ds.isSystem(),
+            ds.isAllowCustomRouting(),
+            IndexMode.TIME_SERIES
+        );
+        var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA);
+
+        IndexMode indexMode = randomBoolean() ? IndexMode.STANDARD : null;
+        var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), indexMode);
+        assertThat(rolledDs.getName(), equalTo(ds.getName()));
+        assertThat(rolledDs.getTimeStampField(), equalTo(ds.getTimeStampField()));
+        assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
+        assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
+        assertTrue(rolledDs.getIndices().containsAll(ds.getIndices()));
+        assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex()));
+        assertThat(rolledDs.getIndexMode(), equalTo(IndexMode.TIME_SERIES));
     }
 
     public void testRemoveBackingIndex() {

+ 4 - 0
test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

@@ -289,6 +289,10 @@ public final class DataStreamTestHelper {
         boolean replicated
     ) {
         Metadata.Builder builder = Metadata.builder();
+        builder.put(
+            "template_1",
+            new ComposableIndexTemplate(List.of("*"), null, null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate())
+        );
 
         List<IndexMetadata> allIndices = new ArrayList<>();
         for (Tuple<String, Integer> dsTuple : dataStreams) {

+ 1 - 1
x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java

@@ -558,7 +558,7 @@ public class ReactiveStorageDeciderService implements AutoscalingDeciderService
             for (int i = 0; i < numberNewIndices; ++i) {
                 final String uuid = UUIDs.randomBase64UUID();
                 final Tuple<String, Long> rolledDataStreamInfo = dataStream.unsafeNextWriteIndexAndGeneration(state.metadata());
-                dataStream = dataStream.unsafeRollover(new Index(rolledDataStreamInfo.v1(), uuid), rolledDataStreamInfo.v2());
+                dataStream = dataStream.unsafeRollover(new Index(rolledDataStreamInfo.v1(), uuid), rolledDataStreamInfo.v2(), null);
 
                 // this unintentionally copies the in-sync allocation ids too. This has the fortunate effect of these indices
                 // not being regarded new by the disk threshold decider, thereby respecting the low watermark threshold even for primaries.