Преглед изворни кода

Add component that updates the index.time_series.end_time (#82798)

Added a component that updates the index.time_series.end_time
index setting of the most recent backing index of a time series data stream.

This allows a time series data stream to accept data when
the data stream hasn't rolled over yet, but its end_time
is get close to current time.

This component computes a new end_time based on current time,
the configured look_ahead_time and the poll interval. If
this new end time is larger than the set end_time then this
new value overwrites the old value.

The component runs periodically in the background and bulk
updates the end_time for the most recent backing index for
all time series data streams.

Relates to #74660
Martijn van Groningen пре 3 година
родитељ
комит
a953813e48

+ 183 - 0
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java

@@ -0,0 +1,183 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.datastreams;
+
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
+import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.common.time.FormatNames;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.elasticsearch.xcontent.XContentType;
+import org.junit.After;
+
+import java.time.Instant;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+
+public class TSDBIndexingIT extends ESSingleNodeTestCase {
+
+    private static final String DOC = """
+        {
+            "@timestamp": "$time",
+            "metricset": "pod",
+            "k8s": {
+                "pod": {
+                    "name": "dog",
+                    "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9",
+                    "ip": "10.10.55.3",
+                    "network": {
+                        "tx": 1434595272,
+                        "rx": 530605511
+                    }
+                }
+            }
+        }
+        """;
+
+    @Override
+    protected Collection<Class<? extends Plugin>> getPlugins() {
+        return List.of(DataStreamsPlugin.class);
+    }
+
+    @Override
+    protected Settings nodeSettings() {
+        Settings.Builder newSettings = Settings.builder();
+        newSettings.put(super.nodeSettings());
+        // This essentially disables the automatic updates to end_time settings of a data stream's latest backing index.
+        newSettings.put(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.getKey(), "10m");
+        return newSettings.build();
+    }
+
+    @After
+    public void cleanup() {
+        DeleteDataStreamAction.Request deleteDataStreamsRequest = new DeleteDataStreamAction.Request("*");
+        assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, deleteDataStreamsRequest).actionGet());
+    }
+
+    public void testTimeRanges() throws Exception {
+        var mappingTemplate = """
+            {
+              "_doc":{
+                "properties": {
+                  "metricset": {
+                    "type": "keyword",
+                    "time_series_dimension": true
+                  }
+                }
+              }
+            }""";
+        Settings templateSettings = Settings.builder().put("index.routing_path", "metricset").build();
+        var request = new PutComposableIndexTemplateAction.Request("id");
+        request.indexTemplate(
+            new ComposableIndexTemplate(
+                List.of("k8s*"),
+                new Template(templateSettings, new CompressedXContent(mappingTemplate), null),
+                null,
+                null,
+                null,
+                null,
+                new ComposableIndexTemplate.DataStreamTemplate(false, false, IndexMode.TIME_SERIES),
+                null
+            )
+        );
+        client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
+
+        // index doc
+        Instant time = Instant.now();
+        String backingIndexName;
+        {
+            var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
+            indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
+            var indexResponse = client().index(indexRequest).actionGet();
+            backingIndexName = indexResponse.getIndex();
+        }
+
+        // fetch start and end time
+        var getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices(backingIndexName)).actionGet();
+        Instant startTime = IndexSettings.TIME_SERIES_START_TIME.get(getIndexResponse.getSettings().get(backingIndexName));
+        Instant endTime = IndexSettings.TIME_SERIES_END_TIME.get(getIndexResponse.getSettings().get(backingIndexName));
+
+        // index another doc and verify index
+        {
+            var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
+            indexRequest.source(DOC.replace("$time", formatInstant(endTime.minusSeconds(1))), XContentType.JSON);
+            var indexResponse = client().index(indexRequest).actionGet();
+            assertThat(indexResponse.getIndex(), equalTo(backingIndexName));
+        }
+
+        // index doc beyond range and check failure
+        {
+            var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
+            time = randomBoolean() ? endTime : endTime.plusSeconds(randomIntBetween(1, 99));
+            indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
+            expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet());
+        }
+
+        // Fetch UpdateTimeSeriesRangeService and increment time range of latest backing index:
+        UpdateTimeSeriesRangeService updateTimeSeriesRangeService = getInstanceFromNode(UpdateTimeSeriesRangeService.class);
+        CountDownLatch latch = new CountDownLatch(1);
+        updateTimeSeriesRangeService.perform(latch::countDown);
+        latch.await();
+
+        // index again and check for success
+        {
+            var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
+            indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
+            var indexResponse = client().index(indexRequest).actionGet();
+            assertThat(indexResponse.getIndex(), equalTo(backingIndexName));
+        }
+
+        // rollover
+        var rolloverRequest = new RolloverRequest("k8s", null);
+        var rolloverResponse = client().admin().indices().rolloverIndex(rolloverRequest).actionGet();
+        var newBackingIndexName = rolloverResponse.getNewIndex();
+
+        // index and check target index is new
+        getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices(newBackingIndexName)).actionGet();
+        Instant newStartTime = IndexSettings.TIME_SERIES_START_TIME.get(getIndexResponse.getSettings().get(newBackingIndexName));
+        Instant newEndTime = IndexSettings.TIME_SERIES_END_TIME.get(getIndexResponse.getSettings().get(newBackingIndexName));
+
+        // Check whether the document lands in the newest backing index:
+        time = Instant.ofEpochMilli(randomLongBetween(newStartTime.toEpochMilli(), newEndTime.toEpochMilli() - 1));
+        {
+            var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
+            indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
+            var indexResponse = client().index(indexRequest).actionGet();
+            assertThat(indexResponse.getIndex(), equalTo(newBackingIndexName));
+        }
+
+        // Double check indexing against previous backing index:
+        time = newStartTime.minusMillis(1);
+        {
+            var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
+            indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
+            var indexResponse = client().index(indexRequest).actionGet();
+            assertThat(indexResponse.getIndex(), equalTo(backingIndexName));
+        }
+    }
+
+    static String formatInstant(Instant instant) {
+        return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
+    }
+
+}

+ 11 - 4
modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java

@@ -31,7 +31,7 @@ public class DataStreamIndexSettingsProvider implements IndexSettingProvider {
         String dataStreamName,
         IndexMode templateIndexMode,
         Metadata metadata,
-        long resolvedAt,
+        Instant resolvedAt,
         Settings allSettings
     ) {
         if (dataStreamName != null) {
@@ -48,9 +48,11 @@ public class DataStreamIndexSettingsProvider implements IndexSettingProvider {
 
                 if (indexMode == IndexMode.TIME_SERIES) {
                     TimeValue lookAheadTime = IndexSettings.LOOK_AHEAD_TIME.get(allSettings);
-                    Instant start;
+                    final Instant start;
+                    final Instant end;
                     if (dataStream == null) {
-                        start = Instant.ofEpochMilli(resolvedAt).minusMillis(lookAheadTime.getMillis());
+                        start = resolvedAt.minusMillis(lookAheadTime.getMillis());
+                        end = resolvedAt.plusMillis(lookAheadTime.getMillis());
                     } else {
                         IndexMetadata currentLatestBackingIndex = metadata.index(dataStream.getWriteIndex());
                         if (currentLatestBackingIndex.getSettings().hasValue(IndexSettings.TIME_SERIES_END_TIME.getKey()) == false) {
@@ -64,9 +66,14 @@ public class DataStreamIndexSettingsProvider implements IndexSettingProvider {
                             );
                         }
                         start = IndexSettings.TIME_SERIES_END_TIME.get(currentLatestBackingIndex.getSettings());
+                        if (start.isAfter(resolvedAt)) {
+                            end = start.plusMillis(lookAheadTime.getMillis());
+                        } else {
+                            end = resolvedAt.plusMillis(lookAheadTime.getMillis());
+                        }
                     }
+                    assert start.isBefore(end) : "data stream backing index's start time is not before end time";
                     builder.put(IndexSettings.TIME_SERIES_START_TIME.getKey(), FORMATTER.format(start));
-                    Instant end = Instant.ofEpochMilli(resolvedAt).plusMillis(lookAheadTime.getMillis());
                     builder.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), FORMATTER.format(end));
                 }
                 return builder.build();

+ 53 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

@@ -16,12 +16,17 @@ import org.elasticsearch.action.datastreams.GetDataStreamAction;
 import org.elasticsearch.action.datastreams.MigrateToDataStreamAction;
 import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
 import org.elasticsearch.action.datastreams.PromoteDataStreamAction;
+import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.IndexScopedSettings;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.datastreams.action.CreateDataStreamTransportAction;
 import org.elasticsearch.datastreams.action.DataStreamsStatsTransportAction;
 import org.elasticsearch.datastreams.action.DeleteDataStreamTransportAction;
@@ -36,11 +41,19 @@ import org.elasticsearch.datastreams.rest.RestGetDataStreamsAction;
 import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction;
 import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction;
 import org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.IndexSettingProvider;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.plugins.ActionPlugin;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestHandler;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.watcher.ResourceWatcherService;
+import org.elasticsearch.xcontent.NamedXContentRegistry;
 
 import java.util.Collection;
 import java.util.List;
@@ -48,6 +61,46 @@ import java.util.function.Supplier;
 
 public class DataStreamsPlugin extends Plugin implements ActionPlugin {
 
+    public static final Setting<TimeValue> TIME_SERIES_POLL_INTERVAL = Setting.timeSetting(
+        "time_series.poll_interval",
+        TimeValue.timeValueMinutes(5),
+        TimeValue.timeValueMinutes(1),
+        TimeValue.timeValueMinutes(10),
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    @Override
+    public List<Setting<?>> getSettings() {
+        if (IndexSettings.isTimeSeriesModeEnabled() == false) {
+            return List.of();
+        }
+
+        return List.of(TIME_SERIES_POLL_INTERVAL);
+    }
+
+    @Override
+    public Collection<Object> createComponents(
+        Client client,
+        ClusterService clusterService,
+        ThreadPool threadPool,
+        ResourceWatcherService resourceWatcherService,
+        ScriptService scriptService,
+        NamedXContentRegistry xContentRegistry,
+        Environment environment,
+        NodeEnvironment nodeEnvironment,
+        NamedWriteableRegistry namedWriteableRegistry,
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        Supplier<RepositoriesService> repositoriesServiceSupplier
+    ) {
+        if (IndexSettings.isTimeSeriesModeEnabled() == false) {
+            return List.of();
+        }
+
+        var service = new UpdateTimeSeriesRangeService(environment.settings(), threadPool, clusterService);
+        return List.of(service);
+    }
+
     @Override
     public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
         var createDsAction = new ActionHandler<>(CreateDataStreamAction.INSTANCE, CreateDataStreamTransportAction.class);

+ 200 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeService.java

@@ -0,0 +1,200 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.datastreams;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.LocalNodeMasterListener;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.threadpool.Scheduler;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
+
+/**
+ * A component that updates the 'index.time_series.end_time' index setting for the most recently added backing index of a tsdb data stream.
+ */
+public class UpdateTimeSeriesRangeService extends AbstractLifecycleComponent implements LocalNodeMasterListener {
+
+    private static final Logger LOGGER = LogManager.getLogger(UpdateTimeSeriesRangeService.class);
+
+    private final ThreadPool threadPool;
+    private final ClusterService clusterService;
+
+    volatile TimeValue pollInterval;
+    volatile Scheduler.Cancellable job;
+    private final AtomicBoolean running = new AtomicBoolean(false);
+
+    UpdateTimeSeriesRangeService(Settings settings, ThreadPool threadPool, ClusterService clusterService) {
+        this.pollInterval = DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.get(settings);
+        this.threadPool = threadPool;
+        this.clusterService = clusterService;
+        clusterService.getClusterSettings().addSettingsUpdateConsumer(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL, this::setPollInterval);
+    }
+
+    void perform(Runnable onComplete) {
+        if (running.compareAndSet(false, true)) {
+            LOGGER.debug("starting tsdb update task");
+            clusterService.submitStateUpdateTask("update_tsdb_data_stream_end_times", new ClusterStateUpdateTask(Priority.URGENT) {
+                @Override
+                public ClusterState execute(ClusterState currentState) throws Exception {
+                    return updateTimeSeriesTemporalRange(currentState, Instant.now());
+                }
+
+                @Override
+                public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
+                    running.set(false);
+                    onComplete.run();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    running.set(false);
+                    LOGGER.warn("failed to update tsdb data stream end times", e);
+                    onComplete.run();
+                }
+
+            }, ClusterStateTaskExecutor.unbatched());
+        } else {
+            LOGGER.debug("not starting tsdb update task, because another execution is still running");
+        }
+    }
+
+    void setPollInterval(TimeValue newValue) {
+        LOGGER.info(
+            "updating [{}] setting from [{}] to [{}]",
+            DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.getKey(),
+            pollInterval,
+            newValue
+        );
+        this.pollInterval = newValue;
+
+        // Only re-schedule if we've been scheduled, this should only be the case on elected master node.
+        if (job != null) {
+            unschedule();
+            scheduleTask();
+        }
+    }
+
+    ClusterState updateTimeSeriesTemporalRange(ClusterState current, Instant now) {
+        Metadata.Builder mBuilder = null;
+        for (DataStream dataStream : current.metadata().dataStreams().values()) {
+            if (dataStream.getIndexMode() != IndexMode.TIME_SERIES) {
+                continue;
+            }
+            if (dataStream.isReplicated()) {
+                continue;
+            }
+
+            // getWriteIndex() selects the latest added index:
+            Index head = dataStream.getWriteIndex();
+            IndexMetadata im = current.metadata().getIndexSafe(head);
+            Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
+            TimeValue lookAheadTime = IndexSettings.LOOK_AHEAD_TIME.get(im.getSettings());
+            Instant newEnd = now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS);
+            if (newEnd.isAfter(currentEnd)) {
+                try {
+                    Settings settings = Settings.builder()
+                        .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), DEFAULT_DATE_TIME_FORMATTER.format(newEnd))
+                        .build();
+                    LOGGER.debug(
+                        "updating [{}] setting from [{}] to [{}] for data stream [{}]",
+                        IndexSettings.TIME_SERIES_END_TIME.getKey(),
+                        currentEnd,
+                        newEnd,
+                        dataStream.getName()
+                    );
+                    if (mBuilder == null) {
+                        mBuilder = Metadata.builder(current.metadata());
+                    }
+                    mBuilder.updateSettings(settings, head.getName());
+                    // Verify that all temporal ranges of each backing index is still valid:
+                    dataStream.validate(mBuilder::get);
+                } catch (Exception e) {
+                    LOGGER.error(
+                        () -> new ParameterizedMessage(
+                            "unable to update [{}] for data stream [{}] and backing index [{}]",
+                            IndexSettings.TIME_SERIES_END_TIME.getKey(),
+                            dataStream.getName(),
+                            head.getName()
+                        ),
+                        e
+                    );
+                }
+            }
+        }
+
+        if (mBuilder != null) {
+            return ClusterState.builder(current).metadata(mBuilder).build();
+        } else {
+            return current;
+        }
+    }
+
+    void scheduleTask() {
+        if (job == null) {
+            LOGGER.debug("schedule tsdb update task");
+            job = threadPool.scheduleWithFixedDelay(
+                () -> perform(() -> LOGGER.debug("completed tsdb update task")),
+                pollInterval,
+                ThreadPool.Names.SAME
+            );
+        }
+    }
+
+    void unschedule() {
+        if (job != null) {
+            job.cancel();
+            job = null;
+        }
+    }
+
+    @Override
+    protected void doStart() {
+        clusterService.addLocalNodeMasterListener(this);
+    }
+
+    @Override
+    protected void doStop() {
+        unschedule();
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        unschedule();
+    }
+
+    @Override
+    public void onMaster() {
+        scheduleTask();
+    }
+
+    @Override
+    public void offMaster() {
+        unschedule();
+    }
+}

+ 6 - 6
modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java

@@ -41,7 +41,7 @@ public class DataStreamIndexSettingsProviderTests extends ESTestCase {
             dataStreamName,
             IndexMode.TIME_SERIES,
             metadata,
-            now.toEpochMilli(),
+            now,
             settings
         );
         assertThat(result.size(), equalTo(3));
@@ -63,7 +63,7 @@ public class DataStreamIndexSettingsProviderTests extends ESTestCase {
             dataStreamName,
             IndexMode.TIME_SERIES,
             metadata,
-            now.toEpochMilli(),
+            now,
             settings
         );
         assertThat(result.size(), equalTo(3));
@@ -91,7 +91,7 @@ public class DataStreamIndexSettingsProviderTests extends ESTestCase {
             dataStreamName,
             IndexMode.TIME_SERIES,
             metadata,
-            now.toEpochMilli(),
+            now,
             settings
         );
         assertThat(result.size(), equalTo(3));
@@ -142,7 +142,7 @@ public class DataStreamIndexSettingsProviderTests extends ESTestCase {
                 dataStreamName,
                 IndexMode.TIME_SERIES,
                 metadata,
-                now.toEpochMilli(),
+                now,
                 settings
             )
         );
@@ -167,7 +167,7 @@ public class DataStreamIndexSettingsProviderTests extends ESTestCase {
             dataStreamName,
             null,
             metadata,
-            1L,
+            Instant.ofEpochMilli(1L),
             settings
         );
         assertThat(result.size(), equalTo(0));
@@ -184,7 +184,7 @@ public class DataStreamIndexSettingsProviderTests extends ESTestCase {
             dataStreamName,
             IndexMode.STANDARD,
             metadata,
-            1L,
+            Instant.ofEpochMilli(1L),
             settings
         );
         assertThat(result.size(), equalTo(1));

+ 225 - 0
modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java

@@ -0,0 +1,225 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.datastreams;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalAmount;
+import java.util.List;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.in;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class UpdateTimeSeriesRangeServiceTests extends ESTestCase {
+
+    private ThreadPool threadPool;
+    private UpdateTimeSeriesRangeService instance;
+
+    @Before
+    public void createInstance() {
+        ClusterService mockClusterService = mock(ClusterService.class);
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, Set.of(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL));
+        when(mockClusterService.getClusterSettings()).thenReturn(clusterSettings);
+        threadPool = new TestThreadPool(getTestName());
+        instance = new UpdateTimeSeriesRangeService(Settings.EMPTY, threadPool, mockClusterService);
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        instance.doClose();
+        terminate(threadPool);
+    }
+
+    public void testUpdateTimeSeriesTemporalRange() {
+        String dataStreamName = "logs-app1";
+        Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
+        Instant start = now.minus(2, ChronoUnit.HOURS);
+        Instant end = now.plus(3, ChronoUnit.HOURS);
+        Metadata metadata = DataStreamTestHelper.getClusterStateWithDataStream(
+            dataStreamName,
+            List.of(new Tuple<>(start.minus(4, ChronoUnit.HOURS), start), new Tuple<>(start, end))
+        ).getMetadata();
+
+        // noop, because current end_time isn't passed now + look_a_head_time + poll_interval
+        ClusterState in = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build();
+        ClusterState result = instance.updateTimeSeriesTemporalRange(in, now);
+        assertThat(result, sameInstance(in));
+        Instant previousStartTime1 = getStartTime(result, dataStreamName, 0);
+        Instant previousEndTime1 = getEndTime(result, dataStreamName, 0);
+        Instant previousStartTime2 = getStartTime(result, dataStreamName, 1);
+        Instant previousEndTime2 = getEndTime(result, dataStreamName, 1);
+
+        // updates end time of most recent backing index only, because current time is passed current end_time + look_a_head_time and
+        // poll_interval
+        now = now.plus(1, ChronoUnit.HOURS);
+        in = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build();
+        result = instance.updateTimeSeriesTemporalRange(in, now);
+        assertThat(result, not(sameInstance(in)));
+        assertThat(getStartTime(result, dataStreamName, 0), equalTo(previousStartTime1));
+        assertThat(getEndTime(result, dataStreamName, 0), equalTo(previousEndTime1));
+        assertThat(getStartTime(result, dataStreamName, 1), equalTo(previousStartTime2));
+        assertThat(getEndTime(result, dataStreamName, 1), not(equalTo(previousEndTime2)));
+        assertThat(getEndTime(result, dataStreamName, 1), equalTo(now.plus(2, ChronoUnit.HOURS).plus(5, ChronoUnit.MINUTES)));
+    }
+
+    public void testUpdateTimeSeriesTemporalRange_customLookAHeadTime() {
+        int lookAHeadTimeMinutes = randomIntBetween(30, 180);
+        TemporalAmount lookAHeadTime = Duration.ofMinutes(lookAHeadTimeMinutes);
+        int timeSeriesPollIntervalMinutes = randomIntBetween(1, 10);
+        TemporalAmount timeSeriesPollInterval = Duration.ofMinutes(timeSeriesPollIntervalMinutes);
+        instance.setPollInterval(TimeValue.timeValueMinutes(timeSeriesPollIntervalMinutes));
+
+        String dataStreamName = "logs-app1";
+        Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
+        Instant start = now.minus(2, ChronoUnit.HOURS);
+        Instant end = now.plus(1, ChronoUnit.HOURS);
+        Metadata metadata = DataStreamTestHelper.getClusterStateWithDataStream(
+            dataStreamName,
+            List.of(new Tuple<>(start.minus(4, ChronoUnit.HOURS), start), new Tuple<>(start, end))
+        ).getMetadata();
+        metadata = Metadata.builder(metadata)
+            .updateSettings(Settings.builder().put(IndexSettings.LOOK_AHEAD_TIME.getKey(), lookAHeadTimeMinutes + "m").build())
+            .build();
+
+        var in = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build();
+        Instant previousStartTime1 = getStartTime(in, dataStreamName, 0);
+        Instant previousEndTime1 = getEndTime(in, dataStreamName, 0);
+        Instant previousStartTime2 = getStartTime(in, dataStreamName, 1);
+        Instant previousEndTime2 = getEndTime(in, dataStreamName, 1);
+
+        now = now.plus(1, ChronoUnit.HOURS);
+        var result = instance.updateTimeSeriesTemporalRange(in, now);
+        assertThat(result, not(sameInstance(in)));
+        assertThat(getStartTime(result, dataStreamName, 0), equalTo(previousStartTime1));
+        assertThat(getEndTime(result, dataStreamName, 0), equalTo(previousEndTime1));
+        assertThat(getStartTime(result, dataStreamName, 1), equalTo(previousStartTime2));
+        assertThat(getEndTime(result, dataStreamName, 1), equalTo(now.plus(lookAHeadTime).plus(timeSeriesPollInterval)));
+    }
+
+    public void testUpdateTimeSeriesTemporalRange_NoUpdateBecauseReplicated() {
+        String dataStreamName = "logs-app1";
+        Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
+        Instant start = now.minus(2, ChronoUnit.HOURS);
+        Instant end = now.plus(3, ChronoUnit.HOURS);
+        Metadata metadata = DataStreamTestHelper.getClusterStateWithDataStream(
+            dataStreamName,
+            List.of(new Tuple<>(start.minus(4, ChronoUnit.HOURS), start), new Tuple<>(start, end))
+        ).getMetadata();
+        DataStream d = metadata.dataStreams().get(dataStreamName);
+        metadata = Metadata.builder(metadata)
+            .put(
+                new DataStream(
+                    d.getName(),
+                    d.getTimeStampField(),
+                    d.getIndices(),
+                    d.getGeneration(),
+                    d.getMetadata(),
+                    d.isHidden(),
+                    true,
+                    d.isSystem(),
+                    d.isAllowCustomRouting(),
+                    d.getIndexMode()
+                )
+            )
+            .build();
+
+        now = now.plus(1, ChronoUnit.HOURS);
+        ClusterState in = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build();
+        ClusterState result = instance.updateTimeSeriesTemporalRange(in, now);
+        assertThat(result, sameInstance(in));
+    }
+
+    public void testUpdateTimeSeriesTemporalRange_NoUpdateBecauseRegularDataStream() {
+        String dataStreamName = "logs-app1";
+        Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
+        Metadata metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>(dataStreamName, 2)), List.of())
+            .getMetadata();
+
+        now = now.plus(1, ChronoUnit.HOURS);
+        ClusterState in = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build();
+        ClusterState result = instance.updateTimeSeriesTemporalRange(in, now);
+        assertThat(result, sameInstance(in));
+    }
+
+    public void testUpdateTimeSeriesTemporalRangeMultipleDataStream() {
+        String dataStreamName1 = "logs-app1";
+        String dataStreamName2 = "logs-app2";
+        String dataStreamName3 = "logs-app3";
+        Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
+
+        Instant start = now.minus(6, ChronoUnit.HOURS);
+        Metadata.Builder mbBuilder = new Metadata.Builder();
+        for (String dataStreamName : List.of(dataStreamName1, dataStreamName2, dataStreamName3)) {
+            Instant end = start.plus(2, ChronoUnit.HOURS);
+            DataStreamTestHelper.getClusterStateWithDataStream(mbBuilder, dataStreamName, List.of(new Tuple<>(start, end)));
+            start = end;
+        }
+
+        now = now.minus(3, ChronoUnit.HOURS);
+        ClusterState before = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(mbBuilder).build();
+        ClusterState result = instance.updateTimeSeriesTemporalRange(before, now);
+        assertThat(result, not(sameInstance(before)));
+        assertThat(getEndTime(result, dataStreamName1, 0), equalTo(now.plus(2, ChronoUnit.HOURS).plus(5, ChronoUnit.MINUTES)));
+        assertThat(getEndTime(result, dataStreamName2, 0), equalTo(now.plus(2, ChronoUnit.HOURS).plus(5, ChronoUnit.MINUTES)));
+        assertThat(getEndTime(result, dataStreamName3, 0), equalTo(start));
+    }
+
+    public void testUpdatePollInterval() {
+        instance.scheduleTask();
+        assertThat(instance.pollInterval, equalTo(TimeValue.timeValueMinutes(5)));
+        assertThat(instance.job.toString(), containsString("5m"));
+        instance.setPollInterval(TimeValue.timeValueMinutes(1));
+        assertThat(instance.pollInterval, equalTo(TimeValue.timeValueMinutes(1)));
+        assertThat(instance.job.toString(), containsString("1m"));
+    }
+
+    public void testUpdatePollIntervalUnscheduled() {
+        assertThat(instance.pollInterval, equalTo(TimeValue.timeValueMinutes(5)));
+        assertThat(instance.job, nullValue());
+        instance.setPollInterval(TimeValue.timeValueMinutes(1));
+        assertThat(instance.pollInterval, equalTo(TimeValue.timeValueMinutes(1)));
+        assertThat(instance.job, nullValue());
+    }
+
+    static Instant getEndTime(ClusterState state, String dataStreamName, int index) {
+        DataStream dataStream = state.getMetadata().dataStreams().get(dataStreamName);
+        Settings indexSettings = state.getMetadata().index(dataStream.getIndices().get(index)).getSettings();
+        return IndexSettings.TIME_SERIES_END_TIME.get(indexSettings);
+    }
+
+    static Instant getStartTime(ClusterState state, String dataStreamName, int index) {
+        DataStream dataStream = state.getMetadata().dataStreams().get(dataStreamName);
+        Settings indexSettings = state.getMetadata().index(dataStream.getIndices().get(index)).getSettings();
+        return IndexSettings.TIME_SERIES_START_TIME.get(indexSettings);
+    }
+
+}

+ 3 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/template/post/TransportSimulateIndexTemplateAction.java

@@ -39,6 +39,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -208,6 +209,7 @@ public class TransportSimulateIndexTemplateAction extends TransportMasterNodeRea
             .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
 
         // First apply settings sourced from index settings providers
+        final var now = Instant.now();
         Settings.Builder additionalSettings = Settings.builder();
         for (var provider : indexSettingProviders) {
             Settings result = provider.getAdditionalIndexSettings(
@@ -215,7 +217,7 @@ public class TransportSimulateIndexTemplateAction extends TransportMasterNodeRea
                 template.getDataStreamTemplate() != null ? indexName : null,
                 template.getDataStreamTemplate() != null ? template.getDataStreamTemplate().getIndexMode() : null,
                 simulatedState.getMetadata(),
-                System.currentTimeMillis(),
+                now,
                 templateSettings
             );
             dummySettings.put(result);

+ 8 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

@@ -1529,7 +1529,14 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
                 if (indexMetadata == null) {
                     throw new IndexNotFoundException(index);
                 }
-                put(IndexMetadata.builder(indexMetadata).settings(Settings.builder().put(indexMetadata.getSettings()).put(settings)));
+                // Updating version is required when updating settings.
+                // Otherwise, settings changes may not be replicated to remote clusters.
+                long newVersion = indexMetadata.getSettingsVersion() + 1;
+                put(
+                    IndexMetadata.builder(indexMetadata)
+                        .settings(Settings.builder().put(indexMetadata.getSettings()).put(settings))
+                        .settingsVersion(newVersion)
+                );
             }
             return this;
         }

+ 2 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java

@@ -854,6 +854,7 @@ public class MetadataCreateIndexService {
 
             // Loop through all the explicit index setting providers, adding them to the
             // additionalIndexSettings map
+            final var resolvedAt = Instant.ofEpochMilli(request.getNameResolvedAt());
             for (IndexSettingProvider provider : indexSettingProviders) {
                 additionalIndexSettings.put(
                     provider.getAdditionalIndexSettings(
@@ -861,7 +862,7 @@ public class MetadataCreateIndexService {
                         request.dataStreamName(),
                         matchingIndexMode,
                         currentState.getMetadata(),
-                        request.getNameResolvedAt(),
+                        resolvedAt,
                         templateAndRequestSettings
                     )
                 );

+ 3 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java

@@ -51,6 +51,7 @@ import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -626,6 +627,7 @@ public class MetadataIndexTemplateService {
         // but when validating templates that create data streams the MetadataCreateDataStreamService isn't used.
         var finalTemplate = Optional.ofNullable(indexTemplate.template());
         var finalSettings = Settings.builder();
+        final var now = Instant.now();
 
         // First apply settings sourced from index setting providers:
         for (var provider : indexSettingProviders) {
@@ -635,7 +637,7 @@ public class MetadataIndexTemplateService {
                     indexTemplate.getDataStreamTemplate() != null ? "validate-data-stream-name" : null,
                     indexTemplate.getDataStreamTemplate() != null ? indexTemplate.getDataStreamTemplate().getIndexMode() : null,
                     currentState.getMetadata(),
-                    System.currentTimeMillis(),
+                    now,
                     finalTemplate.map(Template::settings).orElse(Settings.EMPTY)
                 )
             );

+ 2 - 1
server/src/main/java/org/elasticsearch/cluster/routing/allocation/DataTier.java

@@ -23,6 +23,7 @@ import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.IndexSettingProvider;
 import org.elasticsearch.snapshots.SearchableSnapshotsSettings;
 
+import java.time.Instant;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -237,7 +238,7 @@ public class DataTier {
             String dataStreamName,
             IndexMode templateIndexMode,
             Metadata metadata,
-            long resolvedAt,
+            Instant resolvedAt,
             Settings allSettings
         ) {
             Set<String> settings = allSettings.keySet();

+ 3 - 1
server/src/main/java/org/elasticsearch/index/IndexSettingProvider.java

@@ -11,6 +11,8 @@ package org.elasticsearch.index;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.settings.Settings;
 
+import java.time.Instant;
+
 /**
  * An {@link IndexSettingProvider} is a provider for index level settings that can be set
  * explicitly as a default value (so they show up as "set" for newly created indices)
@@ -32,7 +34,7 @@ public interface IndexSettingProvider {
         String dataStreamName,
         IndexMode templateIndexMode,
         Metadata metadata,
-        long resolvedAt,
+        Instant resolvedAt,
         Settings allSettings
     );
 }

+ 8 - 2
test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

@@ -324,7 +324,15 @@ public final class DataStreamTestHelper {
 
     public static ClusterState getClusterStateWithDataStream(String dataStream, List<Tuple<Instant, Instant>> timeSlices) {
         Metadata.Builder builder = Metadata.builder();
+        getClusterStateWithDataStream(builder, dataStream, timeSlices);
+        return ClusterState.builder(new ClusterName("_name")).metadata(builder).build();
+    }
 
+    public static void getClusterStateWithDataStream(
+        Metadata.Builder builder,
+        String dataStream,
+        List<Tuple<Instant, Instant>> timeSlices
+    ) {
         List<IndexMetadata> backingIndices = new ArrayList<>();
         int generation = 1;
         for (Tuple<Instant, Instant> tuple : timeSlices) {
@@ -354,8 +362,6 @@ public final class DataStreamTestHelper {
             IndexMode.TIME_SERIES
         );
         builder.put(ds);
-
-        return ClusterState.builder(new ClusterName("_name")).metadata(builder).build();
     }
 
     private static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) {