Browse Source

Allow a data stream to be downgraded to a regular data stream (#92644)

Upon rollover a data stream's index mode is removed if the associated template no longer has index.mode setting set to time_series.

Closes to #92544
Martijn van Groningen 2 years ago
parent
commit
cfda43e775

+ 70 - 11
modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java

@@ -468,10 +468,10 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
         assertThat(e.getMessage(), containsString("is outside of ranges of currently writable indices"));
     }
 
-    public void testChangeTemplateIndexMode() throws Exception {
+    public void testDowngradeTsdbDataStreamToRegularDataStream() throws Exception {
+        var time = Instant.now();
         {
             var indexRequest = new Request("POST", "/k8s/_doc");
-            var time = Instant.now();
             indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time)));
             var response = client().performRequest(indexRequest);
             assertOK(response);
@@ -479,15 +479,74 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
         {
             var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
             putComposableIndexTemplateRequest.setJsonEntity(NON_TSDB_TEMPLATE);
-            var e = expectThrows(ResponseException.class, () -> client().performRequest(putComposableIndexTemplateRequest));
-            assertThat(
-                e.getMessage(),
-                containsString(
-                    "composable template [1] with index patterns [k8s*], priority [null],"
-                        + " index.routing_path [] would cause tsdb data streams [k8s] to no longer match a data stream template"
-                        + " with a time_series index_mode"
-                )
-            );
+            client().performRequest(putComposableIndexTemplateRequest);
+        }
+        {
+            {
+                // check prior to rollover
+                var getDataStreamsRequest = new Request("GET", "/_data_stream");
+                var getDataStreamResponse = client().performRequest(getDataStreamsRequest);
+                assertOK(getDataStreamResponse);
+                var dataStreams = entityAsMap(getDataStreamResponse);
+                assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
+                assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
+                assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.time_series"), notNullValue());
+            }
+            var rolloverRequest = new Request("POST", "/k8s/_rollover");
+            var rolloverResponse = client().performRequest(rolloverRequest);
+            assertOK(rolloverResponse);
+            var rolloverResponseBody = entityAsMap(rolloverResponse);
+            assertThat(rolloverResponseBody.get("rolled_over"), is(true));
+            {
+                // Data stream is no longer a tsdb data stream
+                var getDataStreamsRequest = new Request("GET", "/_data_stream");
+                var getDataStreamResponse = client().performRequest(getDataStreamsRequest);
+                assertOK(getDataStreamResponse);
+                var dataStreams = entityAsMap(getDataStreamResponse);
+                assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
+                assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(2));
+                assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.time_series"), nullValue());
+            }
+            {
+                // old index remains a tsdb index
+                var oldIndex = (String) rolloverResponseBody.get("old_index");
+                assertThat(oldIndex, backingIndexEqualTo("k8s", 1));
+                var indices = getIndex(oldIndex);
+                var escapedBackingIndex = oldIndex.replace(".", "\\.");
+                assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
+                assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), equalTo("time_series"));
+                assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"), notNullValue());
+                assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"), notNullValue());
+            }
+            {
+                // new index is a regular index
+                var newIndex = (String) rolloverResponseBody.get("new_index");
+                assertThat(newIndex, backingIndexEqualTo("k8s", 2));
+                var indices = getIndex(newIndex);
+                var escapedBackingIndex = newIndex.replace(".", "\\.");
+                assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
+                assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), nullValue());
+                assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"), nullValue());
+                assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"), nullValue());
+            }
+        }
+        {
+            // All documents should be ingested into the most recent backing index:
+            // (since the data stream is no longer a tsdb data stream)
+            Instant[] timestamps = new Instant[] {
+                time,
+                time.plusSeconds(1),
+                time.plusSeconds(5),
+                time.minus(30, ChronoUnit.DAYS),
+                time.plus(30, ChronoUnit.DAYS) };
+            for (Instant timestamp : timestamps) {
+                var indexRequest = new Request("POST", "/k8s/_doc");
+                indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(timestamp)));
+                var response = client().performRequest(indexRequest);
+                assertOK(response);
+                var responseBody = entityAsMap(response);
+                assertThat((String) responseBody.get("_index"), backingIndexEqualTo("k8s", 2));
+            }
         }
     }
 

+ 1 - 1
modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java

@@ -74,7 +74,7 @@ public class DataStreamIndexSettingsProvider implements IndexSettingProvider {
             if (migrating) {
                 indexMode = IndexMode.TIME_SERIES;
             } else if (dataStream != null) {
-                indexMode = dataStream.getIndexMode();
+                indexMode = timeSeries ? dataStream.getIndexMode() : null;
             } else if (timeSeries) {
                 indexMode = IndexMode.TIME_SERIES;
             } else {

+ 56 - 3
modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java

@@ -26,7 +26,10 @@ import java.io.IOException;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
+import java.util.Map;
 
+import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createFirstBackingIndex;
+import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance;
 import static org.elasticsearch.common.settings.Settings.builder;
 import static org.elasticsearch.datastreams.DataStreamIndexSettingsProvider.FORMATTER;
 import static org.hamcrest.Matchers.contains;
@@ -35,6 +38,8 @@ import static org.hamcrest.Matchers.equalTo;
 
 public class DataStreamIndexSettingsProviderTests extends ESTestCase {
 
+    private static final TimeValue DEFAULT_LOOK_AHEAD_TIME = TimeValue.timeValueHours(2); // default
+
     DataStreamIndexSettingsProvider provider;
 
     @Before
@@ -49,7 +54,6 @@ public class DataStreamIndexSettingsProviderTests extends ESTestCase {
         String dataStreamName = "logs-app1";
 
         Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
-        TimeValue lookAheadTime = TimeValue.timeValueHours(2); // default
         Settings settings = Settings.EMPTY;
         String mapping = """
             {
@@ -79,8 +83,8 @@ public class DataStreamIndexSettingsProviderTests extends ESTestCase {
             List.of(new CompressedXContent(mapping))
         );
         assertThat(result.size(), equalTo(3));
-        assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(lookAheadTime.getMillis())));
-        assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(lookAheadTime.getMillis())));
+        assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
+        assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
         assertThat(IndexMetadata.INDEX_ROUTING_PATH.get(result), contains("field3"));
     }
 
@@ -335,6 +339,55 @@ public class DataStreamIndexSettingsProviderTests extends ESTestCase {
         assertThat(result.size(), equalTo(0));
     }
 
+    public void testGetAdditionalIndexSettingsMigrateToTsdb() {
+        Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
+        String dataStreamName = "logs-app1";
+        IndexMetadata idx = createFirstBackingIndex(dataStreamName).build();
+        DataStream existingDataStream = newInstance(dataStreamName, List.of(idx.getIndex()));
+        Metadata metadata = Metadata.builder().dataStreams(Map.of(dataStreamName, existingDataStream), Map.of()).build();
+
+        Settings settings = Settings.EMPTY;
+        Settings result = provider.getAdditionalIndexSettings(
+            DataStream.getDefaultBackingIndexName(dataStreamName, 2),
+            dataStreamName,
+            true,
+            metadata,
+            now,
+            settings,
+            List.of()
+        );
+        assertThat(result.size(), equalTo(2));
+        assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
+        assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
+    }
+
+    public void testGetAdditionalIndexSettingsDowngradeFromTsdb() {
+        String dataStreamName = "logs-app1";
+        Instant twoHoursAgo = Instant.now().minus(4, ChronoUnit.HOURS).truncatedTo(ChronoUnit.MILLIS);
+        Metadata.Builder mb = Metadata.builder(
+            DataStreamTestHelper.getClusterStateWithDataStreams(
+                List.of(Tuple.tuple(dataStreamName, 1)),
+                List.of(),
+                twoHoursAgo.toEpochMilli(),
+                builder().build(),
+                1
+            ).getMetadata()
+        );
+        Metadata metadata = mb.build();
+
+        Settings settings = Settings.EMPTY;
+        Settings result = provider.getAdditionalIndexSettings(
+            DataStream.getDefaultBackingIndexName(dataStreamName, 2),
+            dataStreamName,
+            false,
+            metadata,
+            Instant.ofEpochMilli(1L),
+            settings,
+            List.of()
+        );
+        assertThat(result.size(), equalTo(0));
+    }
+
     public void testGenerateRoutingPathFromDynamicTemplate() throws Exception {
         Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
         TimeValue lookAheadTime = TimeValue.timeValueHours(2); // default

+ 0 - 60
modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataIndexTemplateServiceTests.java

@@ -11,8 +11,6 @@ package org.elasticsearch.datastreams;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.ComponentTemplate;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
-import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
-import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
 import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
 import org.elasticsearch.cluster.metadata.Template;
@@ -21,7 +19,6 @@ import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.Tuple;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.index.IndexSettingProviders;
 import org.elasticsearch.indices.EmptySystemIndices;
@@ -30,8 +27,6 @@ import org.elasticsearch.indices.InvalidIndexTemplateException;
 import org.elasticsearch.indices.ShardLimitValidator;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -50,61 +45,6 @@ import static org.mockito.Mockito.when;
  */
 public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
 
-    public void testValidateTsdbDataStreamsReferringTsdbTemplate() throws Exception {
-        var state = ClusterState.EMPTY_STATE;
-        final var service = getMetadataIndexTemplateService();
-        var template = new ComposableIndexTemplate(
-            Collections.singletonList("logs-*-*"),
-            new Template(
-                builder().put("index.mode", "time_series").put("index.routing_path", "uid").build(),
-                new CompressedXContent(generateTsdbMapping()),
-                null
-            ),
-            null,
-            100L,
-            null,
-            null,
-            new ComposableIndexTemplate.DataStreamTemplate(false, false),
-            null
-        );
-        state = service.addIndexTemplateV2(state, false, "logs", template);
-
-        var now = Instant.now();
-        var mBuilder = Metadata.builder(state.getMetadata());
-        DataStreamTestHelper.getClusterStateWithDataStream(
-            mBuilder,
-            "unreferenced",
-            List.of(Tuple.tuple(now.minus(2, ChronoUnit.HOURS), now))
-        );
-        DataStreamTestHelper.getClusterStateWithDataStream(
-            mBuilder,
-            "logs-mysql-default",
-            List.of(Tuple.tuple(now.minus(2, ChronoUnit.HOURS), now))
-        );
-        var stateWithDS = ClusterState.builder(state).metadata(mBuilder).build();
-
-        var e = expectThrows(IllegalArgumentException.class, () -> {
-            ComposableIndexTemplate nonDSTemplate = new ComposableIndexTemplate(
-                Collections.singletonList("logs-*-*"),
-                null,
-                null,
-                100L,
-                null,
-                null,
-                new ComposableIndexTemplate.DataStreamTemplate(false, false),
-                null
-            );
-            service.addIndexTemplateV2(stateWithDS, false, "logs", nonDSTemplate);
-        });
-
-        assertThat(
-            e.getMessage(),
-            containsString(
-                "would cause tsdb data streams [logs-mysql-default] to no longer match a data stream template with a time_series index_mode"
-            )
-        );
-    }
-
     public void testRequireRoutingPath() throws Exception {
         final var service = getMetadataIndexTemplateService();
         {

+ 5 - 2
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -294,10 +294,13 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
      */
     public DataStream unsafeRollover(Index writeIndex, long generation, boolean timeSeries) {
         IndexMode indexMode = this.indexMode;
-        // This allows for migrating a data stream to be a tsdb data stream:
-        // (only if index_mode=null|standard then allow it to be set to time_series)
         if ((indexMode == null || indexMode == IndexMode.STANDARD) && timeSeries) {
+            // This allows for migrating a data stream to be a tsdb data stream:
+            // (only if index_mode=null|standard then allow it to be set to time_series)
             indexMode = IndexMode.TIME_SERIES;
+        } else if (indexMode == IndexMode.TIME_SERIES && timeSeries == false) {
+            // Allow downgrading a time series data stream to a regular data stream
+            indexMode = null;
         }
 
         List<Index> backingIndices = new ArrayList<>(indices);

+ 0 - 52
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java

@@ -38,7 +38,6 @@ import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
-import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettingProvider;
 import org.elasticsearch.index.IndexSettingProviders;
@@ -685,7 +684,6 @@ public class MetadataIndexTemplateService {
 
         validate(name, templateToValidate);
         validateDataStreamsStillReferenced(currentState, name, templateToValidate);
-        validateTsdbDataStreamsReferringTsdbTemplate(currentState, name, templateToValidate);
 
         // Finally, right before adding the template, we need to ensure that the composite settings,
         // mappings, and aliases are valid after it's been composed with the component templates
@@ -758,56 +756,6 @@ public class MetadataIndexTemplateService {
         }
     }
 
-    // This method should be invoked after validateDataStreamsStillReferenced(...)
-    private static void validateTsdbDataStreamsReferringTsdbTemplate(
-        ClusterState state,
-        String templateName,
-        ComposableIndexTemplate newTemplate
-    ) {
-        Metadata currentMetadata = state.getMetadata();
-        Metadata updatedMetadata = null;
-        Set<String> dataStreamsWithNonTsdbTemplate = null;
-
-        for (var dataStream : state.metadata().dataStreams().values()) {
-            if (dataStream.getIndexMode() != IndexMode.TIME_SERIES) {
-                continue;
-            }
-
-            if (updatedMetadata == null) {
-                updatedMetadata = Metadata.builder(state.metadata()).put(templateName, newTemplate).build();
-            }
-            var matchingTemplate = findV2Template(updatedMetadata, dataStream.getName(), false);
-            if (templateName.equals(matchingTemplate)) {
-                if (currentMetadata.isTimeSeriesTemplate(newTemplate) == false) {
-                    if (dataStreamsWithNonTsdbTemplate == null) {
-                        dataStreamsWithNonTsdbTemplate = new HashSet<>();
-                    }
-                    dataStreamsWithNonTsdbTemplate.add(dataStream.getName());
-                }
-            }
-        }
-
-        if (dataStreamsWithNonTsdbTemplate != null) {
-            var settings = MetadataIndexTemplateService.resolveSettings(newTemplate, currentMetadata.componentTemplates());
-            var routingPaths = IndexMetadata.INDEX_ROUTING_PATH.get(settings);
-            throw new IllegalArgumentException(
-                "composable template ["
-                    + templateName
-                    + "] with index patterns "
-                    + newTemplate.indexPatterns()
-                    + ", priority ["
-                    + newTemplate.priority()
-                    + "]"
-                    + ", index.routing_path "
-                    + routingPaths
-                    + " "
-                    + "would cause tsdb data streams "
-                    + dataStreamsWithNonTsdbTemplate
-                    + " to no longer match a data stream template with a time_series index_mode"
-            );
-        }
-    }
-
     /**
      * Return a map of v1 template names to their index patterns for v1 templates that would overlap
      * with the given v2 template's index patterns.

+ 4 - 3
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

@@ -39,6 +39,7 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStream> {
 
@@ -96,7 +97,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         assertThat(rolledDs.getIndexMode(), equalTo(ds.getIndexMode()));
     }
 
-    public void testRolloverIndexMode() {
+    public void testRolloverUpgradeToTsdbDataStream() {
         IndexMode indexMode = randomBoolean() ? IndexMode.STANDARD : null;
         DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
         // Unsure index_mode=null
@@ -123,7 +124,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         assertThat(rolledDs.getIndexMode(), equalTo(IndexMode.TIME_SERIES));
     }
 
-    public void testRolloverIndexMode_keepIndexMode() {
+    public void testRolloverDowngradeToRegularDataStream() {
         DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
         ds = new DataStream(
             ds.getName(),
@@ -145,7 +146,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
         assertTrue(rolledDs.getIndices().containsAll(ds.getIndices()));
         assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex()));
-        assertThat(rolledDs.getIndexMode(), equalTo(IndexMode.TIME_SERIES));
+        assertThat(rolledDs.getIndexMode(), nullValue());
     }
 
     public void testRemoveBackingIndex() {