
Initial data stream lifecycle support for downsampling (#98609)

This adds data stream lifecycle service support for downsampling.
Time series backing indices of a data stream whose lifecycle
configures downsampling will be marked as read-only, downsampled,
replaced in the data stream by the corresponding downsample index,
and finally deleted.

Multiple downsampling rounds can be configured for a data stream,
and the latest matching round is the one that gets executed first.
If a downsampling operation is already in progress, we wait until
it finishes before starting the next one.
Note that in this scenario a data stream could have the following
backing indices:
```
[.ds-metrics-2023.08.22-000002, downsample-10s-.ds-metrics-2023.08.22-000001]
```

If this data stream has multiple downsampling rounds configured,
the first-generation index will subsequently be downsampled again
(and again).
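
As a rough illustration of the round-selection rule above, here is a minimal, self-contained Java sketch. The `Round` record is a hypothetical stand-in for `DataStreamLifecycle.Downsampling.Round`; the authoritative logic lives in `DataStream#getDownsamplingRoundsFor` and in `DataStreamLifecycleService` below.

```java
import java.time.Duration;
import java.util.List;
import java.util.Optional;

// Simplified, hypothetical stand-in for DataStreamLifecycle.Downsampling.Round.
record Round(Duration after, Duration fixedInterval) {}

class RoundSelectionSketch {
    // Rounds are ordered ascending by `after`. Every round whose `after` has elapsed for the
    // backing index "matches"; the lifecycle service triggers the last matching round, i.e.
    // the one with the largest `after`.
    static Optional<Round> selectRound(List<Round> roundsAscendingByAfter, Duration indexAge) {
        Round selected = null;
        for (Round round : roundsAscendingByAfter) {
            if (indexAge.compareTo(round.after()) >= 0) {
                selected = round; // keep overwriting so the last matching round wins
            }
        }
        return Optional.ofNullable(selected);
    }

    public static void main(String[] args) {
        List<Round> rounds = List.of(
            new Round(Duration.ofDays(1), Duration.ofMinutes(10)),  // after 1 day, downsample to 10m buckets
            new Round(Duration.ofDays(30), Duration.ofHours(1))     // after 30 days, downsample to 1h buckets
        );
        // A 40-day-old index matches both rounds; the 30-day round is the one that gets executed.
        System.out.println(selectRound(rounds, Duration.ofDays(40)));
    }
}
```
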
Andrei Dan · 2 years ago · commit b11d552f95
16 changed files with 2350 additions and 69 deletions
  1. +2 -0  modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
  2. +526 -66  modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
  3. +5 -0  modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/PutDataStreamLifecycleAction.java
  4. +102 -0  modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/downsampling/ReplaceBackingWithDownsampleIndexExecutor.java
  5. +232 -0  modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/downsampling/ReplaceSourceWithDownsampleIndexTask.java
  6. +1 -1  modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleFixtures.java
  7. +257 -1  modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java
  8. +104 -0  modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/downsampling/ReplaceBackingWithDownsampleIndexExecutorTests.java
  9. +271 -0  modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/downsampling/ReplaceSourceWithDownsampleIndexTaskTests.java
  10. +20 -0  server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequest.java
  11. +38 -0  server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
  12. +165 -0  server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
  13. +189 -0  x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java
  14. +246 -0  x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleIT.java
  15. +191 -0  x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDriver.java
  16. +1 -1  x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java

+ 2 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

@@ -104,6 +104,8 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin {
         Setting.Property.IndexScope,
         Setting.Property.Dynamic
     );
+    public static final String LIFECYCLE_CUSTOM_INDEX_METADATA_KEY = "data_stream_lifecycle";
+
     // The dependency of index.look_ahead_time is a cluster setting and currently there is no clean validation approach for this:
     private final SetOnce<UpdateTimeSeriesRangeService> updateTimeSeriesRangeService = new SetOnce<>();
     private final SetOnce<DataStreamLifecycleErrorStore> errorStoreInitialisationService = new SetOnce<>();
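
Moving this key from package-private in DataStreamLifecycleService up to the plugin makes it reachable from the new downsampling task classes. A hedged sketch (not part of the diff) of how the key is consumed later in this PR: the swap task records the replaced source index under this key, and the service reads it back to find the now-detached source index that still needs deleting.

```java
import java.util.Map;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.datastreams.DataStreamsPlugin;

class LifecycleCustomMetadataSketch {
    // Illustrative only; see maybeExecuteDownsampling in DataStreamLifecycleService for the real usage.
    static String replacedSourceIndexOrNull(IndexMetadata indexMetadata) {
        // getCustomData returns the custom metadata map stored under the shared key, or null if absent
        Map<String, String> lifecycleMetadata = indexMetadata.getCustomData(DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY);
        // "replacement_source_index" is ReplaceSourceWithDownsampleIndexTask.REPLACEMENT_SOURCE_INDEX
        return lifecycleMetadata == null ? null : lifecycleMetadata.get("replacement_source_index");
    }
}
```
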

+ 526 - 66
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

@@ -12,15 +12,20 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ResultDeduplicator;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
+import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
+import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse;
 import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
 import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import org.elasticsearch.action.downsample.DownsampleAction;
+import org.elasticsearch.action.downsample.DownsampleConfig;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
@@ -29,6 +34,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.SimpleBatchedExecutor;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -46,6 +52,8 @@ import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.datastreams.lifecycle.downsampling.ReplaceBackingWithDownsampleIndexExecutor;
+import org.elasticsearch.datastreams.lifecycle.downsampling.ReplaceSourceWithDownsampleIndexTask;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -56,6 +64,7 @@ import org.elasticsearch.transport.TransportRequest;
 
 import java.io.Closeable;
 import java.time.Clock;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -63,10 +72,19 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
+import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.STARTED;
+import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.SUCCESS;
+import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_DOWNSAMPLE_STATUS;
+import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY;
+import static org.elasticsearch.datastreams.lifecycle.downsampling.ReplaceSourceWithDownsampleIndexTask.REPLACEMENT_SOURCE_INDEX;
+
 /**
  * This service will implement the needed actions (e.g. rollover, retention) to manage the data streams with a data stream lifecycle
  * configured. It runs on the master node and it schedules a job according to the configured
@@ -100,6 +118,7 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
         Setting.Property.Dynamic,
         Setting.Property.NodeScope
     );
+    public static final String DOWNSAMPLED_INDEX_PREFIX = "downsample-";
 
     private static final Logger logger = LogManager.getLogger(DataStreamLifecycleService.class);
     /**
@@ -109,13 +128,13 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
     /*
      * This is the key for data stream lifecycle related custom index metadata.
      */
-    static final String LIFECYCLE_CUSTOM_INDEX_METADATA_KEY = "data_stream_lifecycle";
     static final String FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY = "force_merge_completed_timestamp";
     private final Settings settings;
     private final Client client;
     private final ClusterService clusterService;
     private final ThreadPool threadPool;
     final ResultDeduplicator<TransportRequest, Void> transportActionsDeduplicator;
+    final ResultDeduplicator<ClusterStateTaskListener, Void> clusterStateChangesDeduplicator;
     private final LongSupplier nowSupplier;
     private final Clock clock;
     private final DataStreamLifecycleErrorStore errorStore;
@@ -125,6 +144,7 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
     private SchedulerEngine.Job scheduledJob;
     private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
     private final MasterServiceTaskQueue<UpdateForceMergeCompleteTask> forceMergeClusterStateUpdateTaskQueue;
+    private final MasterServiceTaskQueue<ReplaceSourceWithDownsampleIndexTask> swapSourceWithDownsampleIndexQueue;
     private volatile ByteSizeValue targetMergePolicyFloorSegment;
     private volatile int targetMergePolicyFactor;
 
@@ -157,6 +177,7 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
         this.clock = clock;
         this.threadPool = threadPool;
         this.transportActionsDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
+        this.clusterStateChangesDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
         this.nowSupplier = nowSupplier;
         this.errorStore = errorStore;
         this.scheduledJob = null;
@@ -170,6 +191,11 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
             Priority.LOW,
             FORCE_MERGE_STATE_UPDATE_TASK_EXECUTOR
         );
+        this.swapSourceWithDownsampleIndexQueue = clusterService.createTaskQueue(
+            "data-stream-lifecycle-swap-source-with-downsample",
+            Priority.NORMAL,
+            new ReplaceBackingWithDownsampleIndexExecutor(client)
+        );
     }
 
     /**
@@ -240,6 +266,8 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
      */
     // default visibility for testing purposes
     void run(ClusterState state) {
+        int affectedIndices = 0;
+        int affectedDataStreams = 0;
         for (DataStream dataStream : state.metadata().dataStreams().values()) {
             clearErrorStoreForUnmanagedIndices(dataStream);
             if (dataStream.getLifecycle() == null) {
@@ -285,21 +313,17 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
                 );
             }
 
+            // the following indices should not be considered for the remainder of this service run:
+            // 1) the write index, as it's still getting writes and we'll roll it over when the conditions are met
+            // 2) any indices that we're in the process of deleting, because they'll be gone soon anyway
+            Set<Index> indicesToExcludeForRemainingRun = new HashSet<>();
+            indicesToExcludeForRemainingRun.add(currentRunWriteIndex);
+            indicesToExcludeForRemainingRun.addAll(indicesBeingRemoved);
+
             try {
-                /*
-                 * When considering indices for force merge, we want to exclude several indices: (1) We exclude the current write index
-                 * because obviously it is still likely to get writes, (2) we exclude the most recent previous write index because since
-                 * we just switched over it might still be getting some writes, and (3) we exclude any indices that we're in the process
-                 * of deleting because they'll be gone soon anyway.
-                 */
-                Set<Index> indicesToExclude = new HashSet<>();
-                indicesToExclude.add(currentRunWriteIndex);
-                indicesToExclude.addAll(indicesBeingRemoved);
-                List<Index> potentialForceMergeIndices = dataStream.getIndices()
-                    .stream()
-                    .filter(index -> indicesToExclude.contains(index) == false)
-                    .toList();
-                maybeExecuteForceMerge(state, dataStream, potentialForceMergeIndices);
+                indicesToExcludeForRemainingRun.addAll(
+                    maybeExecuteForceMerge(state, getTargetIndices(dataStream, indicesToExcludeForRemainingRun, state.metadata()::index))
+                );
             } catch (Exception e) {
                 logger.error(
                     () -> String.format(
@@ -310,7 +334,322 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
                     e
                 );
             }
+
+            try {
+                indicesToExcludeForRemainingRun.addAll(
+                    maybeExecuteDownsampling(
+                        state,
+                        dataStream,
+                        getTargetIndices(dataStream, indicesToExcludeForRemainingRun, state.metadata()::index)
+                    )
+                );
+            } catch (Exception e) {
+                logger.error(
+                    () -> String.format(
+                        Locale.ROOT,
+                        "Data stream lifecycle failed to execute downsampling for data stream [%s]",
+                        dataStream.getName()
+                    ),
+                    e
+                );
+            }
+
+            affectedIndices += indicesToExcludeForRemainingRun.size();
+            affectedDataStreams++;
+        }
+        logger.trace(
+            "Data stream lifecycle service performed operations on [{}] indices, part of [{}] data streams",
+            affectedIndices,
+            affectedDataStreams
+        );
+    }
+
+    /**
+     * Data stream lifecycle supports configuring multiple rounds of downsampling for each managed index. When attempting to execute
+     * downsampling we iterate through the ordered rounds of downsampling that match an index (ordered ascending according to the `after`
+     * configuration) and try to figure out:
+     * - if we started downsampling for an earlier round and it is still in progress, in which case we need to wait for it to complete
+     * - if we started downsampling for an earlier round and it finished, but the downsample index is not yet part of the data stream,
+     * in which case we need to replace the backing index with the downsample index and delete the backing index
+     * - if no earlier round was started or needs to be added to the data stream, start downsampling for the last matching round
+     *
+     * Note that when an index first matches a downsampling round we mark it as read-only.
+     *
+     * Returns a set of indices that now have in-flight operations triggered by downsampling (it could be marking them as read-only,
+     * replacing an index in the data stream, deleting a source index, or downsampling itself) so these indices can be skipped in case
+     * there are other operations to be executed by the data stream lifecycle after downsampling.
+     */
+    Set<Index> maybeExecuteDownsampling(ClusterState state, DataStream dataStream, List<Index> targetIndices) {
+        Set<Index> affectedIndices = new HashSet<>();
+        Metadata metadata = state.metadata();
+        for (Index index : targetIndices) {
+            IndexMetadata backingIndexMeta = metadata.index(index);
+            assert backingIndexMeta != null : "the data stream backing indices must exist";
+            List<DataStreamLifecycle.Downsampling.Round> downsamplingRounds = dataStream.getDownsamplingRoundsFor(
+                index,
+                metadata::index,
+                nowSupplier
+            );
+            if (downsamplingRounds.isEmpty()) {
+                continue;
+            }
+
+            String indexName = index.getName();
+            IndexMetadata.DownsampleTaskStatus backingIndexDownsamplingStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndexMeta.getSettings());
+            String backingIndexDownsamplingSource = IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME.get(backingIndexMeta.getSettings());
+
+            // if the current index is not a downsample index, we want to mark it as read-only before proceeding with downsampling
+            if (org.elasticsearch.common.Strings.hasText(backingIndexDownsamplingSource) == false
+                && state.blocks().indexBlocked(ClusterBlockLevel.WRITE, indexName) == false) {
+                affectedIndices.add(index);
+                addIndexBlockOnce(indexName);
+            } else if (org.elasticsearch.common.Strings.hasText(backingIndexDownsamplingSource)
+                && backingIndexDownsamplingStatus.equals(SUCCESS)) {
+                    // if the backing index is a downsample index itself, let's check if its source index still exists as we must delete it
+                    Map<String, String> lifecycleMetadata = backingIndexMeta.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY);
+
+                    // TODO document that we don't handle downsample indices that were added to the data stream manually (because we
+                    // TODO currently can't reliably identify the source index to delete when multiple rounds of downsampling are
+                    // TODO involved unless DSL stores the needed metadata in the index metadata)
+                    if (lifecycleMetadata != null && lifecycleMetadata.containsKey(REPLACEMENT_SOURCE_INDEX)) {
+                        String actualDownsamplingSource = lifecycleMetadata.get(REPLACEMENT_SOURCE_INDEX);
+                        IndexMetadata downsampleSourceIndex = metadata.index(actualDownsamplingSource);
+                        if (downsampleSourceIndex != null) {
+                            // we mark the backing index as affected because we don't want subsequent operations that might change its
+                            // state to be performed, as we could lose track of the fact that we must delete its replacement source index
+                            affectedIndices.add(index);
+                            // delete downsampling source index (that's not part of the data stream anymore) before doing any more
+                            // downsampling
+                            deleteIndexOnce(backingIndexDownsamplingSource, "replacement with its downsampled index in the data stream");
+                        }
+                    } else {
+                        logger.trace(
+                            "Data stream lifecycle encountered managed index [{}] as part of data stream [{}] which was "
+                                + "downsampled from source [{}]. This index was manually downsampled, but the data stream lifecycle "
+                                + "service only supports downsample indices created through the data stream lifecycle. This index will "
+                                + "be excluded from lifecycle downsampling",
+                            indexName,
+                            dataStream,
+                            backingIndexDownsamplingSource
+                        );
+                    }
+                }
+
+            if (affectedIndices.contains(index) == false) {
+                // we're not performing any operation for this index which means that it:
+                // - has matching downsample rounds
+                // - is read-only
+                // So let's wait for an in-progress downsampling operation to succeed or trigger the last matching round
+                affectedIndices.addAll(waitForInProgressOrTriggerDownsampling(dataStream, backingIndexMeta, downsamplingRounds, metadata));
+            }
+        }
+
+        return affectedIndices;
+    }
+
+    /**
+     * Iterate over the matching downsampling rounds for the backing index (if any) and either wait for an earlier round to complete,
+     * add an earlier round's completed downsample index to the data stream, or otherwise trigger the last matching downsampling round.
+     *
+     * Returns the indices for which we triggered an action/operation.
+     */
+    private Set<Index> waitForInProgressOrTriggerDownsampling(
+        DataStream dataStream,
+        IndexMetadata backingIndex,
+        List<DataStreamLifecycle.Downsampling.Round> downsamplingRounds,
+        Metadata metadata
+    ) {
+        assert dataStream.getIndices().contains(backingIndex.getIndex())
+            : "the provided backing index must be part of data stream:" + dataStream.getName();
+        assert downsamplingRounds.isEmpty() == false : "the index should be managed and have matching downsampling rounds";
+        Set<Index> affectedIndices = new HashSet<>();
+        DataStreamLifecycle.Downsampling.Round lastRound = downsamplingRounds.get(downsamplingRounds.size() - 1);
+
+        Index index = backingIndex.getIndex();
+        String indexName = index.getName();
+        for (DataStreamLifecycle.Downsampling.Round round : downsamplingRounds) {
+            // the downsample index name for each round is deterministic
+            String downsampleIndexName = DownsampleConfig.generateDownsampleIndexName(
+                DOWNSAMPLED_INDEX_PREFIX,
+                backingIndex,
+                round.config().getFixedInterval()
+            );
+            IndexMetadata targetDownsampleIndexMeta = metadata.index(downsampleIndexName);
+            boolean targetDownsampleIndexExists = targetDownsampleIndexMeta != null;
+
+            if (targetDownsampleIndexExists) {
+                Set<Index> downsamplingNotComplete = evaluateDownsampleStatus(
+                    dataStream,
+                    INDEX_DOWNSAMPLE_STATUS.get(targetDownsampleIndexMeta.getSettings()),
+                    round,
+                    lastRound,
+                    index,
+                    targetDownsampleIndexMeta.getIndex()
+                );
+                if (downsamplingNotComplete.isEmpty() == false) {
+                    affectedIndices.addAll(downsamplingNotComplete);
+                    break;
+                }
+            } else {
+                if (round.equals(lastRound)) {
+                    // no maintenance needed for previously started downsampling actions and we are on the last matching round so it's time
+                    // to kick off downsampling
+                    affectedIndices.add(index);
+                    downsampleIndexOnce(round, indexName, downsampleIndexName);
+                }
+            }
         }
+        return affectedIndices;
+    }
+
+    /**
+     * Issues a request to downsample the source index into the downsample index for the specified round.
+     */
+    private void downsampleIndexOnce(DataStreamLifecycle.Downsampling.Round round, String sourceIndex, String downsampleIndexName) {
+        DownsampleAction.Request request = new DownsampleAction.Request(sourceIndex, downsampleIndexName, null, round.config());
+        transportActionsDeduplicator.executeOnce(
+            request,
+            new ErrorRecordingActionListener(sourceIndex, errorStore),
+            (req, reqListener) -> downsampleIndex(request, reqListener)
+        );
+    }
+
+    /**
+     * Checks the status of the downsampling operations for the provided backing index and its corresponding downsample index.
+     * Depending on the status, we'll either error (if it's UNKNOWN and we've reached the last round), wait for it to complete (if it's
+     * STARTED), or replace the backing index with the downsample index in the data stream (if the status is SUCCESS).
+     */
+    private Set<Index> evaluateDownsampleStatus(
+        DataStream dataStream,
+        IndexMetadata.DownsampleTaskStatus downsampleStatus,
+        DataStreamLifecycle.Downsampling.Round currentRound,
+        DataStreamLifecycle.Downsampling.Round lastRound,
+        Index backingIndex,
+        Index downsampleIndex
+    ) {
+        Set<Index> affectedIndices = new HashSet<>();
+        String indexName = backingIndex.getName();
+        String downsampleIndexName = downsampleIndex.getName();
+        return switch (downsampleStatus) {
+            case UNKNOWN -> {
+                if (currentRound.equals(lastRound)) {
+                    // the target downsample index exists but is not a downsample index (name clash?)
+                    // we fail now but perhaps we should just randomise the name?
+                    String previousError = errorStore.getError(indexName);
+
+                    errorStore.recordError(indexName, new ResourceAlreadyExistsException(downsampleIndexName));
+                    // To avoid spamming our logs, we only want to log the error once.
+                    if (previousError == null || previousError.equals(errorStore.getError(indexName)) == false) {
+                        logger.error(
+                            "Data stream lifecycle service is unable to downsample backing index [{}] for data stream [{}] and "
+                                + "downsampling round [{}] because the target downsample index [{}] already exists",
+                            indexName,
+                            dataStream.getName(),
+                            currentRound,
+                            downsampleIndexName
+                        );
+                    }
+                }
+                yield affectedIndices;
+            }
+            case STARTED -> {
+                // we'll wait for this round to complete
+                // TODO add support for cancelling a current in-progress operation if another, later, round matches
+                logger.trace(
+                    "Data stream lifecycle service waits for index [{}] to be downsampled. Current status is [{}] and the "
+                        + "downsample index name is [{}]",
+                    indexName,
+                    STARTED,
+                    downsampleIndexName
+                );
+                // this request here might seem weird, but hear me out:
+                // if we triggered a downsample operation, and then had a master failover (so DSL starts from scratch)
+                // we can't really find out if the downsampling persistent task failed (if it was successful, no worries, the next case
+                // SUCCESS branch will catch it and we will cruise forward)
+                // if the downsampling persistent task failed, we will find out only via re-issuing the downsample request (and we will
+                // continue to re-issue the request until we get SUCCESS)
+
+                // NOTE that the downsample request is made through the deduplicator so it will only really be executed if
+                // there isn't one already in-flight. This can happen if a previous request timed out, failed, or there was a
+                // master failover and data stream lifecycle needed to restart
+                downsampleIndexOnce(currentRound, indexName, downsampleIndexName);
+                affectedIndices.add(backingIndex);
+                yield affectedIndices;
+            }
+            case SUCCESS -> {
+                if (dataStream.getIndices().contains(downsampleIndex) == false) {
+                    // at this point the source index is part of the data stream and the downsample index is complete, but not
+                    // part of the data stream. We need to replace the source index with the downsample index in the data stream
+                    affectedIndices.add(backingIndex);
+                    replaceBackingIndexWithDownsampleIndexOnce(dataStream, indexName, downsampleIndexName);
+                }
+                yield affectedIndices;
+            }
+        };
+    }
+
+    /**
+     * Issues a request to replace the backing index with the downsample index through the cluster state changes deduplicator.
+     */
+    private void replaceBackingIndexWithDownsampleIndexOnce(DataStream dataStream, String backingIndexName, String downsampleIndexName) {
+        clusterStateChangesDeduplicator.executeOnce(
+            new ReplaceSourceWithDownsampleIndexTask(dataStream.getName(), backingIndexName, downsampleIndexName, null),
+            new ErrorRecordingActionListener(backingIndexName, errorStore),
+            (req, reqListener) -> {
+                logger.trace(
+                    "Data stream lifecycle issues request to replace index [{}] with index [{}] in data stream [{}]",
+                    backingIndexName,
+                    downsampleIndexName,
+                    dataStream
+                );
+                swapSourceWithDownsampleIndexQueue.submitTask(
+                    "data-stream-lifecycle-replace-source[" + backingIndexName + "]-with-[" + downsampleIndexName + "]",
+                    new ReplaceSourceWithDownsampleIndexTask(dataStream.getName(), backingIndexName, downsampleIndexName, reqListener),
+                    null
+                );
+            }
+        );
+    }
+
+    /**
+     * Issues a request to delete the provided index through the transport action deduplicator.
+     */
+    private void deleteIndexOnce(String indexName, String reason) {
+        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName).masterNodeTimeout(TimeValue.MAX_VALUE);
+        transportActionsDeduplicator.executeOnce(
+            deleteIndexRequest,
+            new ErrorRecordingActionListener(indexName, errorStore),
+            (req, reqListener) -> deleteIndex(deleteIndexRequest, reason, reqListener)
+        );
+    }
+
+    /**
+     * Issues a request to add a WRITE index block for the provided index through the transport action deduplicator.
+     */
+    private void addIndexBlockOnce(String indexName) {
+        AddIndexBlockRequest addIndexBlockRequest = new AddIndexBlockRequest(WRITE, indexName).masterNodeTimeout(TimeValue.MAX_VALUE);
+        transportActionsDeduplicator.executeOnce(
+            addIndexBlockRequest,
+            new ErrorRecordingActionListener(indexName, errorStore),
+            (req, reqListener) -> addIndexBlock(addIndexBlockRequest, reqListener)
+        );
+    }
+
+    /**
+     * Returns the data stream lifecycle managed indices that are not part of the set of indices to exclude.
+     */
+    private static List<Index> getTargetIndices(
+        DataStream dataStream,
+        Set<Index> indicesToExcludeForRemainingRun,
+        Function<String, IndexMetadata> indexMetadataSupplier
+    ) {
+        return dataStream.getIndices()
+            .stream()
+            .filter(
+                index -> dataStream.isIndexManagedByDataStreamLifecycle(index, indexMetadataSupplier)
+                    && indicesToExcludeForRemainingRun.contains(index) == false
+            )
+            .toList();
     }
 
     /**
@@ -367,14 +706,7 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
                 // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
                 // let's start simple and reevaluate
                 String indexName = backingIndex.getIndex().getName();
-                DeleteIndexRequest deleteRequest = new DeleteIndexRequest(indexName).masterNodeTimeout(TimeValue.MAX_VALUE);
-
-                // time to delete the index
-                transportActionsDeduplicator.executeOnce(
-                    deleteRequest,
-                    new ErrorRecordingActionListener(indexName, errorStore),
-                    (req, reqListener) -> deleteIndex(deleteRequest, retention, reqListener)
-                );
+                deleteIndexOnce(indexName, "the lapsed [" + retention + "] retention period");
             }
         }
         return indicesToBeRemoved;
@@ -384,49 +716,51 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
      * This method force merges the given indices in the datastream. It writes a timestamp in the cluster state upon completion of the
      * force merge.
      */
-    private void maybeExecuteForceMerge(ClusterState state, DataStream dataStream, List<Index> indices) {
+    private Set<Index> maybeExecuteForceMerge(ClusterState state, List<Index> indices) {
         Metadata metadata = state.metadata();
+        Set<Index> affectedIndices = new HashSet<>();
         for (Index index : indices) {
-            if (dataStream.isIndexManagedByDataStreamLifecycle(index, state.metadata()::index)) {
-                IndexMetadata backingIndex = metadata.index(index);
-                assert backingIndex != null : "the data stream backing indices must exist";
-                String indexName = index.getName();
-                boolean alreadyForceMerged = isForceMergeComplete(backingIndex);
-                if (alreadyForceMerged) {
-                    logger.trace("Already force merged {}", indexName);
-                    continue;
-                }
+            IndexMetadata backingIndex = metadata.index(index);
+            assert backingIndex != null : "the data stream backing indices must exist";
+            String indexName = index.getName();
+            boolean alreadyForceMerged = isForceMergeComplete(backingIndex);
+            if (alreadyForceMerged) {
+                logger.trace("Already force merged {}", indexName);
+                continue;
+            }
 
-                ByteSizeValue configuredFloorSegmentMerge = MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.get(
-                    backingIndex.getSettings()
+            ByteSizeValue configuredFloorSegmentMerge = MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.get(
+                backingIndex.getSettings()
+            );
+            Integer configuredMergeFactor = MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.get(backingIndex.getSettings());
+            if ((configuredFloorSegmentMerge == null || configuredFloorSegmentMerge.equals(targetMergePolicyFloorSegment) == false)
+                || (configuredMergeFactor == null || configuredMergeFactor.equals(targetMergePolicyFactor) == false)) {
+                UpdateSettingsRequest updateMergePolicySettingsRequest = new UpdateSettingsRequest();
+                updateMergePolicySettingsRequest.indices(indexName);
+                updateMergePolicySettingsRequest.settings(
+                    Settings.builder()
+                        .put(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), targetMergePolicyFloorSegment)
+                        .put(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), targetMergePolicyFactor)
+                );
+                updateMergePolicySettingsRequest.masterNodeTimeout(TimeValue.MAX_VALUE);
+                affectedIndices.add(index);
+                transportActionsDeduplicator.executeOnce(
+                    updateMergePolicySettingsRequest,
+                    new ErrorRecordingActionListener(indexName, errorStore),
+                    (req, reqListener) -> updateIndexSetting(updateMergePolicySettingsRequest, reqListener)
+                );
+            } else {
+                affectedIndices.add(index);
+                ForceMergeRequest forceMergeRequest = new ForceMergeRequest(indexName);
+                // time to force merge the index
+                transportActionsDeduplicator.executeOnce(
+                    new ForceMergeRequestWrapper(forceMergeRequest),
+                    new ErrorRecordingActionListener(indexName, errorStore),
+                    (req, reqListener) -> forceMergeIndex(forceMergeRequest, reqListener)
                 );
-                Integer configuredMergeFactor = MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.get(backingIndex.getSettings());
-                if ((configuredFloorSegmentMerge == null || configuredFloorSegmentMerge.equals(targetMergePolicyFloorSegment) == false)
-                    || (configuredMergeFactor == null || configuredMergeFactor.equals(targetMergePolicyFactor) == false)) {
-                    UpdateSettingsRequest updateMergePolicySettingsRequest = new UpdateSettingsRequest();
-                    updateMergePolicySettingsRequest.indices(indexName);
-                    updateMergePolicySettingsRequest.settings(
-                        Settings.builder()
-                            .put(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), targetMergePolicyFloorSegment)
-                            .put(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), targetMergePolicyFactor)
-                    );
-                    updateMergePolicySettingsRequest.masterNodeTimeout(TimeValue.MAX_VALUE);
-                    transportActionsDeduplicator.executeOnce(
-                        updateMergePolicySettingsRequest,
-                        new ErrorRecordingActionListener(indexName, errorStore),
-                        (req, reqListener) -> updateIndexSetting(updateMergePolicySettingsRequest, reqListener)
-                    );
-                } else {
-                    ForceMergeRequest forceMergeRequest = new ForceMergeRequest(indexName);
-                    // time to force merge the index
-                    transportActionsDeduplicator.executeOnce(
-                        new ForceMergeRequestWrapper(forceMergeRequest),
-                        new ErrorRecordingActionListener(indexName, errorStore),
-                        (req, reqListener) -> forceMergeIndex(forceMergeRequest, reqListener)
-                    );
-                }
             }
         }
+        return affectedIndices;
     }
 
     private void rolloverDataStream(String writeIndexName, RolloverRequest rolloverRequest, ActionListener<Void> listener) {
@@ -512,7 +846,97 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
         });
     }
 
-    private void deleteIndex(DeleteIndexRequest deleteIndexRequest, TimeValue retention, ActionListener<Void> listener) {
+    private void addIndexBlock(AddIndexBlockRequest addIndexBlockRequest, ActionListener<Void> listener) {
+        assert addIndexBlockRequest.indices() != null && addIndexBlockRequest.indices().length == 1
+            : "Data stream lifecycle service updates the index block for one index at a time";
+        // "saving" the index name here so we don't capture the entire request
+        String targetIndex = addIndexBlockRequest.indices()[0];
+        logger.trace(
+            "Data stream lifecycle service issues request to add block [{}] for index [{}]",
+            addIndexBlockRequest.getBlock(),
+            targetIndex
+        );
+        client.admin().indices().addBlock(addIndexBlockRequest, new ActionListener<>() {
+            @Override
+            public void onResponse(AddIndexBlockResponse addIndexBlockResponse) {
+                if (addIndexBlockResponse.isAcknowledged()) {
+                    logger.info(
+                        "Data stream lifecycle service successfully added block [{}] for index [{}]",
+                        addIndexBlockRequest.getBlock(),
+                        targetIndex
+                    );
+                    listener.onResponse(null);
+                } else {
+                    Optional<AddIndexBlockResponse.AddBlockResult> resultForTargetIndex = addIndexBlockResponse.getIndices()
+                        .stream()
+                        .filter(blockResult -> blockResult.getIndex().getName().equals(targetIndex))
+                        .findAny();
+                    if (resultForTargetIndex.isEmpty()) {
+                        // this is weird, we don't have a result for our index, so we fail this attempt and the next DSL run will
+                        // check if we need to retry adding the block for this index
+                        logger.trace(
+                            "Data stream lifecycle service received an unacknowledged response when attempting to add the "
+                                + "read-only block to index [{}], but the response didn't contain an explicit result for the index.",
+                            targetIndex
+                        );
+                        logger.error(
+                            "Data stream lifecycle service request to mark index [{}] as read-only was not acknowledged",
+                            targetIndex
+                        );
+                        listener.onFailure(
+                            new ElasticsearchException("request to mark index [" + targetIndex + "] as read-only was not acknowledged")
+                        );
+                    } else if (resultForTargetIndex.get().hasFailures()) {
+                        AddIndexBlockResponse.AddBlockResult blockResult = resultForTargetIndex.get();
+                        if (blockResult.getException() != null) {
+                            listener.onFailure(blockResult.getException());
+                        } else {
+                            List<AddIndexBlockResponse.AddBlockShardResult.Failure> shardFailures = new ArrayList<>(
+                                blockResult.getShards().length
+                            );
+                            for (AddIndexBlockResponse.AddBlockShardResult shard : blockResult.getShards()) {
+                                if (shard.hasFailures()) {
+                                    shardFailures.addAll(Arrays.asList(shard.getFailures()));
+                                }
+                            }
+                            assert shardFailures.isEmpty() == false
+                                : "The block response must have shard failures as the global "
+                                    + "exception is null. The block result is: "
+                                    + blockResult;
+                            String errorMessage = org.elasticsearch.common.Strings.collectionToDelimitedString(
+                                shardFailures.stream().map(org.elasticsearch.common.Strings::toString).collect(Collectors.toList()),
+                                ","
+                            );
+                            listener.onFailure(new ElasticsearchException(errorMessage));
+                        }
+                    } else {
+                        logger.error(
+                            "Data stream lifecycle service request to mark index [{}] as read-only was not acknowledged",
+                            targetIndex
+                        );
+                        listener.onFailure(
+                            new ElasticsearchException("request to mark index [" + targetIndex + "] as read-only was not acknowledged")
+                        );
+                    }
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                if (e instanceof IndexNotFoundException) {
+                    // index was already deleted, treat this as a success
+                    errorStore.clearRecordedError(targetIndex);
+                    listener.onResponse(null);
+                    return;
+                }
+
+                listener.onFailure(e);
+            }
+        });
+    }
+
+    private void deleteIndex(DeleteIndexRequest deleteIndexRequest, String reason, ActionListener<Void> listener) {
         assert deleteIndexRequest.indices() != null && deleteIndexRequest.indices().length == 1
             : "Data stream lifecycle deletes one index at a time";
         // "saving" the index name here so we don't capture the entire request
@@ -521,11 +945,15 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
         client.admin().indices().delete(deleteIndexRequest, new ActionListener<>() {
             @Override
             public void onResponse(AcknowledgedResponse acknowledgedResponse) {
-                logger.info(
-                    "Data stream lifecycle successfully deleted index [{}] due to the lapsed [{}] retention period",
-                    targetIndex,
-                    retention
-                );
+                if (acknowledgedResponse.isAcknowledged()) {
+                    logger.info("Data stream lifecycle successfully deleted index [{}] due to {}", targetIndex, reason);
+                } else {
+                    logger.trace(
+                        "The delete request for index [{}] was not acknowledged. Data stream lifecycle service will retry on the"
+                            + " next run if the index still exists",
+                        targetIndex
+                    );
+                }
                 listener.onResponse(null);
             }
 
@@ -555,6 +983,38 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
         });
     }
 
+    private void downsampleIndex(DownsampleAction.Request request, ActionListener<Void> listener) {
+        String sourceIndex = request.getSourceIndex();
+        String downsampleIndex = request.getTargetIndex();
+        logger.info("Data stream lifecycle issuing request to downsample index [{}] to index [{}]", sourceIndex, downsampleIndex);
+        client.execute(DownsampleAction.INSTANCE, request, new ActionListener<>() {
+            @Override
+            public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                assert acknowledgedResponse.isAcknowledged() : "the downsample response is always acknowledged";
+                logger.info("Data stream lifecycle successfully downsampled index [{}] to index [{}]", sourceIndex, downsampleIndex);
+                listener.onResponse(null);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                String previousError = errorStore.getError(sourceIndex);
+
+                listener.onFailure(e);
+                // To avoid spamming our logs, we only want to log the error once.
+                if (previousError == null || previousError.equals(errorStore.getError(sourceIndex)) == false) {
+                    logger.error(
+                        () -> Strings.format(
+                            "Data stream lifecycle encountered an error trying to downsample index [%s]. Data stream lifecycle will "
+                                + "attempt to downsample the index on its next run.",
+                            sourceIndex
+                        ),
+                        e
+                    );
+                }
+            }
+        });
+    }
+
     /*
      * This method executes the given force merge request. Once the request has completed successfully it writes a timestamp as custom
      * metadata in the cluster state indicating when the force merge has completed. The listener is notified after the cluster state
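
One detail worth highlighting: the downsample index name derived for each round is deterministic, which is what makes the deduplicated downsample requests above safe to re-issue after a timeout or master failover. Below is a hedged sketch of the naming pattern, inferred only from the `downsample-10s-.ds-metrics-2023.08.22-000001` example in the commit message; the authoritative implementation is `DownsampleConfig.generateDownsampleIndexName`.

```java
class DownsampleNameSketch {
    // Assumed pattern, inferred from the example "downsample-10s-.ds-metrics-2023.08.22-000001";
    // the real logic is DownsampleConfig.generateDownsampleIndexName(prefix, sourceIndexMetadata, fixedInterval).
    static String downsampleIndexName(String prefix, String fixedInterval, String sourceIndexName) {
        return prefix + fixedInterval + "-" + sourceIndexName;
    }

    public static void main(String[] args) {
        // Prints: downsample-10s-.ds-metrics-2023.08.22-000001
        System.out.println(downsampleIndexName("downsample-", "10s", ".ds-metrics-2023.08.22-000001"));
    }
}
```
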

+ 5 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/PutDataStreamLifecycleAction.java

@@ -88,6 +88,11 @@ public class PutDataStreamLifecycleAction extends ActionType<AcknowledgedRespons
             this(names, dataRetention, null);
         }
 
+        public Request(String[] names, DataStreamLifecycle lifecycle) {
+            this.names = names;
+            this.lifecycle = lifecycle;
+        }
+
         public Request(String[] names, @Nullable TimeValue dataRetention, @Nullable Boolean enabled) {
             this.names = names;
             this.lifecycle = DataStreamLifecycle.newBuilder().dataRetention(dataRetention).enabled(enabled == null || enabled).build();
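
The new constructor above makes it possible to pass a fully-built lifecycle, downsampling rounds included, in a single request. A hedged usage sketch: the `dataRetention`, `enabled`, and `build` calls appear in this diff, while the `downsampling(...)` setter is an assumption and its exact name and signature may differ.

```java
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamLifecycleAction;

class PutLifecycleRequestSketch {
    static PutDataStreamLifecycleAction.Request buildRequest() {
        DataStreamLifecycle lifecycle = DataStreamLifecycle.newBuilder()
            .dataRetention(TimeValue.timeValueDays(90))
            .enabled(true)
            // .downsampling(...)  // attach the configured rounds here; setter name assumed, not shown in this diff
            .build();
        // Uses the new Request(String[] names, DataStreamLifecycle lifecycle) constructor added above.
        return new PutDataStreamLifecycleAction.Request(new String[] { "metrics-*" }, lifecycle);
    }
}
```
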

+ 102 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/downsampling/ReplaceBackingWithDownsampleIndexExecutor.java

@@ -0,0 +1,102 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.datastreams.lifecycle.downsampling;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.SimpleBatchedExecutor;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.snapshots.SnapshotInProgressException;
+
+/**
+ * Cluster service task (batched) executor that executes the replacement of a data stream backing index with its
+ * downsampled index.
+ * After the task is executed, the executor issues a delete API call for the source index; however, it doesn't
+ * hold up the task listener (n.b. we notify the listener before we call the delete API so we don't introduce
+ * weird partial-failure scenarios; if the delete API fails, the
+ * {@link org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService} will retry on the next run so the source index will get
+ * deleted)
+ */
+public class ReplaceBackingWithDownsampleIndexExecutor extends SimpleBatchedExecutor<ReplaceSourceWithDownsampleIndexTask, Void> {
+    private static final Logger LOGGER = LogManager.getLogger(ReplaceBackingWithDownsampleIndexExecutor.class);
+    private final Client client;
+
+    public ReplaceBackingWithDownsampleIndexExecutor(Client client) {
+        this.client = client;
+    }
+
+    @Override
+    public Tuple<ClusterState, Void> executeTask(ReplaceSourceWithDownsampleIndexTask task, ClusterState clusterState) throws Exception {
+        return Tuple.tuple(task.execute(clusterState), null);
+    }
+
+    @Override
+    public void taskSucceeded(ReplaceSourceWithDownsampleIndexTask task, Void unused) {
+        LOGGER.trace(
+            "Updated cluster state and replaced index [{}] with index [{}] in data stream [{}]",
+            task.getSourceBackingIndex(),
+            task.getDownsampleIndex(),
+            task.getDataStreamName()
+        );
+        task.getListener().onResponse(null);
+
+        // chain an optimistic delete call for the source index here (if it fails it'll be retried by the data stream lifecycle loop)
+        client.admin().indices().delete(new DeleteIndexRequest(task.getSourceBackingIndex()), new ActionListener<>() {
+            @Override
+            public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                if (acknowledgedResponse.isAcknowledged()) {
+                    LOGGER.info(
+                        "Data stream lifecycle successfully deleted index [{}] due to being replaced by the downsampled index [{}] in"
+                            + " data stream [{}]",
+                        task.getSourceBackingIndex(),
+                        task.getDownsampleIndex(),
+                        task.getDataStreamName()
+                    );
+                } else {
+                    LOGGER.trace(
+                        "The delete request for index [{}] was not acknowledged. Data stream lifecycle service will retry on the"
+                            + " next run if the index still exists",
+                        task.getSourceBackingIndex()
+                    );
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                if (e instanceof IndexNotFoundException) {
+                    // index was already deleted, treat this as a success
+                    return;
+                }
+
+                if (e instanceof SnapshotInProgressException) {
+                    LOGGER.info(
+                        "Data stream lifecycle is unable to delete index [{}] because it's part of an ongoing snapshot. Retrying on "
+                            + "the next data stream lifecycle run",
+                        task.getSourceBackingIndex()
+                    );
+                } else {
+                    LOGGER.error(
+                        () -> Strings.format(
+                            "Data stream lifecycle encountered an error trying to delete index [%s]. It will retry on its next run.",
+                            task.getSourceBackingIndex()
+                        ),
+                        e
+                    );
+                }
+            }
+        });
+    }
+}

+ 232 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/downsampling/ReplaceSourceWithDownsampleIndexTask.java

@@ -0,0 +1,232 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.datastreams.lifecycle.downsampling;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateTaskListener;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.IndexSettings;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY;
+
+/**
+ * Cluster state task that replaces a source index in a data stream with its downsample index.
+ * In the process it will configure the origination date for the downsample index (so it can
+ * have a correct generation time).
+ */
+public class ReplaceSourceWithDownsampleIndexTask implements ClusterStateTaskListener {
+    private static final Logger LOGGER = LogManager.getLogger(ReplaceSourceWithDownsampleIndexTask.class);
+    public static final String REPLACEMENT_SOURCE_INDEX = "replacement_source_index";
+    private ActionListener<Void> listener;
+    private final String dataStreamName;
+    private final String sourceBackingIndex;
+    private final String downsampleIndex;
+
+    public ReplaceSourceWithDownsampleIndexTask(
+        String dataStreamName,
+        String sourceBackingIndex,
+        String downsampleIndex,
+        ActionListener<Void> listener
+    ) {
+        this.dataStreamName = dataStreamName;
+        this.sourceBackingIndex = sourceBackingIndex;
+        this.downsampleIndex = downsampleIndex;
+        this.listener = listener;
+    }
+
+    ClusterState execute(ClusterState state) {
+        LOGGER.trace(
+            "Updating cluster state to replace index [{}] with [{}] in data stream [{}]",
+            sourceBackingIndex,
+            downsampleIndex,
+            dataStreamName
+        );
+        IndexAbstraction sourceIndexAbstraction = state.metadata().getIndicesLookup().get(sourceBackingIndex);
+        IndexMetadata downsampleIndexMeta = state.metadata().index(downsampleIndex);
+        if (downsampleIndexMeta == null) {
+            // the downsample index doesn't exist anymore so nothing to replace here
+            LOGGER.trace(
+                "Received request to replace index [{}] with [{}] in data stream [{}] but the replacement index [{}] doesn't exist. "
+                    + "Nothing to do here.",
+                sourceBackingIndex,
+                downsampleIndex,
+                dataStreamName,
+                downsampleIndex
+            );
+            return state;
+        }
+        IndexMetadata sourceIndexMeta = state.metadata().index(sourceBackingIndex);
+        DataStream dataStream = state.metadata().dataStreams().get(dataStreamName);
+        if (sourceIndexAbstraction == null) {
+            // index was deleted in the meantime, so let's check if we can make sure the downsample index ends up in the
+            // data stream (if not already there)
+            if (dataStream != null
+                && dataStream.getIndices().stream().filter(index -> index.getName().equals(downsampleIndex)).findAny().isEmpty()) {
+                // add downsample index to data stream
+                LOGGER.trace(
+                    "unable to find source index [{}] but adding index [{}] to data stream [{}]",
+                    sourceBackingIndex,
+                    downsampleIndex,
+                    dataStreamName
+                );
+                Metadata.Builder newMetaData = Metadata.builder(state.metadata())
+                    .put(dataStream.addBackingIndex(state.metadata(), downsampleIndexMeta.getIndex()));
+                return ClusterState.builder(state).metadata(newMetaData).build();
+            }
+        } else {
+            // the source index exists
+            DataStream sourceParentDataStream = sourceIndexAbstraction.getParentDataStream();
+            if (sourceParentDataStream != null) {
+                assert sourceParentDataStream.getName().equals(dataStreamName)
+                    : "the backing index must be part of the provided data "
+                        + "stream ["
+                        + dataStreamName
+                        + "] but it is instead part of data stream ["
+                        + sourceParentDataStream.getName()
+                        + "]";
+                if (sourceParentDataStream.getWriteIndex().getName().equals(sourceBackingIndex)) {
+                    String errorMessage = String.format(
+                        Locale.ROOT,
+                        "index [%s] is the write index for data stream [%s] and cannot be replaced",
+                        sourceBackingIndex,
+                        sourceParentDataStream.getName()
+                    );
+                    throw new IllegalStateException(errorMessage);
+                }
+                if (sourceIndexMeta != null) {
+                    // both indices exist, let's copy the origination date from the source index to the downsample index
+                    Metadata.Builder newMetaData = Metadata.builder(state.getMetadata());
+                    TimeValue generationLifecycleDate = dataStream.getGenerationLifecycleDate(sourceIndexMeta);
+                    assert generationLifecycleDate != null : "write index must never be downsampled or replaced";
+                    IndexMetadata updatedDownsampleMetadata = copyDataStreamLifecycleState(
+                        sourceIndexMeta,
+                        downsampleIndexMeta,
+                        generationLifecycleDate.millis()
+                    );
+
+                    newMetaData.put(updatedDownsampleMetadata, true);
+                    // replace source with downsample
+                    newMetaData.put(dataStream.replaceBackingIndex(sourceIndexMeta.getIndex(), downsampleIndexMeta.getIndex()));
+                    return ClusterState.builder(state).metadata(newMetaData).build();
+                }
+            } else {
+                // the source index is not part of a data stream, so let's check if we can make sure the downsample index ends up in the
+                // data stream
+                if (dataStream != null
+                    && dataStream.getIndices().stream().filter(index -> index.getName().equals(downsampleIndex)).findAny().isEmpty()) {
+                    Metadata.Builder newMetaData = Metadata.builder(state.getMetadata());
+                    TimeValue generationLifecycleDate = dataStream.getGenerationLifecycleDate(sourceIndexMeta);
+                    assert generationLifecycleDate != null : "write index must never be downsampled or replaced";
+
+                    IndexMetadata updatedDownsampleMetadata = copyDataStreamLifecycleState(
+                        sourceIndexMeta,
+                        downsampleIndexMeta,
+                        generationLifecycleDate.millis()
+                    );
+                    newMetaData.put(updatedDownsampleMetadata, true);
+                    // add downsample index to data stream
+                    newMetaData.put(dataStream.addBackingIndex(state.metadata(), downsampleIndexMeta.getIndex()));
+                    return ClusterState.builder(state).metadata(newMetaData).build();
+                }
+            }
+        }
+
+        return state;
+    }
+
+    /**
+     * Copies the data stream lifecycle state information from the source index to the destination.
+     * This ensures the destination index will have a generation time, by setting {@link IndexSettings#LIFECYCLE_ORIGINATION_DATE}, and
+     * that the source index is recorded in the
+     * {@link org.elasticsearch.datastreams.DataStreamsPlugin#LIFECYCLE_CUSTOM_INDEX_METADATA_KEY} custom metadata.
+     */
+    private static IndexMetadata copyDataStreamLifecycleState(
+        IndexMetadata source,
+        IndexMetadata dest,
+        long sourceIndexGenerationTimeMillis
+    ) {
+        IndexMetadata.Builder downsampleIndexBuilder = IndexMetadata.builder(dest);
+        Map<String, String> lifecycleCustomMetadata = source.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY);
+        Map<String, String> newCustomMetadata = new HashMap<>();
+        if (lifecycleCustomMetadata != null) {
+            newCustomMetadata.putAll(lifecycleCustomMetadata);
+        }
+        newCustomMetadata.put(REPLACEMENT_SOURCE_INDEX, source.getIndex().getName());
+        downsampleIndexBuilder.putCustom(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY, newCustomMetadata);
+
+        if (IndexSettings.LIFECYCLE_ORIGINATION_DATE_SETTING.exists(dest.getSettings()) == false) {
+            downsampleIndexBuilder.settings(
+                Settings.builder()
+                    .put(dest.getSettings())
+                    .put(IndexSettings.LIFECYCLE_ORIGINATION_DATE, sourceIndexGenerationTimeMillis)
+                    .build()
+            ).settingsVersion(dest.getSettingsVersion() + 1L);
+        }
+        return downsampleIndexBuilder.build();
+    }
+
+    @Override
+    public void onFailure(Exception e) {
+        if (listener != null) {
+            listener.onFailure(e);
+        }
+    }
+
+    public String getDataStreamName() {
+        return dataStreamName;
+    }
+
+    public String getSourceBackingIndex() {
+        return sourceBackingIndex;
+    }
+
+    public String getDownsampleIndex() {
+        return downsampleIndex;
+    }
+
+    public ActionListener<Void> getListener() {
+        return listener;
+    }
+
+    public void setListener(ActionListener<Void> listener) {
+        this.listener = listener;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ReplaceSourceWithDownsampleIndexTask that = (ReplaceSourceWithDownsampleIndexTask) o;
+        return Objects.equals(dataStreamName, that.dataStreamName)
+            && Objects.equals(sourceBackingIndex, that.sourceBackingIndex)
+            && Objects.equals(downsampleIndex, that.downsampleIndex);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(dataStreamName, sourceBackingIndex, downsampleIndex);
+    }
+}

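For orientation, here is a minimal sketch of how such a task could be wired up and submitted as a cluster state update. This is not the actual `DataStreamLifecycleService` plumbing: it assumes the `ClusterService#createTaskQueue` / `MasterServiceTaskQueue#submitTask` APIs available in recent 8.x releases, that `ReplaceBackingWithDownsampleIndexExecutor` implements `ClusterStateTaskExecutor` over these tasks, and the queue name and source string below are illustrative only.

```
// Hypothetical wiring sketch; queue name and source string are made up.
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.datastreams.lifecycle.downsampling.ReplaceBackingWithDownsampleIndexExecutor;
import org.elasticsearch.datastreams.lifecycle.downsampling.ReplaceSourceWithDownsampleIndexTask;

class DownsampleSwapSubmitter {
    private final MasterServiceTaskQueue<ReplaceSourceWithDownsampleIndexTask> taskQueue;

    DownsampleSwapSubmitter(ClusterService clusterService, Client client) {
        // the executor deletes the source index once the swapped cluster state is published
        this.taskQueue = clusterService.createTaskQueue(
            "replace-backing-with-downsample-index",
            Priority.URGENT,
            new ReplaceBackingWithDownsampleIndexExecutor(client)
        );
    }

    void swapSourceWithDownsample(String dataStream, String sourceIndex, String downsampleIndex, ActionListener<Void> listener) {
        taskQueue.submitTask(
            "replace [" + sourceIndex + "] with [" + downsampleIndex + "] in [" + dataStream + "]",
            new ReplaceSourceWithDownsampleIndexTask(dataStream, sourceIndex, downsampleIndex, listener),
            null // no timeout
        );
    }
}
```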
+ 1 - 1
modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleFixtures.java

@@ -46,7 +46,7 @@ import static org.junit.Assert.assertTrue;
  */
 public class DataStreamLifecycleFixtures {
 
-    static DataStream createDataStream(
+    public static DataStream createDataStream(
         Metadata.Builder builder,
         String dataStreamName,
         int backingIndicesCount,

+ 257 - 1
modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java

@@ -17,21 +17,30 @@ import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
+import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
 import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
 import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
 import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
 import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import org.elasticsearch.action.downsample.DownsampleAction;
+import org.elasticsearch.action.downsample.DownsampleConfig;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlock;
+import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
+import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling;
+import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling.Round;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexGraveyard;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -44,8 +53,11 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.MergePolicyConfig;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.EqualsHashCodeTestUtils;
 import org.elasticsearch.test.client.NoOpClient;
@@ -71,15 +83,21 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
+import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.STARTED;
+import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.SUCCESS;
+import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY;
 import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleFixtures.createDataStream;
 import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING;
 import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING;
+import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.DOWNSAMPLED_INDEX_PREFIX;
 import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY;
-import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY;
 import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.ONE_HUNDRED_MB;
 import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.TARGET_MERGE_FACTOR_VALUE;
 import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
 import static org.elasticsearch.test.ClusterServiceUtils.setState;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -891,6 +909,244 @@ public class DataStreamLifecycleServiceTests extends ESTestCase {
         assertThat(((ForceMergeRequest) clientSeenRequests.get(3)).indices().length, is(1));
     }
 
+    public void testDownsampling() throws Exception {
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 2;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            settings(IndexVersion.current()).put(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), ONE_HUNDRED_MB)
+                .put(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), TARGET_MERGE_FACTOR_VALUE)
+                .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
+                .put("index.routing_path", "@timestamp"),
+            DataStreamLifecycle.newBuilder()
+                .downsampling(
+                    new Downsampling(
+                        List.of(new Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("1s"))))
+                    )
+                )
+                .dataRetention(TimeValue.MAX_VALUE)
+                .build(),
+            now
+        );
+        builder.put(dataStream);
+
+        String nodeId = "localNode";
+        DiscoveryNodes.Builder nodesBuilder = buildNodes(nodeId);
+        // we are the master node
+        nodesBuilder.masterNodeId(nodeId);
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).nodes(nodesBuilder).build();
+        setState(clusterService, state);
+        String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+        Index firstGenIndex = clusterService.state().metadata().index(firstGenIndexName).getIndex();
+        Set<Index> affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling(
+            clusterService.state(),
+            dataStream,
+            List.of(firstGenIndex)
+        );
+
+        assertThat(affectedIndices, is(Set.of(firstGenIndex)));
+        // we first mark the index as read-only
+        assertThat(clientSeenRequests.size(), is(1));
+        assertThat(clientSeenRequests.get(0), instanceOf(AddIndexBlockRequest.class));
+
+        {
+            // we apply the read-only block ourselves since this is a unit test
+            Metadata.Builder metadataBuilder = Metadata.builder(state.metadata());
+            IndexMetadata indexMetadata = metadataBuilder.getSafe(firstGenIndex);
+            Settings updatedSettings = Settings.builder().put(indexMetadata.getSettings()).put(WRITE.settingName(), true).build();
+            metadataBuilder.put(
+                IndexMetadata.builder(indexMetadata).settings(updatedSettings).settingsVersion(indexMetadata.getSettingsVersion() + 1)
+            );
+
+            ClusterBlock indexBlock = MetadataIndexStateService.createUUIDBasedBlock(WRITE.getBlock());
+            ClusterBlocks.Builder blocks = ClusterBlocks.builder(state.blocks());
+            blocks.addIndexBlock(firstGenIndexName, indexBlock);
+
+            state = ClusterState.builder(state).blocks(blocks).metadata(metadataBuilder).build();
+            setState(clusterService, state);
+        }
+
+        // on the next run downsampling should be triggered
+        affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling(clusterService.state(), dataStream, List.of(firstGenIndex));
+        assertThat(affectedIndices, is(Set.of(firstGenIndex)));
+        assertThat(clientSeenRequests.size(), is(2));
+        assertThat(clientSeenRequests.get(1), instanceOf(DownsampleAction.Request.class));
+
+        String downsampleIndexName = DownsampleConfig.generateDownsampleIndexName(
+            DOWNSAMPLED_INDEX_PREFIX,
+            state.metadata().index(firstGenIndex),
+            new DateHistogramInterval("1s")
+        );
+        {
+            // let's simulate the in-progress downsampling
+            IndexMetadata firstGenMetadata = state.metadata().index(firstGenIndexName);
+            Metadata.Builder metadataBuilder = Metadata.builder(state.metadata());
+
+            metadataBuilder.put(
+                IndexMetadata.builder(downsampleIndexName)
+                    .settings(
+                        Settings.builder()
+                            .put(firstGenMetadata.getSettings())
+                            .put(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_UUID.getKey(), firstGenIndex.getUUID())
+                            .put(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey(), STARTED)
+                    )
+                    .numberOfReplicas(0)
+                    .numberOfShards(1)
+            );
+            state = ClusterState.builder(state).metadata(metadataBuilder).build();
+            setState(clusterService, state);
+        }
+
+        // on the next run nothing should be triggered as downsampling is in progress (i.e. the STATUS is STARTED)
+        affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling(clusterService.state(), dataStream, List.of(firstGenIndex));
+        assertThat(affectedIndices, is(Set.of(firstGenIndex)));
+        // still only 2 witnessed requests, nothing extra
+        assertThat(clientSeenRequests.size(), is(2));
+
+        {
+            // mark the downsample operation as complete
+            IndexMetadata firstGenMetadata = state.metadata().index(firstGenIndexName);
+            Metadata.Builder metadataBuilder = Metadata.builder(state.metadata());
+
+            metadataBuilder.put(
+                IndexMetadata.builder(downsampleIndexName)
+                    .settings(
+                        Settings.builder()
+                            .put(firstGenMetadata.getSettings())
+                            .put(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME_KEY, firstGenIndexName)
+                            .put(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey(), SUCCESS)
+                    )
+                    .numberOfReplicas(0)
+                    .numberOfShards(1)
+            );
+            state = ClusterState.builder(state).metadata(metadataBuilder).build();
+            setState(clusterService, state);
+        }
+
+        // on this run, as downsampling is complete we expect to trigger the {@link
+        // org.elasticsearch.datastreams.lifecycle.downsampling.ReplaceSourceWithDownsampleIndexTask}
+        // cluster service task and replace the source index with the downsample index in the data stream
+        // we also expect a delete request for the source index to be witnessed
+        affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling(clusterService.state(), dataStream, List.of(firstGenIndex));
+        assertThat(affectedIndices, is(Set.of(firstGenIndex)));
+        assertBusy(() -> {
+            ClusterState newState = clusterService.state();
+            IndexAbstraction downsample = newState.metadata().getIndicesLookup().get(downsampleIndexName);
+            // the downsample index must be part of the data stream
+            assertThat(downsample.getParentDataStream(), is(notNullValue()));
+            assertThat(downsample.getParentDataStream().getName(), is(dataStreamName));
+            // the source index must not be part of the data stream
+            IndexAbstraction sourceIndexAbstraction = newState.metadata().getIndicesLookup().get(firstGenIndexName);
+            assertThat(sourceIndexAbstraction.getParentDataStream(), is(nullValue()));
+
+            // {@link ReplaceBackingWithDownsampleIndexExecutor} triggers a delete request for the backing index when the cluster state
+            // is successfully updated
+            assertThat(clientSeenRequests.size(), is(3));
+            assertThat(clientSeenRequests.get(2), instanceOf(DeleteIndexRequest.class));
+        }, 30, TimeUnit.SECONDS);
+
+        // NOTE from now on we need to refresh the state and dataStream variables as the data stream lifecycle service updated the
+        // cluster state in the cluster service via {@link ReplaceBackingWithDownsampleIndexExecutor}
+        dataStream = clusterService.state().metadata().dataStreams().get(dataStreamName);
+        state = clusterService.state();
+
+        // before we remove the backing index (to "implement" the above issued delete request) let's issue another data stream service
+        // downsampling run, as the service should detect that the index has not been deleted yet and issue a delete request itself
+
+        // note that we call the downsampling with the downsample index from now on, as IT is the one that's part of the data stream now
+        IndexMetadata downsampleMeta = clusterService.state().metadata().index(downsampleIndexName);
+        affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling(
+            clusterService.state(),
+            dataStream,
+            List.of(downsampleMeta.getIndex())
+        );
+        assertThat(affectedIndices, is(Set.of(downsampleMeta.getIndex())));
+        assertThat(clientSeenRequests.size(), is(4));
+        assertThat(clientSeenRequests.get(3), instanceOf(DeleteIndexRequest.class));
+
+        {
+            // let's remove the backing index (simulating that the delete request succeeded)
+            Metadata.Builder metadataBuilder = Metadata.builder(state.metadata());
+            metadataBuilder.remove(firstGenIndexName);
+            state = ClusterState.builder(state).metadata(metadataBuilder).build();
+            setState(clusterService, state);
+        }
+
+        // downsampling was successful for this index, so nothing else should have been executed here (still 4 witnessed requests, as before)
+        affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling(
+            clusterService.state(),
+            dataStream,
+            List.of(downsampleMeta.getIndex())
+        );
+        assertThat(affectedIndices, is(empty()));
+        assertThat(clientSeenRequests.size(), is(4));
+    }
+
+    public void testDownsamplingWhenTargetIndexNameClashYieldsException() throws Exception {
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 2;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            settings(IndexVersion.current()).put(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), ONE_HUNDRED_MB)
+                .put(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), TARGET_MERGE_FACTOR_VALUE)
+                .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
+                .put("index.routing_path", "@timestamp"),
+            DataStreamLifecycle.newBuilder()
+                .downsampling(
+                    new Downsampling(
+                        List.of(new Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("1s"))))
+                    )
+                )
+                .dataRetention(TimeValue.MAX_VALUE)
+                .build(),
+            now
+        );
+        builder.put(dataStream);
+
+        String nodeId = "localNode";
+        DiscoveryNodes.Builder nodesBuilder = buildNodes(nodeId);
+        // we are the master node
+        nodesBuilder.masterNodeId(nodeId);
+        String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+
+        // mark the first generation as read-only already
+        IndexMetadata indexMetadata = builder.get(firstGenIndexName);
+        Settings updatedSettings = Settings.builder().put(indexMetadata.getSettings()).put(WRITE.settingName(), true).build();
+        builder.put(IndexMetadata.builder(indexMetadata).settings(updatedSettings).settingsVersion(indexMetadata.getSettingsVersion() + 1));
+
+        ClusterBlock indexBlock = MetadataIndexStateService.createUUIDBasedBlock(WRITE.getBlock());
+        ClusterBlocks.Builder blocks = ClusterBlocks.builder();
+        blocks.addIndexBlock(firstGenIndexName, indexBlock);
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).blocks(blocks).metadata(builder).nodes(nodesBuilder).build();
+
+        // add another index to the cluster state that clashes with the expected downsample index name for the configured round
+        String downsampleIndexName = DownsampleConfig.generateDownsampleIndexName(
+            DOWNSAMPLED_INDEX_PREFIX,
+            state.metadata().index(firstGenIndexName),
+            new DateHistogramInterval("1s")
+        );
+        Metadata.Builder newMetadata = Metadata.builder(state.metadata())
+            .put(
+                IndexMetadata.builder(downsampleIndexName).settings(settings(IndexVersion.current())).numberOfReplicas(0).numberOfShards(1)
+            );
+        state = ClusterState.builder(state).metadata(newMetadata).nodes(nodesBuilder).build();
+        setState(clusterService, state);
+
+        Index firstGenIndex = state.metadata().index(firstGenIndexName).getIndex();
+        dataStreamLifecycleService.maybeExecuteDownsampling(clusterService.state(), dataStream, List.of(firstGenIndex));
+
+        assertThat(clientSeenRequests.size(), is(0));
+        String error = dataStreamLifecycleService.getErrorStore().getError(firstGenIndexName);
+        assertThat(error, notNullValue());
+        assertThat(error, containsString("resource_already_exists_exception"));
+    }
+
     /*
      * Creates a test cluster state with the given indexName. If customDataStreamLifecycleMetadata is not null, it is added as the value
      * of the index's custom metadata named "data_stream_lifecycle".

+ 104 - 0
modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/downsampling/ReplaceBackingWithDownsampleIndexExecutorTests.java

@@ -0,0 +1,104 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.datastreams.lifecycle.downsampling;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.client.NoOpClient;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.Matchers.is;
+
+public class ReplaceBackingWithDownsampleIndexExecutorTests extends ESTestCase {
+
+    public void testExecutorDeletesTheSourceIndexWhenTaskSucceeds() {
+        String dataStreamName = randomAlphaOfLengthBetween(10, 100);
+        String sourceIndex = randomAlphaOfLengthBetween(10, 100);
+        String downsampleIndex = randomAlphaOfLengthBetween(10, 100);
+
+        try (Client client = new NoOpClient(getTestName()) {
+            @Override
+            protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
+                ActionType<Response> action,
+                Request request,
+                ActionListener<Response> listener
+            ) {
+                assertThat(action.name(), is(DeleteIndexAction.NAME));
+                assertTrue(request instanceof DeleteIndexRequest);
+                DeleteIndexRequest deleteIndexRequest = (DeleteIndexRequest) request;
+                assertThat(deleteIndexRequest.indices().length, is(1));
+                assertThat(deleteIndexRequest.indices()[0], is(sourceIndex));
+            }
+        }) {
+            ReplaceBackingWithDownsampleIndexExecutor executor = new ReplaceBackingWithDownsampleIndexExecutor(client);
+
+            AtomicBoolean taskListenerCalled = new AtomicBoolean(false);
+            executor.taskSucceeded(
+                new ReplaceSourceWithDownsampleIndexTask(dataStreamName, sourceIndex, downsampleIndex, new ActionListener<Void>() {
+                    @Override
+                    public void onResponse(Void unused) {
+                        taskListenerCalled.set(true);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        logger.error(e.getMessage(), e);
+                        fail("unexpected exception: " + e.getMessage());
+                    }
+                }),
+                null
+            );
+            assertThat(taskListenerCalled.get(), is(true));
+        }
+    }
+
+    public void testExecutorCallsTaskListenerEvenIfDeleteFails() {
+        String dataStreamName = randomAlphaOfLengthBetween(10, 100);
+        String sourceIndex = randomAlphaOfLengthBetween(10, 100);
+        String downsampleIndex = randomAlphaOfLengthBetween(10, 100);
+
+        try (Client client = new NoOpClient(getTestName()) {
+            @Override
+            protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
+                ActionType<Response> action,
+                Request request,
+                ActionListener<Response> listener
+            ) {
+                listener.onFailure(new IllegalStateException("simulating a failure to delete index " + sourceIndex));
+            }
+        }) {
+            ReplaceBackingWithDownsampleIndexExecutor executor = new ReplaceBackingWithDownsampleIndexExecutor(client);
+
+            AtomicBoolean taskListenerCalled = new AtomicBoolean(false);
+            executor.taskSucceeded(
+                new ReplaceSourceWithDownsampleIndexTask(dataStreamName, sourceIndex, downsampleIndex, new ActionListener<Void>() {
+                    @Override
+                    public void onResponse(Void unused) {
+                        taskListenerCalled.set(true);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        logger.error(e.getMessage(), e);
+                        fail("unexpected exception: " + e.getMessage());
+                    }
+                }),
+                null
+            );
+            assertThat(taskListenerCalled.get(), is(true));
+        }
+    }
+}

+ 271 - 0
modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/downsampling/ReplaceSourceWithDownsampleIndexTaskTests.java

@@ -0,0 +1,271 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.datastreams.lifecycle.downsampling;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.IndexVersion;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
+
+import java.util.Locale;
+
+import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY;
+import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleFixtures.createDataStream;
+import static org.elasticsearch.datastreams.lifecycle.downsampling.ReplaceSourceWithDownsampleIndexTask.REPLACEMENT_SOURCE_INDEX;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class ReplaceSourceWithDownsampleIndexTaskTests extends ESTestCase {
+
+    private long now;
+
+    @Before
+    public void refreshNow() {
+        now = System.currentTimeMillis();
+    }
+
+    public void testDownsampleIndexMissingIsNoOp() {
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 3;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            settings(IndexVersion.current()),
+            DataStreamLifecycle.newBuilder().dataRetention(TimeValue.MAX_VALUE).build(),
+            now
+        );
+        builder.put(dataStream);
+        ClusterState previousState = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+
+        String firstGeneration = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+        ClusterState newState = new ReplaceSourceWithDownsampleIndexTask(
+            dataStreamName,
+            firstGeneration,
+            "downsample-1s-" + firstGeneration,
+            null
+        ).execute(previousState);
+
+        assertThat(newState, is(previousState));
+    }
+
+    public void testDownsampleIsAddedToDSEvenIfSourceDeleted() {
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 3;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            settings(IndexVersion.current()),
+            DataStreamLifecycle.newBuilder().dataRetention(TimeValue.MAX_VALUE).build(),
+            now
+        );
+        String firstGenIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+        String downsampleIndex = "downsample-1s-" + firstGenIndex;
+        IndexMetadata.Builder downsampleIndexMeta = IndexMetadata.builder(downsampleIndex)
+            .settings(settings(IndexVersion.current()))
+            .numberOfShards(1)
+            .numberOfReplicas(0);
+        builder.put(downsampleIndexMeta);
+        // let's remove the first generation index
+        dataStream = dataStream.removeBackingIndex(builder.get(firstGenIndex).getIndex());
+        // delete the first gen altogether
+        builder.remove(firstGenIndex);
+        builder.put(dataStream);
+        ClusterState previousState = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+
+        ClusterState newState = new ReplaceSourceWithDownsampleIndexTask(dataStreamName, firstGenIndex, downsampleIndex, null).execute(
+            previousState
+        );
+
+        IndexAbstraction downsampleIndexAbstraction = newState.metadata().getIndicesLookup().get(downsampleIndex);
+        assertThat(downsampleIndexAbstraction, is(notNullValue()));
+        assertThat(downsampleIndexAbstraction.getParentDataStream(), is(notNullValue()));
+        // the downsample index is part of the data stream
+        assertThat(downsampleIndexAbstraction.getParentDataStream().getName(), is(dataStreamName));
+    }
+
+    public void testSourceIndexIsWriteIndexThrowsException() {
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 3;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            settings(IndexVersion.current()),
+            DataStreamLifecycle.newBuilder().dataRetention(TimeValue.MAX_VALUE).build(),
+            now
+        );
+        builder.put(dataStream);
+        String writeIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 3);
+        String downsampleIndex = "downsample-1s-" + writeIndex;
+        IndexMetadata.Builder downsampleIndexMeta = IndexMetadata.builder(downsampleIndex)
+            .settings(settings(IndexVersion.current()))
+            .numberOfShards(1)
+            .numberOfReplicas(0);
+        builder.put(downsampleIndexMeta);
+        ClusterState previousState = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+
+        IllegalStateException illegalStateException = expectThrows(
+            IllegalStateException.class,
+            () -> new ReplaceSourceWithDownsampleIndexTask(dataStreamName, writeIndex, downsampleIndex, null).execute(previousState)
+        );
+
+        assertThat(
+            illegalStateException.getMessage(),
+            is("index [" + writeIndex + "] is the write index for data stream [" + dataStreamName + "] and " + "cannot be replaced")
+        );
+    }
+
+    public void testSourceIsReplacedWithDownsampleAndOriginationDateIsConfigured() {
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 3;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            settings(IndexVersion.current()),
+            DataStreamLifecycle.newBuilder().dataRetention(TimeValue.MAX_VALUE).build(),
+            now
+        );
+        String firstGenIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+        String downsampleIndex = "downsample-1s-" + firstGenIndex;
+        IndexMetadata.Builder downsampleIndexMeta = IndexMetadata.builder(downsampleIndex)
+            .settings(settings(IndexVersion.current()))
+            .numberOfShards(1)
+            .numberOfReplicas(0);
+        builder.put(downsampleIndexMeta);
+        builder.put(dataStream);
+        ClusterState previousState = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+
+        ClusterState newState = new ReplaceSourceWithDownsampleIndexTask(dataStreamName, firstGenIndex, downsampleIndex, null).execute(
+            previousState
+        );
+
+        IndexAbstraction downsampleIndexAbstraction = newState.metadata().getIndicesLookup().get(downsampleIndex);
+        assertThat(downsampleIndexAbstraction, is(notNullValue()));
+        assertThat(downsampleIndexAbstraction.getParentDataStream(), is(notNullValue()));
+        // the downsample index is part of the data stream
+        assertThat(downsampleIndexAbstraction.getParentDataStream().getName(), is(dataStreamName));
+
+        // the source index is NOT part of the data stream
+        IndexAbstraction sourceIndexAbstraction = newState.metadata().getIndicesLookup().get(firstGenIndex);
+        assertThat(sourceIndexAbstraction, is(notNullValue()));
+        assertThat(sourceIndexAbstraction.getParentDataStream(), is(nullValue()));
+
+        // let's check the downsample index has the origination date configured to the source index rollover time
+        IndexMetadata firstGenMeta = newState.metadata().index(firstGenIndex);
+        RolloverInfo rolloverInfo = firstGenMeta.getRolloverInfos().get(dataStreamName);
+        assertThat(rolloverInfo, is(notNullValue()));
+
+        IndexMetadata downsampleMeta = newState.metadata().index(downsampleIndex);
+        assertThat(IndexSettings.LIFECYCLE_ORIGINATION_DATE_SETTING.get(downsampleMeta.getSettings()), is(rolloverInfo.getTime()));
+        // the downsample index contains metadata to remember the index we downsampled from
+        assertThat(downsampleMeta.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY), is(notNullValue()));
+        assertThat(
+            downsampleMeta.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY).get(REPLACEMENT_SOURCE_INDEX),
+            is(sourceIndexAbstraction.getName())
+        );
+    }
+
+    public void testSourceIndexIsNotPartOfDSAnymore() {
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 3;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            settings(IndexVersion.current()),
+            DataStreamLifecycle.newBuilder().dataRetention(TimeValue.MAX_VALUE).build(),
+            now
+        );
+        String firstGenIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+        String downsampleIndex = "downsample-1s-" + firstGenIndex;
+        IndexMetadata.Builder downsampleIndexMeta = IndexMetadata.builder(downsampleIndex)
+            .settings(settings(IndexVersion.current()))
+            .numberOfShards(1)
+            .numberOfReplicas(0);
+        builder.put(downsampleIndexMeta);
+        // let's remove the first generation index
+        dataStream = dataStream.removeBackingIndex(builder.get(firstGenIndex).getIndex());
+        builder.put(dataStream);
+        ClusterState previousState = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+
+        ClusterState newState = new ReplaceSourceWithDownsampleIndexTask(dataStreamName, firstGenIndex, downsampleIndex, null).execute(
+            previousState
+        );
+
+        IndexAbstraction downsampleIndexAbstraction = newState.metadata().getIndicesLookup().get(downsampleIndex);
+        assertThat(downsampleIndexAbstraction, is(notNullValue()));
+        assertThat(downsampleIndexAbstraction.getParentDataStream(), is(notNullValue()));
+        // the downsample index is part of the data stream
+        assertThat(downsampleIndexAbstraction.getParentDataStream().getName(), is(dataStreamName));
+
+        // origination date and the lifecycle metadata is configured even if the source index is not part of the data stream anymore
+        IndexMetadata firstGenMeta = newState.metadata().index(firstGenIndex);
+        RolloverInfo rolloverInfo = firstGenMeta.getRolloverInfos().get(dataStreamName);
+        assertThat(rolloverInfo, is(notNullValue()));
+
+        IndexMetadata downsampleMeta = newState.metadata().index(downsampleIndex);
+        assertThat(IndexSettings.LIFECYCLE_ORIGINATION_DATE_SETTING.get(downsampleMeta.getSettings()), is(rolloverInfo.getTime()));
+        // the downsample index contains metadata to remember the index we downsampled from
+        assertThat(downsampleMeta.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY), is(notNullValue()));
+        assertThat(downsampleMeta.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY).get(REPLACEMENT_SOURCE_INDEX), is(firstGenIndex));
+    }
+
+    public void testListenerIsNotConsideredInEquals() {
+        // the task is used as a key in a result deduplicator ({@link ResultDeduplicator}) map and the listener must not
+        // be taken into account
+
+        String dataStreamName = randomAlphaOfLengthBetween(10, 100);
+        String sourceBackingIndex = randomAlphaOfLengthBetween(10, 100);
+        String downsampleIndex = randomAlphaOfLengthBetween(10, 100);
+        ReplaceSourceWithDownsampleIndexTask withoutListener = new ReplaceSourceWithDownsampleIndexTask(
+            dataStreamName,
+            sourceBackingIndex,
+            downsampleIndex,
+            null
+        );
+
+        ReplaceSourceWithDownsampleIndexTask withListener = new ReplaceSourceWithDownsampleIndexTask(
+            dataStreamName,
+            sourceBackingIndex,
+            downsampleIndex,
+            new ActionListener<>() {
+                @Override
+                public void onResponse(Void unused) {
+
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+
+                }
+            }
+        );
+
+        assertThat(withoutListener.equals(withListener), is(true));
+    }
+}

+ 20 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequest.java

@@ -18,6 +18,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.CollectionUtils;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Objects;
 
 import static org.elasticsearch.action.ValidateActions.addValidationError;
@@ -114,4 +115,23 @@ public class AddIndexBlockRequest extends AcknowledgedRequest<AddIndexBlockReque
         indicesOptions.writeIndicesOptions(out);
         block.writeTo(out);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AddIndexBlockRequest that = (AddIndexBlockRequest) o;
+        return block == that.block && Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(block, indicesOptions);
+        result = 31 * result + Arrays.hashCode(indices);
+        return result;
+    }
 }

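The equals/hashCode addition turns `AddIndexBlockRequest` into a value object, which is presumably what lets the lifecycle machinery and its tests compare or deduplicate the read-only requests it issues. A small illustrative check, assuming the existing `AddIndexBlockRequest(APIBlock, String...)` constructor; the index name is made up:

```
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock;

public class AddIndexBlockRequestEqualityExample {
    public static void main(String[] args) {
        // two requests adding the same block over the same indices now compare equal
        AddIndexBlockRequest first = new AddIndexBlockRequest(APIBlock.WRITE, ".ds-metrics-2023.01.01-000001");
        AddIndexBlockRequest second = new AddIndexBlockRequest(APIBlock.WRITE, ".ds-metrics-2023.01.01-000001");
        assert first.equals(second);
        assert first.hashCode() == second.hashCode();
    }
}
```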
+ 38 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -18,6 +18,7 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.Diff;
 import org.elasticsearch.cluster.SimpleDiffable;
+import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling.Round;
 import org.elasticsearch.common.ParsingException;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -617,6 +618,43 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         return indicesPastRetention;
     }
 
+    /**
+     * Returns a list of downsampling rounds this index is eligible for (based on the rounds' `after` configuration) or
+     * an empty list if this data stream's lifecycle doesn't have downsampling configured or the index's generation age
+     * doesn't yet match any `after` downsampling threshold.
+     *
+     * An empty list is returned for indices that are not time series.
+     */
+    public List<Round> getDownsamplingRoundsFor(
+        Index index,
+        Function<String, IndexMetadata> indexMetadataSupplier,
+        LongSupplier nowSupplier
+    ) {
+        assert indices.contains(index) : "the provided index must be a backing index for this data stream";
+        if (lifecycle == null || lifecycle.getDownsamplingRounds() == null) {
+            return List.of();
+        }
+
+        IndexMetadata indexMetadata = indexMetadataSupplier.apply(index.getName());
+        if (indexMetadata == null || IndexSettings.MODE.get(indexMetadata.getSettings()) != IndexMode.TIME_SERIES) {
+            return List.of();
+        }
+        TimeValue indexGenerationTime = getGenerationLifecycleDate(indexMetadata);
+
+        if (indexGenerationTime != null) {
+            long nowMillis = nowSupplier.getAsLong();
+            long indexGenerationTimeMillis = indexGenerationTime.millis();
+            List<Round> orderedRoundsForIndex = new ArrayList<>(lifecycle.getDownsamplingRounds().size());
+            for (Round round : lifecycle.getDownsamplingRounds()) {
+                if (nowMillis >= indexGenerationTimeMillis + round.after().getMillis()) {
+                    orderedRoundsForIndex.add(round);
+                }
+            }
+            return orderedRoundsForIndex;
+        }
+        return List.of();
+    }
+
     /**
      * Returns the non-write backing indices that are older than the provided age, *excluding the write index*.
      * The index age is calculated from the rollover or index creation date (or the origination date if present).

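The new `getDownsamplingRoundsFor` returns the eligible rounds in their configured order, so a caller can act on whichever round it needs; presumably the lifecycle service targets the last (most aggressive) matching round. A minimal consumption sketch, assuming the caller already has the cluster `Metadata` and a clock at hand; the class and method names below are illustrative:

```
import java.util.List;

import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling.Round;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.index.Index;

final class DownsamplingRoundPicker {
    /**
     * Picks the last matching round for the given backing index, or null when downsampling is not
     * configured, the index is not a time series index, or no `after` threshold has elapsed yet.
     */
    static Round latestMatchingRound(DataStream dataStream, Index backingIndex, Metadata metadata, long nowMillis) {
        List<Round> rounds = dataStream.getDownsamplingRoundsFor(backingIndex, metadata::index, () -> nowMillis);
        return rounds.isEmpty() ? null : rounds.get(rounds.size() - 1);
    }
}
```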
+ 165 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
 import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
 import org.elasticsearch.action.admin.indices.rollover.RolloverConfigurationTests;
 import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
+import org.elasticsearch.action.downsample.DownsampleConfig;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
@@ -26,6 +27,7 @@ import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.test.AbstractXContentSerializingTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xcontent.ToXContent;
@@ -1272,6 +1274,169 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         }
     }
 
+    public void testGetDownsampleRounds() {
+        String dataStreamName = "metrics-foo";
+        long now = System.currentTimeMillis();
+
+        List<DataStreamMetadata> creationAndRolloverTimes = List.of(
+            DataStreamMetadata.dataStreamMetadata(now - 5000, now - 4000),
+            DataStreamMetadata.dataStreamMetadata(now - 4000, now - 3000),
+            DataStreamMetadata.dataStreamMetadata(now - 3000, now - 2000),
+            DataStreamMetadata.dataStreamMetadata(now - 2000, now - 1000),
+            DataStreamMetadata.dataStreamMetadata(now, null)
+        );
+
+        {
+            Metadata.Builder builder = Metadata.builder();
+            DataStream dataStream = createDataStream(
+                builder,
+                dataStreamName,
+                creationAndRolloverTimes,
+                settings(IndexVersion.current()).put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
+                    .put("index.routing_path", "@timestamp"),
+                DataStreamLifecycle.newBuilder()
+                    .downsampling(
+                        new DataStreamLifecycle.Downsampling(
+                            List.of(
+                                new DataStreamLifecycle.Downsampling.Round(
+                                    TimeValue.timeValueMillis(2000),
+                                    new DownsampleConfig(new DateHistogramInterval("10s"))
+                                ),
+                                new DataStreamLifecycle.Downsampling.Round(
+                                    TimeValue.timeValueMillis(3200),
+                                    new DownsampleConfig(new DateHistogramInterval("100s"))
+                                ),
+                                new DataStreamLifecycle.Downsampling.Round(
+                                    TimeValue.timeValueMillis(3500),
+                                    new DownsampleConfig(new DateHistogramInterval("1000s"))
+                                )
+                            )
+                        )
+                    )
+                    .build()
+            );
+            Metadata metadata = builder.build();
+
+            // generation time is now - 2000
+            String thirdGeneration = DataStream.getDefaultBackingIndexName(dataStreamName, 3);
+            Index thirdIndex = metadata.index(thirdGeneration).getIndex();
+            List<DataStreamLifecycle.Downsampling.Round> roundsForThirdIndex = dataStream.getDownsamplingRoundsFor(
+                thirdIndex,
+                metadata::index,
+                () -> now
+            );
+
+            assertThat(roundsForThirdIndex.size(), is(1));
+            assertThat(roundsForThirdIndex.get(0).after(), is(TimeValue.timeValueMillis(2000)));
+
+            // generation time is now - 4000
+            String firstGeneration = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+            Index firstIndex = metadata.index(firstGeneration).getIndex();
+            List<DataStreamLifecycle.Downsampling.Round> roundsForFirstIndex = dataStream.getDownsamplingRoundsFor(
+                firstIndex,
+                metadata::index,
+                () -> now
+            );
+            // expecting all rounds to match this index
+            assertThat(roundsForFirstIndex.size(), is(3));
+            // assert the order is maintained
+            assertThat(
+                roundsForFirstIndex.stream().map(DataStreamLifecycle.Downsampling.Round::after).toList(),
+                is(List.of(TimeValue.timeValueMillis(2000), TimeValue.timeValueMillis(3200), TimeValue.timeValueMillis(3500)))
+            );
+        }
+
+        {
+            // non-timeseries indices should be skipped
+            Metadata.Builder builder = Metadata.builder();
+            DataStream dataStream = createDataStream(
+                builder,
+                dataStreamName,
+                creationAndRolloverTimes,
+                // no TSDB settings
+                settings(IndexVersion.current()),
+                DataStreamLifecycle.newBuilder()
+                    .downsampling(
+                        new DataStreamLifecycle.Downsampling(
+                            List.of(
+                                new DataStreamLifecycle.Downsampling.Round(
+                                    TimeValue.timeValueMillis(2000),
+                                    new DownsampleConfig(new DateHistogramInterval("10s"))
+                                ),
+                                new DataStreamLifecycle.Downsampling.Round(
+                                    TimeValue.timeValueMillis(3200),
+                                    new DownsampleConfig(new DateHistogramInterval("100s"))
+                                ),
+                                new DataStreamLifecycle.Downsampling.Round(
+                                    TimeValue.timeValueMillis(3500),
+                                    new DownsampleConfig(new DateHistogramInterval("1000s"))
+                                )
+                            )
+                        )
+                    )
+                    .build()
+            );
+            Metadata metadata = builder.build();
+
+            // generation time is now - 4000
+            String firstGeneration = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+            Index firstIndex = metadata.index(firstGeneration).getIndex();
+            List<DataStreamLifecycle.Downsampling.Round> roundsForFirstIndex = dataStream.getDownsamplingRoundsFor(
+                firstIndex,
+                metadata::index,
+                () -> now
+            );
+            assertThat(roundsForFirstIndex.size(), is(0));
+        }
+
+        {
+            // backing indices for data streams without lifecycle don't match any rounds
+            Metadata.Builder builder = Metadata.builder();
+            DataStream dataStream = createDataStream(
+                builder,
+                dataStreamName,
+                creationAndRolloverTimes,
+                settings(IndexVersion.current()).put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
+                    .put("index.routing_path", "@timestamp"),
+                null
+            );
+            Metadata metadata = builder.build();
+
+            // generation time is now - 4000
+            String firstGeneration = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+            Index firstIndex = metadata.index(firstGeneration).getIndex();
+            List<DataStreamLifecycle.Downsampling.Round> roundsForFirstIndex = dataStream.getDownsamplingRoundsFor(
+                firstIndex,
+                metadata::index,
+                () -> now
+            );
+            assertThat(roundsForFirstIndex.size(), is(0));
+        }
+
+        {
+            // backing indices for data streams without downsampling configured don't match any rounds
+            Metadata.Builder builder = Metadata.builder();
+            DataStream dataStream = createDataStream(
+                builder,
+                dataStreamName,
+                creationAndRolloverTimes,
+                settings(IndexVersion.current()).put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
+                    .put("index.routing_path", "@timestamp"),
+                DataStreamLifecycle.newBuilder().build()
+            );
+            Metadata metadata = builder.build();
+
+            String firstGeneration = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+            Index firstIndex = metadata.index(firstGeneration).getIndex();
+            List<DataStreamLifecycle.Downsampling.Round> roundsForFirstIndex = dataStream.getDownsamplingRoundsFor(
+                firstIndex,
+                metadata::index,
+                () -> now
+            );
+            assertThat(roundsForFirstIndex.size(), is(0));
+        }
+    }
+
     public void testIsIndexManagedByDataStreamLifecycle() {
         String dataStreamName = "metrics-foo";
         long now = System.currentTimeMillis();

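A minimal sketch of the matching rule the assertions above exercise may help when skimming them: every configured downsampling round whose `after` threshold has elapsed for a backing index is returned, while non-time-series indices, data streams without a lifecycle, and lifecycles without downsampling yield no rounds at all. The helper below is hypothetical (it is not the production `getDownsamplingRoundsFor` implementation) and assumes only that a `Round` exposes its `after` value as a `TimeValue`.

```
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;

import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch, not the production implementation tested above. */
final class DownsamplingRoundMatchingSketch {

    /** Returns every configured round whose {@code after} threshold has elapsed for the given index age. */
    static List<DataStreamLifecycle.Downsampling.Round> matchingRounds(
        List<DataStreamLifecycle.Downsampling.Round> configuredRounds,
        long indexAgeMillis
    ) {
        List<DataStreamLifecycle.Downsampling.Round> matching = new ArrayList<>();
        for (DataStreamLifecycle.Downsampling.Round round : configuredRounds) {
            if (indexAgeMillis >= round.after().millis()) {
                matching.add(round);
            }
        }
        // With rounds configured after 2000/3200/3500 ms and an index age of roughly 40000 ms,
        // all three rounds match, mirroring the first assertion block above. The "no rounds"
        // cases above simply never reach this point because no downsampling is configured.
        return matching;
    }
}
```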
+ 189 - 0
x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java

@@ -0,0 +1,189 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.downsample;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
+import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
+import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
+import org.elasticsearch.action.downsample.DownsampleConfig;
+import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.datastreams.DataStreamsPlugin;
+import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
+import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.getBackingIndices;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 4)
+public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase {
+    private static final Logger logger = LogManager.getLogger(DataStreamLifecycleDownsampleDisruptionIT.class);
+    public static final int DOC_COUNT = 50_000;
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, Downsample.class, AggregateMetricMapperPlugin.class);
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
+        settings.put(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL, "1s");
+        return settings.build();
+    }
+
+    @TestLogging(value = "org.elasticsearch.datastreams.lifecycle:TRACE", reason = "debugging")
+    public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {
+        try (InternalTestCluster cluster = internalCluster()) {
+            final List<String> masterNodes = cluster.startMasterOnlyNodes(1);
+            cluster.startDataOnlyNodes(3);
+            ensureStableCluster(cluster.size());
+            ensureGreen();
+
+            final String dataStreamName = "metrics-foo";
+            DataStreamLifecycle lifecycle = DataStreamLifecycle.newBuilder()
+                .downsampling(
+                    new DataStreamLifecycle.Downsampling(
+                        List.of(
+                            new DataStreamLifecycle.Downsampling.Round(
+                                TimeValue.timeValueMillis(0),
+                                new DownsampleConfig(new DateHistogramInterval("1s"))
+                            )
+                        )
+                    )
+                )
+                .build();
+            int indexedDocs = DataStreamLifecycleDriver.setupDataStreamAndIngestDocs(client(), dataStreamName, lifecycle, DOC_COUNT);
+
+            client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();
+
+            // DSL runs every second and it has to forcemerge the index (~2 seconds) and mark it as read-only (~2 seconds) before it
+            // starts downsampling. This sleep tries to start the disruption as close as possible to the downsample execution.
+            long sleepTime = randomLongBetween(3000, 4500);
+            logger.info("-> giving data stream lifecycle [{}] millis to make some progress before starting the disruption", sleepTime);
+            Thread.sleep(sleepTime);
+            final CountDownLatch disruptionStart = new CountDownLatch(1);
+            final CountDownLatch disruptionEnd = new CountDownLatch(1);
+            List<String> backingIndices = getBackingIndices(client(), dataStreamName);
+            // first generation index
+            String sourceIndex = backingIndices.get(0);
+            new Thread(new Disruptor(cluster, sourceIndex, new DisruptionListener() {
+                @Override
+                public void disruptionStart() {
+                    disruptionStart.countDown();
+                }
+
+                @Override
+                public void disruptionEnd() {
+                    disruptionEnd.countDown();
+                }
+            }, masterNodes.get(0), (ignored) -> {
+                try {
+                    cluster.rollingRestart(new InternalTestCluster.RestartCallback() {
+                        @Override
+                        public boolean validateClusterForming() {
+                            return true;
+                        }
+                    });
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            })).start();
+
+            waitUntil(
+                () -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty(),
+                60,
+                TimeUnit.SECONDS
+            );
+            ensureStableCluster(cluster.numDataAndMasterNodes());
+
+            final String targetIndex = "downsample-1s-" + sourceIndex;
+            assertBusy(() -> {
+                try {
+                    GetSettingsResponse getSettingsResponse = client().admin()
+                        .indices()
+                        .getSettings(new GetSettingsRequest().indices(targetIndex))
+                        .actionGet();
+                    Settings indexSettings = getSettingsResponse.getIndexToSettings().get(targetIndex);
+                    assertThat(indexSettings, is(notNullValue()));
+                    assertThat(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexSettings), is(IndexMetadata.DownsampleTaskStatus.SUCCESS));
+                } catch (Exception e) {
+                    throw new AssertionError(e);
+                }
+            }, 60, TimeUnit.SECONDS);
+        }
+    }
+
+    interface DisruptionListener {
+        void disruptionStart();
+
+        void disruptionEnd();
+    }
+
+    private class Disruptor implements Runnable {
+        final InternalTestCluster cluster;
+        private final String sourceIndex;
+        private final DisruptionListener listener;
+        private final String clientNode;
+        private final Consumer<String> disruption;
+
+        private Disruptor(
+            final InternalTestCluster cluster,
+            final String sourceIndex,
+            final DisruptionListener listener,
+            final String clientNode,
+            final Consumer<String> disruption
+        ) {
+            this.cluster = cluster;
+            this.sourceIndex = sourceIndex;
+            this.listener = listener;
+            this.clientNode = clientNode;
+            this.disruption = disruption;
+        }
+
+        @Override
+        public void run() {
+            listener.disruptionStart();
+            try {
+                final String candidateNode = cluster.client(clientNode)
+                    .admin()
+                    .cluster()
+                    .prepareSearchShards(sourceIndex)
+                    .get()
+                    .getNodes()[0].getName();
+                logger.info("Candidate node [" + candidateNode + "]");
+                disruption.accept(candidateNode);
+                ensureGreen(sourceIndex);
+                ensureStableCluster(cluster.numDataAndMasterNodes(), clientNode);
+
+            } catch (Exception e) {
+                logger.error("Ignoring Error while injecting disruption [" + e.getMessage() + "]");
+            } finally {
+                listener.disruptionEnd();
+            }
+        }
+    }
+}

+ 246 - 0
x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleIT.java

@@ -0,0 +1,246 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.downsample;
+
+import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
+import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.action.downsample.DownsampleConfig;
+import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
+import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.datastreams.DataStreamsPlugin;
+import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
+import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamLifecycleAction;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
+import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
+import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.getBackingIndices;
+import static org.hamcrest.Matchers.is;
+
+public class DataStreamLifecycleDownsampleIT extends ESIntegTestCase {
+    public static final int DOC_COUNT = 50_000;
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, Downsample.class, AggregateMetricMapperPlugin.class);
+    }
+
+    protected boolean ignoreExternalCluster() {
+        return true;
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
+        settings.put(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL, "1s");
+        return settings.build();
+    }
+
+    @TestLogging(value = "org.elasticsearch.datastreams.lifecycle:TRACE", reason = "debugging")
+    public void testDownsampling() throws Exception {
+        String dataStreamName = "metrics-foo";
+
+        DataStreamLifecycle lifecycle = DataStreamLifecycle.newBuilder()
+            .downsampling(
+                new Downsampling(
+                    List.of(
+                        new Downsampling.Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("1s"))),
+                        new Downsampling.Round(TimeValue.timeValueSeconds(10), new DownsampleConfig(new DateHistogramInterval("10s")))
+                    )
+                )
+            )
+            .build();
+
+        DataStreamLifecycleDriver.setupDataStreamAndIngestDocs(client(), dataStreamName, lifecycle, DOC_COUNT);
+
+        List<String> backingIndices = getBackingIndices(client(), dataStreamName);
+        String firstGenerationBackingIndex = backingIndices.get(0);
+        String oneSecondDownsampleIndex = "downsample-1s-" + firstGenerationBackingIndex;
+        String tenSecondsDownsampleIndex = "downsample-10s-" + firstGenerationBackingIndex;
+
+        Set<String> witnessedDownsamplingIndices = new HashSet<>();
+        clusterService().addListener(event -> {
+            if (event.indicesCreated().contains(oneSecondDownsampleIndex)
+                || event.indicesDeleted().stream().anyMatch(index -> index.getName().equals(oneSecondDownsampleIndex))) {
+                witnessedDownsamplingIndices.add(oneSecondDownsampleIndex);
+            }
+            if (event.indicesCreated().contains(tenSecondsDownsampleIndex)) {
+                witnessedDownsamplingIndices.add(tenSecondsDownsampleIndex);
+            }
+        });
+
+        client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();
+
+        assertBusy(() -> {
+            // first downsampling round
+            assertThat(witnessedDownsamplingIndices.contains(oneSecondDownsampleIndex), is(true));
+        }, 30, TimeUnit.SECONDS);
+
+        assertBusy(() -> {
+            assertThat(witnessedDownsamplingIndices.size(), is(2));
+            assertThat(witnessedDownsamplingIndices.contains(oneSecondDownsampleIndex), is(true));
+            assertThat(witnessedDownsamplingIndices.contains(tenSecondsDownsampleIndex), is(true));
+        }, 30, TimeUnit.SECONDS);
+
+        assertBusy(() -> {
+            List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName);
+
+            assertThat(dsBackingIndices.size(), is(2));
+            String writeIndex = dsBackingIndices.get(1);
+            assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
+            // the last downsampling round must remain in the data stream
+            assertThat(dsBackingIndices.get(0), is(tenSecondsDownsampleIndex));
+        }, 30, TimeUnit.SECONDS);
+    }
+
+    @TestLogging(value = "org.elasticsearch.datastreams.lifecycle:TRACE", reason = "debugging")
+    public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception {
+        String dataStreamName = "metrics-bar";
+
+        DataStreamLifecycle lifecycle = DataStreamLifecycle.newBuilder()
+            .downsampling(
+                new Downsampling(
+                    List.of(
+                        new Downsampling.Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("1s"))),
+                        // the data stream lifecycle runs every second, so by the time we forcemerge the backing index it will have
+                        // been at least 2 seconds since rollover. Only the 10 seconds round should be executed.
+                        new Downsampling.Round(TimeValue.timeValueMillis(10), new DownsampleConfig(new DateHistogramInterval("10s")))
+                    )
+                )
+            )
+            .build();
+        DataStreamLifecycleDriver.setupDataStreamAndIngestDocs(client(), dataStreamName, lifecycle, DOC_COUNT);
+
+        List<String> backingIndices = getBackingIndices(client(), dataStreamName);
+        String firstGenerationBackingIndex = backingIndices.get(0);
+        String oneSecondDownsampleIndex = "downsample-1s-" + firstGenerationBackingIndex;
+        String tenSecondsDownsampleIndex = "downsample-10s-" + firstGenerationBackingIndex;
+
+        Set<String> witnessedDownsamplingIndices = new HashSet<>();
+        clusterService().addListener(event -> {
+            if (event.indicesCreated().contains(oneSecondDownsampleIndex)
+                || event.indicesDeleted().stream().anyMatch(index -> index.getName().equals(oneSecondDownsampleIndex))) {
+                witnessedDownsamplingIndices.add(oneSecondDownsampleIndex);
+            }
+            if (event.indicesCreated().contains(tenSecondsDownsampleIndex)) {
+                witnessedDownsamplingIndices.add(tenSecondsDownsampleIndex);
+            }
+        });
+
+        client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();
+
+        assertBusy(() -> {
+            assertThat(witnessedDownsamplingIndices.size(), is(1));
+            // only the ten seconds downsample round should've been executed
+            assertThat(witnessedDownsamplingIndices.contains(tenSecondsDownsampleIndex), is(true));
+        }, 30, TimeUnit.SECONDS);
+
+        assertBusy(() -> {
+            List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName);
+
+            assertThat(dsBackingIndices.size(), is(2));
+            String writeIndex = dsBackingIndices.get(1);
+            assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
+            assertThat(dsBackingIndices.get(0), is(tenSecondsDownsampleIndex));
+        }, 30, TimeUnit.SECONDS);
+    }
+
+    @TestLogging(value = "org.elasticsearch.datastreams.lifecycle:TRACE", reason = "debugging")
+    public void testUpdateDownsampleRound() throws Exception {
+        // we'll test updating the data stream lifecycle to add an earlier downsampling round to an already-executed lifecycle
+        // we expect the earlier round to be ignored
+        String dataStreamName = "metrics-baz";
+
+        DataStreamLifecycle lifecycle = DataStreamLifecycle.newBuilder()
+            .downsampling(
+                new Downsampling(
+                    List.of(
+                        new Downsampling.Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("1s"))),
+                        // the data stream lifecycle runs every second, so by the time we forcemerge the backing index it will have
+                        // been at least 2 seconds since rollover. Only the 10 seconds round should be executed.
+                        new Downsampling.Round(TimeValue.timeValueMillis(10), new DownsampleConfig(new DateHistogramInterval("10s")))
+                    )
+                )
+            )
+            .build();
+
+        DataStreamLifecycleDriver.setupDataStreamAndIngestDocs(client(), dataStreamName, lifecycle, DOC_COUNT);
+
+        List<String> backingIndices = getBackingIndices(client(), dataStreamName);
+        String firstGenerationBackingIndex = backingIndices.get(0);
+        String oneSecondDownsampleIndex = "downsample-1s-" + firstGenerationBackingIndex;
+        String tenSecondsDownsampleIndex = "downsample-10s-" + firstGenerationBackingIndex;
+
+        Set<String> witnessedDownsamplingIndices = new HashSet<>();
+        clusterService().addListener(event -> {
+            if (event.indicesCreated().contains(oneSecondDownsampleIndex)
+                || event.indicesDeleted().stream().anyMatch(index -> index.getName().equals(oneSecondDownsampleIndex))) {
+                witnessedDownsamplingIndices.add(oneSecondDownsampleIndex);
+            }
+            if (event.indicesCreated().contains(tenSecondsDownsampleIndex)) {
+                witnessedDownsamplingIndices.add(tenSecondsDownsampleIndex);
+            }
+        });
+
+        client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();
+
+        assertBusy(() -> {
+            assertThat(witnessedDownsamplingIndices.size(), is(1));
+            // only the ten seconds downsample round should've been executed
+            assertThat(witnessedDownsamplingIndices.contains(tenSecondsDownsampleIndex), is(true));
+        }, 30, TimeUnit.SECONDS);
+
+        assertBusy(() -> {
+            List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName);
+            assertThat(dsBackingIndices.size(), is(2));
+            String writeIndex = dsBackingIndices.get(1);
+            assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
+            assertThat(dsBackingIndices.get(0), is(tenSecondsDownsampleIndex));
+        }, 30, TimeUnit.SECONDS);
+
+        // update the lifecycle so that it only has one round, with the same `after` parameter as before but a different interval.
+        // the different interval yields a different downsample index name, so we expect the data stream lifecycle to take the previous
+        // `10s` interval downsample index, downsample it to `30s`, and replace the `10s` index in the data stream with the new one.
+        DataStreamLifecycle updatedLifecycle = DataStreamLifecycle.newBuilder()
+            .downsampling(
+                new Downsampling(
+                    List.of(new Downsampling.Round(TimeValue.timeValueMillis(10), new DownsampleConfig(new DateHistogramInterval("30s"))))
+                )
+            )
+            .build();
+
+        client().execute(
+            PutDataStreamLifecycleAction.INSTANCE,
+            new PutDataStreamLifecycleAction.Request(new String[] { dataStreamName }, updatedLifecycle)
+        );
+
+        String thirtySecondsDownsampleIndex = "downsample-30s-" + firstGenerationBackingIndex;
+
+        assertBusy(() -> {
+            assertThat(indexExists(tenSecondsDownsampleIndex), is(false));
+
+            List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName);
+            assertThat(dsBackingIndices.size(), is(2));
+            String writeIndex = dsBackingIndices.get(1);
+            assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
+            assertThat(dsBackingIndices.get(0), is(thirtySecondsDownsampleIndex));
+        }, 30, TimeUnit.SECONDS);
+    }
+}
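As an aside, the target index names hard-coded in the assertions above follow the `downsample-{fixed_interval}-{source-index}` pattern. Below is a hypothetical helper mirroring that inline string concatenation; the `getFixedInterval()` accessor is an assumption for illustration, since the tests simply build the strings by hand.

```
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

/** Hypothetical helper mirroring the inline name construction used by the tests above. */
final class DownsampleIndexNameSketch {

    static String downsampleIndexName(DownsampleConfig config, String sourceIndexName) {
        // e.g. downsample-10s-.ds-metrics-foo-2023.08.22-000001
        return "downsample-" + config.getFixedInterval() + "-" + sourceIndexName;
    }

    public static void main(String[] args) {
        DownsampleConfig config = new DownsampleConfig(new DateHistogramInterval("10s"));
        System.out.println(downsampleIndexName(config, ".ds-metrics-foo-2023.08.22-000001"));
    }
}
```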

+ 191 - 0
x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDriver.java

@@ -0,0 +1,191 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.downsample;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.datastreams.GetDataStreamAction;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.engine.VersionConflictEngineException;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.elasticsearch.test.ESTestCase.indexSettings;
+import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
+import static org.elasticsearch.test.ESTestCase.randomFrom;
+import static org.elasticsearch.test.ESTestCase.randomIntBetween;
+import static org.elasticsearch.test.ESTestCase.randomLongBetween;
+import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * A collection of methods to help with the setup of data stream lifecycle downsample tests.
+ */
+public class DataStreamLifecycleDriver {
+    private static final Logger logger = LogManager.getLogger(DataStreamLifecycleDriver.class);
+    private static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
+    public static final String FIELD_TIMESTAMP = "@timestamp";
+    public static final String FIELD_DIMENSION_1 = "dimension_kw";
+    public static final String FIELD_DIMENSION_2 = "dimension_long";
+    public static final String FIELD_METRIC_COUNTER = "counter";
+
+    public static int setupDataStreamAndIngestDocs(Client client, String dataStreamName, DataStreamLifecycle lifecycle, int docCount)
+        throws IOException {
+        putTSDBIndexTemplate(client, dataStreamName + "*", lifecycle);
+        return indexDocuments(client, dataStreamName, docCount);
+    }
+
+    public static List<String> getBackingIndices(Client client, String dataStreamName) {
+        GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
+        GetDataStreamAction.Response getDataStreamResponse = client.execute(GetDataStreamAction.INSTANCE, getDataStreamRequest).actionGet();
+        assertThat(getDataStreamResponse.getDataStreams().isEmpty(), is(false));
+        assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), is(dataStreamName));
+        return getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices().stream().map(Index::getName).toList();
+    }
+
+    private static void putTSDBIndexTemplate(Client client, String pattern, DataStreamLifecycle lifecycle) throws IOException {
+        Settings.Builder settings = indexSettings(1, 0).put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
+            .putList(IndexMetadata.INDEX_ROUTING_PATH.getKey(), List.of(FIELD_DIMENSION_1));
+
+        XContentBuilder mapping = jsonBuilder().startObject().startObject("_doc").startObject("properties");
+        mapping.startObject(FIELD_TIMESTAMP).field("type", "date").endObject();
+
+        mapping.startObject(FIELD_DIMENSION_1).field("type", "keyword").field("time_series_dimension", true).endObject();
+        mapping.startObject(FIELD_DIMENSION_2).field("type", "long").field("time_series_dimension", true).endObject();
+
+        mapping.startObject(FIELD_METRIC_COUNTER)
+            .field("type", "double") /* numeric label indexed as a metric */
+            .field("time_series_metric", "counter")
+            .endObject();
+
+        mapping.endObject().endObject().endObject();
+
+        putComposableIndexTemplate(
+            client,
+            "id1",
+            CompressedXContent.fromJSON(Strings.toString(mapping)),
+            List.of(pattern),
+            settings.build(),
+            null,
+            lifecycle
+        );
+    }
+
+    private static void putComposableIndexTemplate(
+        Client client,
+        String id,
+        @Nullable CompressedXContent mappings,
+        List<String> patterns,
+        @Nullable Settings settings,
+        @Nullable Map<String, Object> metadata,
+        @Nullable DataStreamLifecycle lifecycle
+    ) throws IOException {
+        PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(id);
+        request.indexTemplate(
+            new ComposableIndexTemplate(
+                patterns,
+                new Template(settings, mappings == null ? null : mappings, null, lifecycle),
+                null,
+                null,
+                null,
+                metadata,
+                new ComposableIndexTemplate.DataStreamTemplate(),
+                null
+            )
+        );
+        client.execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
+    }
+
+    private static int indexDocuments(Client client, String dataStreamName, int docCount) {
+        final Supplier<XContentBuilder> sourceSupplier = () -> {
+            final String ts = randomDateForInterval(new DateHistogramInterval("1s"), System.currentTimeMillis());
+            double counterValue = DATE_FORMATTER.parseMillis(ts);
+            final List<String> dimensionValues = new ArrayList<>(5);
+            for (int j = 0; j < randomIntBetween(1, 5); j++) {
+                dimensionValues.add(randomAlphaOfLength(6));
+            }
+            try {
+                return XContentFactory.jsonBuilder()
+                    .startObject()
+                    .field(FIELD_TIMESTAMP, ts)
+                    .field(FIELD_DIMENSION_1, randomFrom(dimensionValues))
+                    .field(FIELD_DIMENSION_2, randomIntBetween(1, 10))
+                    .field(FIELD_METRIC_COUNTER, counterValue)
+                    .endObject();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+        return bulkIndex(client, dataStreamName, sourceSupplier, docCount);
+    }
+
+    private static String randomDateForInterval(final DateHistogramInterval interval, final long startTime) {
+        long endTime = startTime + 10 * interval.estimateMillis();
+        return randomDateForRange(startTime, endTime);
+    }
+
+    private static String randomDateForRange(long start, long end) {
+        return DATE_FORMATTER.formatMillis(randomLongBetween(start, end));
+    }
+
+    private static int bulkIndex(Client client, String dataStreamName, Supplier<XContentBuilder> docSourceSupplier, int docCount) {
+        BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
+        bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        for (int i = 0; i < docCount; i++) {
+            IndexRequest indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE);
+            XContentBuilder source = docSourceSupplier.get();
+            indexRequest.source(source);
+            bulkRequestBuilder.add(indexRequest);
+        }
+        BulkResponse bulkResponse = bulkRequestBuilder.get();
+        int duplicates = 0;
+        for (BulkItemResponse response : bulkResponse.getItems()) {
+            if (response.isFailed()) {
+                if (response.getFailure().getCause() instanceof VersionConflictEngineException) {
+                    // A duplicate event was created by the random generator. We should not fail for this
+                    // reason.
+                    logger.debug("-> failed to insert a duplicate: [{}]", response.getFailureMessage());
+                    duplicates++;
+                } else {
+                    throw new ElasticsearchException("Failed to index data: " + bulkResponse.buildFailureMessage());
+                }
+            }
+        }
+        int docsIndexed = docCount - duplicates;
+        logger.info("-> Indexed [{}] documents. Dropped [{}] duplicates.", docsIndexed, duplicates);
+        return docsIndexed;
+    }
+}

+ 1 - 1
x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java

@@ -131,7 +131,7 @@ public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecut
             return new PersistentTasksCustomMetadata.Assignment(node.getId(), "a node to fail and stop this persistent task");
         }
 
-        final ShardRouting shardRouting = clusterState.routingTable().shardRoutingTable(shardId).primaryShard();
+        final ShardRouting shardRouting = indexShardRouting.primaryShard();
         if (shardRouting.started() == false) {
             return NO_NODE_FOUND;
         }
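For context, the hunk above swaps a fresh routing-table lookup for the `indexShardRouting` instance the method has already resolved. Below is a hedged reconstruction of how the surrounding assignment logic plausibly reads after the change; the lines before the primary-shard lookup are assumptions based on this hunk, not copied from the full file.

```
// Assumed shape of the surrounding method body; only the primary-shard lookup below is part of the hunk.
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(shardId.getIndex());
if (indexRoutingTable == null) {
    // the source index is gone; assign to any node so the task can fail and stop (see the context line above)
    return new PersistentTasksCustomMetadata.Assignment(node.getId(), "a node to fail and stop this persistent task");
}
IndexShardRoutingTable indexShardRouting = indexRoutingTable.shard(shardId.id());
// Reusing the already-resolved routing table avoids a second lookup that could fail if the
// index disappeared between the two calls.
final ShardRouting shardRouting = indexShardRouting.primaryShard();
if (shardRouting.started() == false) {
    return NO_NODE_FOUND;
}
```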