Browse Source

Add auto force merge functionality to DLM (#95204)

This adds automatic force merging for indices in data streams that are being managed by DLM.
The calls to force merge happen if an index (1) is not the current write index, (2) has not been rolled
over in this DLM run, (3) and has not been previously successfully force merged.
Keith Massey 2 years ago
parent
commit
6eebbf5267

+ 5 - 0
docs/changelog/95204.yaml

@@ -0,0 +1,5 @@
+pr: 95204
+summary: Add auto force merge functionality to DLM
+area: DLM
+type: enhancement
+issues: []

+ 134 - 0
modules/dlm/src/internalClusterTest/java/org/elasticsearch/dlm/DataLifecycleServiceIT.java

@@ -7,8 +7,16 @@
  */
 package org.elasticsearch.dlm;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.flush.FlushResponse;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
+import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
+import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
+import org.elasticsearch.action.admin.indices.segments.ShardSegments;
 import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -23,6 +31,7 @@ import org.elasticsearch.cluster.metadata.DataLifecycle;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.DataStreamAction;
 import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Nullable;
@@ -30,6 +39,7 @@ import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.datastreams.DataStreamsPlugin;
 import org.elasticsearch.dlm.action.PutDataLifecycleAction;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.MergePolicyConfig;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.RestStatus;
@@ -39,11 +49,13 @@ import org.elasticsearch.xcontent.XContentType;
 import org.junit.After;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
@@ -54,11 +66,13 @@ import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.startsWith;
 
 public class DataLifecycleServiceIT extends ESIntegTestCase {
+    private static final Logger logger = LogManager.getLogger(DataLifecycleServiceIT.class);
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
@@ -246,6 +260,126 @@ public class DataLifecycleServiceIT extends ESIntegTestCase {
         });
     }
 
+    public void testAutomaticForceMerge() throws Exception {
+        /*
+         * This test makes sure that (1) DLM does _not_ call forcemerge on an index in the same DLM pass when it rolls over the index and
+         * that (2) it _does_ call forcemerge on an index that was rolled over in a previous DLM pass.
+         * It's harder than you would think to detect through the REST API that forcemerge has been called. The reason is that segment
+         * merging happens automatically during indexing, and when forcemerge is called it likely does nothing because all nececssary
+         * merging has already happened automatically. In order to force forcemerge to merge segments, we change
+         * "index.merge.policy.merge_factor" on the index to a value lower than the default. If the number of segments goes down, that is
+         *  proof that DLM called forcemerge.
+         */
+        DataLifecycle lifecycle = new DataLifecycle();
+        disableDLM();
+        String dataStreamName = "metrics-foo";
+        putComposableIndexTemplate(
+            "id1",
+            null,
+            List.of(dataStreamName + "*"),
+            Settings.builder().put("index.number_of_replicas", 1).put("index.number_of_shards", 1).build(),
+            null,
+            lifecycle
+        );
+
+        CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
+        client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+        int finalGeneration = randomIntBetween(2, 10);
+        for (int currentGeneration = 1; currentGeneration < finalGeneration; currentGeneration++) {
+            // This is currently the write index, but it will be rolled over as soon as DLM runs:
+            final String toBeRolledOverIndex = DataStream.getDefaultBackingIndexName(dataStreamName, currentGeneration);
+            for (int i = 0; i < randomIntBetween(10, 50); i++) {
+                indexDocs(dataStreamName, randomIntBetween(1, 300));
+                // Make sure the segments get written:
+                FlushResponse flushResponse = client().admin().indices().flush(new FlushRequest(toBeRolledOverIndex)).actionGet();
+                assertThat(flushResponse.getStatus(), equalTo(RestStatus.OK));
+            }
+            /*
+             * Without the following, calls to forcemerge are essentially a no-op since it has already done automatic merging. Setting
+             * merge_factor on its own does not do anything, but it results in calls to forcemerge making observable changes to the
+             * number of segments. So we're doing this just so that we can check that DLM did actually call forcemerge.
+             */
+            updateIndexSettings(
+                Settings.builder().put(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), 5),
+                toBeRolledOverIndex
+            );
+
+            final String toBeForceMergedIndex;
+            final int preDlmSegmentsForceMergedIndex;
+            if (currentGeneration == 1) {
+                toBeForceMergedIndex = null; // Not going to be used
+                preDlmSegmentsForceMergedIndex = -1; // Not going to be used
+            } else {
+                toBeForceMergedIndex = DataStream.getDefaultBackingIndexName(dataStreamName, currentGeneration - 1);
+                preDlmSegmentsForceMergedIndex = getSegmentCount(toBeForceMergedIndex);
+            }
+            final int preDlmSegmentsAboutToBeRolledOverIndex = getSegmentCount(toBeRolledOverIndex);
+            int currentBackingIndexCount = currentGeneration;
+            DataLifecycleService dataLifecycleService = internalCluster().getInstance(
+                DataLifecycleService.class,
+                internalCluster().getMasterName()
+            );
+            ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
+            // run DLM once
+            dataLifecycleService.run(clusterService.state());
+            assertBusy(() -> {
+                GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
+                GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
+                    .actionGet();
+                assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
+                DataStream dataStream = getDataStreamResponse.getDataStreams().get(0).getDataStream();
+                assertThat(dataStream.getName(), equalTo(dataStreamName));
+                List<Index> backingIndices = dataStream.getIndices();
+                assertThat(backingIndices.size(), equalTo(currentBackingIndexCount + 1));
+                String writeIndex = dataStream.getWriteIndex().getName();
+                assertThat(writeIndex, backingIndexEqualTo(dataStreamName, currentBackingIndexCount + 1));
+                int postDlmSegmentsNewlyRolledOverIndex = getSegmentCount(toBeRolledOverIndex);
+                /*
+                 * We only expect forcemerge to happen on the 2nd DLM run and later, since on the first there's only the single write
+                 * index to be rolled over.
+                 */
+                if (currentBackingIndexCount > 1) {
+                    int postDlmSegmentsForceMergedIndex = getSegmentCount(toBeForceMergedIndex);
+                    assertThat(
+                        "The segments for " + toBeForceMergedIndex + " were not merged",
+                        postDlmSegmentsForceMergedIndex,
+                        lessThan(preDlmSegmentsForceMergedIndex)
+                    );
+                }
+                // We want to assert that when DLM rolls over the write index it, it doesn't forcemerge it on that iteration:
+                assertThat(
+                    "The segments for " + toBeRolledOverIndex + " were unexpectedly merged",
+                    postDlmSegmentsNewlyRolledOverIndex,
+                    equalTo(preDlmSegmentsAboutToBeRolledOverIndex)
+                );
+            });
+        }
+    }
+
+    private static void enableDLM(TimeValue pollInterval) {
+        updateClusterSettings(Settings.builder().put(DataLifecycleService.DLM_POLL_INTERVAL, pollInterval));
+    }
+
+    private static void disableDLM() {
+        updateClusterSettings(Settings.builder().put(DataLifecycleService.DLM_POLL_INTERVAL, TimeValue.MAX_VALUE));
+    }
+
+    private int getSegmentCount(String indexName) throws ExecutionException, InterruptedException {
+        IndicesSegmentResponse segmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest(indexName)).get();
+        return (int) segmentResponse.getIndices()
+            .get(indexName)
+            .getShards()
+            .values()
+            .stream()
+            .map(IndexShardSegments::shards)
+            .flatMap(Arrays::stream)
+            .filter(shard -> shard.getShardRouting().primary())
+            .map(ShardSegments::getSegments)
+            .flatMap(List::stream)
+            .filter(segment -> segment.search) // in case there hasn't been a flush
+            .count();
+    }
+
     public void testErrorRecordingOnRollover() throws Exception {
         // empty lifecycle contains the default rollover
         DataLifecycle lifecycle = new DataLifecycle();

+ 9 - 1
modules/dlm/src/internalClusterTest/java/org/elasticsearch/dlm/ExplainDataLifecycleIT.java

@@ -225,7 +225,15 @@ public class ExplainDataLifecycleIT extends ESIntegTestCase {
             ExplainDataLifecycleAction.Response response = client().execute(ExplainDataLifecycleAction.INSTANCE, explainIndicesRequest)
                 .actionGet();
             assertThat(response.getIndices().size(), is(1));
-            assertThat(response.getIndices().get(0).getError(), is(nullValue()));
+            if (internalCluster().numDataNodes() > 1) {
+                assertThat(response.getIndices().get(0).getError(), is(nullValue()));
+            } else {
+                /*
+                 * If there is only one node in the cluster then the replica shard will never be allocated. So forcemerge will never
+                 * succeed, and there will always be an error in the error store. This behavior is subject to change in the future.
+                 */
+                assertThat(response.getIndices().get(0).getError(), is(notNullValue()));
+            }
         });
     }
 

+ 289 - 7
modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecycleService.java

@@ -11,22 +11,30 @@ package org.elasticsearch.dlm;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ResultDeduplicator;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
+import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
 import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
 import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
+import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.ClusterStateTaskListener;
+import org.elasticsearch.cluster.SimpleBatchedExecutor;
 import org.elasticsearch.cluster.metadata.DataLifecycle;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.scheduler.SchedulerEngine;
 import org.elasticsearch.common.scheduler.TimeValueSchedule;
@@ -35,6 +43,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -44,10 +53,16 @@ import org.elasticsearch.transport.TransportRequest;
 
 import java.io.Closeable;
 import java.time.Clock;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
 
 /**
  * This service will implement the needed actions (e.g. rollover, retention) to manage the data streams with a DLM lifecycle configured.
@@ -63,16 +78,23 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
         Setting.Property.Dynamic,
         Setting.Property.NodeScope
     );
+
     private static final Logger logger = LogManager.getLogger(DataLifecycleService.class);
     /**
      * Name constant for the job DLM schedules
      */
     private static final String DATA_LIFECYCLE_JOB_NAME = "dlm";
+    /*
+     * This is the key for DLM-related custom index metadata.
+     */
+    static final String DLM_CUSTOM_INDEX_METADATA_KEY = "dlm";
+    static final String FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY = "force_merge_completed_timestamp";
 
     private final Settings settings;
     private final Client client;
     private final ClusterService clusterService;
-    private final ResultDeduplicator<TransportRequest, Void> transportActionsDeduplicator;
+    private final ThreadPool threadPool;
+    final ResultDeduplicator<TransportRequest, Void> transportActionsDeduplicator;
     private final LongSupplier nowSupplier;
     private final Clock clock;
     private final DataLifecycleErrorStore errorStore;
@@ -81,6 +103,21 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
     private volatile RolloverConfiguration rolloverConfiguration;
     private SchedulerEngine.Job scheduledJob;
     private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
+    private final MasterServiceTaskQueue<UpdateForceMergeCompleteTask> forceMergeClusterStateUpdateTaskQueue;
+
+    private static final SimpleBatchedExecutor<UpdateForceMergeCompleteTask, Void> FORCE_MERGE_STATE_UPDATE_TASK_EXECUTOR =
+        new SimpleBatchedExecutor<>() {
+            @Override
+            public Tuple<ClusterState, Void> executeTask(UpdateForceMergeCompleteTask task, ClusterState clusterState) throws Exception {
+                return Tuple.tuple(task.execute(clusterState), null);
+            }
+
+            @Override
+            public void taskSucceeded(UpdateForceMergeCompleteTask task, Void unused) {
+                logger.trace("Updated cluster state for force merge of index [{}]", task.targetIndex);
+                task.listener.onResponse(null);
+            }
+        };
 
     public DataLifecycleService(
         Settings settings,
@@ -95,12 +132,18 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
         this.client = client;
         this.clusterService = clusterService;
         this.clock = clock;
+        this.threadPool = threadPool;
         this.transportActionsDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
         this.nowSupplier = nowSupplier;
         this.errorStore = errorStore;
         this.scheduledJob = null;
         this.pollInterval = DLM_POLL_INTERVAL_SETTING.get(settings);
         this.rolloverConfiguration = clusterService.getClusterSettings().get(DataLifecycle.CLUSTER_DLM_DEFAULT_ROLLOVER_SETTING);
+        this.forceMergeClusterStateUpdateTaskQueue = clusterService.createTaskQueue(
+            "dlm-forcemerge-state-update",
+            Priority.LOW,
+            FORCE_MERGE_STATE_UPDATE_TASK_EXECUTOR
+        );
     }
 
     /**
@@ -167,24 +210,30 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
                 continue;
             }
 
-            String writeIndex = dataStream.getWriteIndex().getName();
+            /*
+             * This is the pre-rollover write index. It may or may not be the write index after maybeExecuteRollover has executed, depending
+             * on rollover criteria. We're keeping a reference to it because regardless of whether it's rolled over or not we want to
+             * exclude it from force merging later in this DLM run.
+             */
+            Index originalWriteIndex = dataStream.getWriteIndex();
             try {
                 maybeExecuteRollover(state, dataStream);
             } catch (Exception e) {
                 logger.error(() -> String.format(Locale.ROOT, "DLM failed to rollver data stream [%s]", dataStream.getName()), e);
                 DataStream latestDataStream = clusterService.state().metadata().dataStreams().get(dataStream.getName());
                 if (latestDataStream != null) {
-                    if (latestDataStream.getWriteIndex().getName().equals(writeIndex)) {
+                    if (latestDataStream.getWriteIndex().getName().equals(originalWriteIndex.getName())) {
                         // data stream has not been rolled over in the meantime so record the error against the write index we
                         // attempted the rollover
-                        errorStore.recordError(writeIndex, e);
+                        errorStore.recordError(originalWriteIndex.getName(), e);
                     }
                 }
             }
-
+            Set<Index> indicesBeingRemoved;
             try {
-                maybeExecuteRetention(state, dataStream);
+                indicesBeingRemoved = maybeExecuteRetention(state, dataStream);
             } catch (Exception e) {
+                indicesBeingRemoved = Set.of();
                 // individual index errors would be reported via the API action listener for every delete call
                 // we could potentially record errors at a data stream level and expose it via the _data_stream API?
                 logger.error(
@@ -192,6 +241,30 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
                     e
                 );
             }
+
+            try {
+                /*
+                 * When considering indices for force merge, we want to exclude several indices: (1) We exclude the current write index
+                 * because obviously it is still likely to get writes, (2) we exclude the most recent previous write index because since
+                 * we just switched over it might still be getting some writes, and (3) we exclude any indices that we're in the process
+                 * of deleting because they'll be gone soon anyway.
+                 */
+                Set<Index> indicesToExclude = new HashSet<>();
+                Index currentWriteIndex = dataStream.getWriteIndex();
+                indicesToExclude.add(currentWriteIndex);
+                indicesToExclude.add(originalWriteIndex); // Could be the same as currentWriteIndex, but that's fine
+                indicesToExclude.addAll(indicesBeingRemoved);
+                List<Index> potentialForceMergeIndices = dataStream.getIndices()
+                    .stream()
+                    .filter(index -> indicesToExclude.contains(index) == false)
+                    .toList();
+                maybeExecuteForceMerge(state, dataStream, potentialForceMergeIndices);
+            } catch (Exception e) {
+                logger.error(
+                    () -> String.format(Locale.ROOT, "DLM failed to execute force merge for data stream [%s]", dataStream.getName()),
+                    e
+                );
+            }
         }
     }
 
@@ -227,13 +300,22 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
         }
     }
 
-    private void maybeExecuteRetention(ClusterState state, DataStream dataStream) {
+    /**
+     * This method sends requests to delete any indices in the datastream that exceed its retention policy. It returns the set of indices
+     * it has sent delete requests for.
+     * @param state The cluster state from which to get index metadata
+     * @param dataStream The datastream
+     * @return The set of indices that delete requests have been sent for
+     */
+    private Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream) {
         TimeValue retention = getRetentionConfiguration(dataStream);
+        Set<Index> indicesToBeRemoved = new HashSet<>();
         if (retention != null) {
             Metadata metadata = state.metadata();
             List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier);
 
             for (Index index : backingIndicesOlderThanRetention) {
+                indicesToBeRemoved.add(index);
                 IndexMetadata backingIndex = metadata.index(index);
                 assert backingIndex != null : "the data stream backing indices must exist";
 
@@ -250,6 +332,34 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
                 );
             }
         }
+        return indicesToBeRemoved;
+    }
+
+    /*
+     * This method force merges the given indices in the datastream. It writes a timestamp in the cluster state upon completion of the
+     * force merge.
+     */
+    private void maybeExecuteForceMerge(ClusterState state, DataStream dataStream, List<Index> indices) {
+        Metadata metadata = state.metadata();
+        for (Index index : indices) {
+            if (dataStream.isIndexManagedByDLM(index, state.metadata()::index)) {
+                IndexMetadata backingIndex = metadata.index(index);
+                assert backingIndex != null : "the data stream backing indices must exist";
+                String indexName = index.getName();
+                boolean alreadyForceMerged = isForceMergeComplete(backingIndex);
+                if (alreadyForceMerged) {
+                    logger.trace("Already force merged {}", indexName);
+                    continue;
+                }
+                ForceMergeRequest forceMergeRequest = new ForceMergeRequest(indexName);
+                // time to force merge the index
+                transportActionsDeduplicator.executeOnce(
+                    new ForceMergeRequestWrapper(forceMergeRequest),
+                    new ErrorRecordingActionListener(indexName, errorStore),
+                    (req, reqListener) -> forceMergeIndex(forceMergeRequest, reqListener)
+                );
+            }
+        }
     }
 
     private void rolloverDataStream(String writeIndexName, RolloverRequest rolloverRequest, ActionListener<Void> listener) {
@@ -331,6 +441,85 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
         });
     }
 
+    /*
+     * This method executes the given force merge request. Once the request has completed successfully it writes a timestamp as custom
+     * metadata in the cluster state indicating when the force merge has completed. The listener is notified after the cluster state
+     * update has been made, or when the forcemerge fails or the write of the to the cluster state fails.
+     */
+    private void forceMergeIndex(ForceMergeRequest forceMergeRequest, ActionListener<Void> listener) {
+        assert forceMergeRequest.indices() != null && forceMergeRequest.indices().length == 1 : "DLM force merges one index at a time";
+        final String targetIndex = forceMergeRequest.indices()[0];
+        logger.info("DLM is issuing a request to force merge index [{}]", targetIndex);
+        client.admin().indices().forceMerge(forceMergeRequest, new ActionListener<>() {
+            @Override
+            public void onResponse(ForceMergeResponse forceMergeResponse) {
+                if (forceMergeResponse.getFailedShards() > 0) {
+                    DefaultShardOperationFailedException[] failures = forceMergeResponse.getShardFailures();
+                    String message = Strings.format(
+                        "DLM failed to forcemerge %d shards for index [%s] due to failures [%s]",
+                        forceMergeResponse.getFailedShards(),
+                        targetIndex,
+                        failures == null
+                            ? "unknown"
+                            : Arrays.stream(failures).map(DefaultShardOperationFailedException::toString).collect(Collectors.joining(","))
+                    );
+                    onFailure(new ElasticsearchException(message));
+                } else if (forceMergeResponse.getTotalShards() != forceMergeResponse.getSuccessfulShards()) {
+                    String message = Strings.format(
+                        "Force merge request only had %d successful shards out of a total of %d",
+                        forceMergeResponse.getSuccessfulShards(),
+                        forceMergeResponse.getTotalShards()
+                    );
+                    onFailure(new ElasticsearchException(message));
+                } else {
+                    logger.info("DLM successfully force merged index [{}]", targetIndex);
+                    setForceMergeCompletedTimestamp(targetIndex, listener);
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                String previousError = errorStore.getError(targetIndex);
+                /*
+                 * Note that this call to onFailure has to happen before the logging because we choose whether to log or not based on a
+                 * side effect of the onFailure call (it updates the error in the errorStore).
+                 */
+                listener.onFailure(e);
+                // To avoid spamming our logs, we only want to log the error once.
+                if (previousError == null || previousError.equals(errorStore.getError(targetIndex)) == false) {
+                    logger.warn(
+                        () -> Strings.format(
+                            "DLM encountered an error trying to force merge index [%s]. DLM will attempt to force merge the index on its "
+                                + "next run.",
+                            targetIndex
+                        ),
+                        e
+                    );
+                }
+            }
+        });
+    }
+
+    /*
+     * This method sets the value of the custom index metadata field "force_merge_completed_timestamp" within the field "dlm" to value. The
+     * method returns immediately, but the update happens asynchronously and listener is notified on success or failure.
+     */
+    private void setForceMergeCompletedTimestamp(String targetIndex, ActionListener<Void> listener) {
+        forceMergeClusterStateUpdateTaskQueue.submitTask(
+            Strings.format("Adding force merge complete marker to cluster state for [%s]", targetIndex),
+            new UpdateForceMergeCompleteTask(listener, targetIndex, threadPool),
+            null
+        );
+    }
+
+    /*
+     * Returns true if a value has been set for the custom index metadata field "force_merge_completed_timestamp" within the field "dlm".
+     */
+    private boolean isForceMergeComplete(IndexMetadata backingIndex) {
+        Map<String, String> customMetadata = backingIndex.getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY);
+        return customMetadata != null && customMetadata.containsKey(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY);
+    }
+
     @Nullable
     static TimeValue getRetentionConfiguration(DataStream dataStream) {
         if (dataStream.getLifecycle() == null) {
@@ -420,4 +609,97 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
     DataLifecycleErrorStore getErrorStore() {
         return errorStore;
     }
+
+    /**
+     * This is a ClusterStateTaskListener that writes the force_merge_completed_timestamp into the cluster state. It is meant to run in
+     * STATE_UPDATE_TASK_EXECUTOR.
+     */
+    static class UpdateForceMergeCompleteTask implements ClusterStateTaskListener {
+        private final ActionListener<Void> listener;
+        private final String targetIndex;
+        private final ThreadPool threadPool;
+
+        UpdateForceMergeCompleteTask(ActionListener<Void> listener, String targetIndex, ThreadPool threadPool) {
+            this.listener = listener;
+            this.targetIndex = targetIndex;
+            this.threadPool = threadPool;
+        }
+
+        ClusterState execute(ClusterState currentState) throws Exception {
+            logger.debug("Updating cluster state with force merge complete marker for {}", targetIndex);
+            IndexMetadata indexMetadata = currentState.metadata().index(targetIndex);
+            Map<String, String> customMetadata = indexMetadata.getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY);
+            Map<String, String> newCustomMetadata = new HashMap<>();
+            if (customMetadata != null) {
+                newCustomMetadata.putAll(customMetadata);
+            }
+            newCustomMetadata.put(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY, Long.toString(threadPool.absoluteTimeInMillis()));
+            IndexMetadata updatededIndexMetadata = new IndexMetadata.Builder(indexMetadata).putCustom(
+                DLM_CUSTOM_INDEX_METADATA_KEY,
+                newCustomMetadata
+            ).build();
+            Metadata metadata = Metadata.builder(currentState.metadata()).put(updatededIndexMetadata, true).build();
+            return ClusterState.builder(currentState).metadata(metadata).build();
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            listener.onFailure(e);
+        }
+    }
+
+    /**
+     * This wrapper exists only to provide equals and hashCode implementations of a ForceMergeRequest for transportActionsDeduplicator.
+     * It intentionally ignores forceMergeUUID (which ForceMergeRequest's equals/hashCode would have to if they existed) because we don't
+     * care about it for DLM deduplication. This class is non-private for the sake of unit testing, but should not be used outside of
+     * DataLifecycleService.
+     */
+    static final class ForceMergeRequestWrapper extends ForceMergeRequest {
+        ForceMergeRequestWrapper(ForceMergeRequest original) {
+            super(original.indices());
+            this.maxNumSegments(original.maxNumSegments());
+            this.onlyExpungeDeletes(original.onlyExpungeDeletes());
+            this.flush(original.flush());
+            this.indicesOptions(original.indicesOptions());
+            this.setShouldStoreResult(original.getShouldStoreResult());
+            this.setRequestId(original.getRequestId());
+            this.timeout(original.timeout());
+            this.setParentTask(original.getParentTask());
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ForceMergeRequest that = (ForceMergeRequest) o;
+            return Arrays.equals(indices, that.indices())
+                && maxNumSegments() == that.maxNumSegments()
+                && onlyExpungeDeletes() == that.onlyExpungeDeletes()
+                && flush() == that.flush()
+                && Objects.equals(indicesOptions(), that.indicesOptions())
+                && getShouldStoreResult() == that.getShouldStoreResult()
+                && getRequestId() == that.getRequestId()
+                && Objects.equals(timeout(), that.timeout())
+                && Objects.equals(getParentTask(), that.getParentTask());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(
+                Arrays.hashCode(indices),
+                maxNumSegments(),
+                onlyExpungeDeletes(),
+                flush(),
+                indicesOptions(),
+                getShouldStoreResult(),
+                getRequestId(),
+                timeout(),
+                getParentTask()
+            );
+        }
+    }
 }

+ 501 - 4
modules/dlm/src/test/java/org/elasticsearch/dlm/DataLifecycleServiceTests.java

@@ -8,15 +8,22 @@
 
 package org.elasticsearch.dlm;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
+import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
+import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
 import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
 import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
+import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.action.support.DefaultShardOperationFailedException;
+import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.DataLifecycle;
@@ -27,6 +34,7 @@ import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -36,6 +44,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.EqualsHashCodeTestUtils;
 import org.elasticsearch.test.client.NoOpClient;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -48,18 +57,27 @@ import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.dlm.DLMFixtures.createDataStream;
+import static org.elasticsearch.dlm.DataLifecycleService.DLM_CUSTOM_INDEX_METADATA_KEY;
+import static org.elasticsearch.dlm.DataLifecycleService.FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY;
 import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
 import static org.elasticsearch.test.ClusterServiceUtils.setState;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 
@@ -69,7 +87,8 @@ public class DataLifecycleServiceTests extends ESTestCase {
     private ThreadPool threadPool;
     private DataLifecycleService dataLifecycleService;
     private List<TransportRequest> clientSeenRequests;
-    private NoOpClient client;
+    private Client client;
+    private DoExecuteDelegate clientDelegate;
     private ClusterService clusterService;
 
     @Before
@@ -94,6 +113,7 @@ public class DataLifecycleServiceTests extends ESTestCase {
             () -> now,
             new DataLifecycleErrorStore()
         );
+        clientDelegate = null;
         dataLifecycleService.init();
     }
 
@@ -155,7 +175,7 @@ public class DataLifecycleServiceTests extends ESTestCase {
 
         ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
         dataLifecycleService.run(state);
-        assertThat(clientSeenRequests.size(), is(1));
+        assertThat(clientSeenRequests.size(), is(3));  // rollover the write index, and force merge the other two
         assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
     }
 
@@ -175,7 +195,7 @@ public class DataLifecycleServiceTests extends ESTestCase {
 
         ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
         dataLifecycleService.run(state);
-        assertThat(clientSeenRequests.size(), is(1));
+        assertThat(clientSeenRequests.size(), is(3)); // rollover the write index, and force merge the other two
         assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
     }
 
@@ -304,6 +324,402 @@ public class DataLifecycleServiceTests extends ESTestCase {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    public void testForceMerge() throws Exception {
+        // We want this test method to get fake force merge responses, because this is what triggers a cluster state update
+        clientDelegate = (action, request, listener) -> {
+            if (action.name().equals("indices:admin/forcemerge")) {
+                listener.onResponse(new ForceMergeResponse(5, 5, 0, List.of()));
+            }
+        };
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 3;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            settings(Version.CURRENT),
+            new DataLifecycle(TimeValue.MAX_VALUE),
+            now
+        );
+        builder.put(dataStream);
+
+        String nodeId = "localNode";
+        DiscoveryNodes.Builder nodesBuilder = buildNodes(nodeId);
+        // we are the master node
+        nodesBuilder.masterNodeId(nodeId);
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).nodes(nodesBuilder).build();
+        setState(clusterService, state);
+        dataLifecycleService.run(clusterService.state());
+
+        // There are 3 backing indices. One gets rolled over. The other two get force merged:
+        assertBusy(() -> {
+            assertThat(
+                clusterService.state().metadata().index(dataStream.getIndices().get(0)).getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY),
+                notNullValue()
+            );
+            assertThat(
+                clusterService.state().metadata().index(dataStream.getIndices().get(1)).getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY),
+                notNullValue()
+            );
+            assertThat(
+                clusterService.state()
+                    .metadata()
+                    .index(dataStream.getIndices().get(0))
+                    .getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY)
+                    .get(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY),
+                notNullValue()
+            );
+            assertThat(
+                clusterService.state()
+                    .metadata()
+                    .index(dataStream.getIndices().get(1))
+                    .getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY)
+                    .get(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY),
+                notNullValue()
+            );
+        });
+        assertBusy(() -> { assertThat(clientSeenRequests.size(), is(3)); }, 30, TimeUnit.SECONDS);
+        assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
+        assertThat(((RolloverRequest) clientSeenRequests.get(0)).getRolloverTarget(), is(dataStreamName));
+        List<ForceMergeRequest> forceMergeRequests = clientSeenRequests.subList(1, 3)
+            .stream()
+            .map(transportRequest -> (ForceMergeRequest) transportRequest)
+            .toList();
+        assertThat(forceMergeRequests.get(0).indices()[0], is(dataStream.getIndices().get(0).getName()));
+        assertThat(forceMergeRequests.get(1).indices()[0], is(dataStream.getIndices().get(1).getName()));
+
+        // No changes, so running should not create any more requests
+        dataLifecycleService.run(clusterService.state());
+        assertThat(clientSeenRequests.size(), is(3));
+
+        // Add another index backing, and make sure that the only thing that happens is another force merge
+        IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(
+            DataStream.getDefaultBackingIndexName(dataStreamName, numBackingIndices + 1)
+        ).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1).creationDate(now - 3000L);
+        MaxAgeCondition rolloverCondition = new MaxAgeCondition(TimeValue.timeValueMillis(now - 2000L));
+        indexMetaBuilder.putRolloverInfo(new RolloverInfo(dataStreamName, List.of(rolloverCondition), now - 2000L));
+        IndexMetadata newIndexMetadata = indexMetaBuilder.build();
+        builder = Metadata.builder(clusterService.state().metadata()).put(newIndexMetadata, true);
+        state = ClusterState.builder(clusterService.state()).metadata(builder).build();
+        setState(clusterService, state);
+        DataStream dataStream2 = dataStream.addBackingIndex(clusterService.state().metadata(), newIndexMetadata.getIndex());
+        builder = Metadata.builder(clusterService.state().metadata());
+        builder.put(dataStream2);
+        state = ClusterState.builder(clusterService.state()).metadata(builder).build();
+        setState(clusterService, state);
+        dataLifecycleService.run(clusterService.state());
+        assertBusy(() -> { assertThat(clientSeenRequests.size(), is(4)); });
+        assertThat(((ForceMergeRequest) clientSeenRequests.get(3)).indices().length, is(1));
+        assertBusy(() -> {
+            assertThat(
+                clusterService.state().metadata().index(dataStream2.getIndices().get(2)).getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY),
+                notNullValue()
+            );
+            assertThat(
+                clusterService.state()
+                    .metadata()
+                    .index(dataStream2.getIndices().get(2))
+                    .getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY)
+                    .get(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY),
+                notNullValue()
+            );
+        });
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testForceMergeRetries() throws Exception {
+        /*
+         * This test makes sure that DLM correctly retries (or doesn't) forcemerge requests on failure.
+         * First, we set up a datastream with 3 backing indices. On the first run of DLM we'll expect one to get rolled over and two to
+         * be forcemerged.
+         */
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 3;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            settings(Version.CURRENT),
+            new DataLifecycle(TimeValue.MAX_VALUE),
+            now
+        );
+        builder.put(dataStream);
+
+        String nodeId = "localNode";
+        DiscoveryNodes.Builder nodesBuilder = buildNodes(nodeId);
+        // we are the master node
+        nodesBuilder.masterNodeId(nodeId);
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).nodes(nodesBuilder).build();
+        setState(clusterService, state);
+
+        {
+            /*
+             * For the first DLM run we're intentionally making forcemerge fail:
+             */
+            AtomicInteger forceMergeFailedCount = new AtomicInteger(0);
+            clientDelegate = (action, request, listener) -> {
+                if (action.name().equals("indices:admin/forcemerge")) {
+                    listener.onFailure(new RuntimeException("Forcemerge failure"));
+                    forceMergeFailedCount.incrementAndGet();
+                }
+            };
+            dataLifecycleService.run(clusterService.state());
+            /*
+             * We expect that DLM will try to pick it up next time.
+             */
+            assertBusy(() -> {
+                assertThat(forceMergeFailedCount.get(), equalTo(2));
+                assertThat(
+                    clusterService.state().metadata().index(dataStream.getIndices().get(0)).getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY),
+                    nullValue()
+                );
+            });
+        }
+
+        {
+            /*
+             * For the next DLM run we're intentionally making forcemerge fail by reporting failed shards:
+             */
+            AtomicInteger forceMergeFailedCount = new AtomicInteger(0);
+            clientDelegate = (action, request, listener) -> {
+                if (action.name().equals("indices:admin/forcemerge")) {
+                    listener.onResponse(
+                        new ForceMergeResponse(
+                            5,
+                            5,
+                            1,
+                            List.of(new DefaultShardOperationFailedException(new ElasticsearchException("failure")))
+                        )
+                    );
+                    forceMergeFailedCount.incrementAndGet();
+                }
+            };
+            dataLifecycleService.run(clusterService.state());
+            assertBusy(() -> {
+                assertThat(forceMergeFailedCount.get(), equalTo(2));
+                assertThat(
+                    clusterService.state().metadata().index(dataStream.getIndices().get(0)).getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY),
+                    nullValue()
+                );
+            });
+        }
+
+        {
+            /*
+             * For the next DLM run we're intentionally making forcemerge fail on the same indices by having the successful shards not equal
+             *  to the total:
+             */
+            AtomicInteger forceMergeFailedCount = new AtomicInteger(0);
+            clientDelegate = (action, request, listener) -> {
+                if (action.name().equals("indices:admin/forcemerge")) {
+                    listener.onResponse(new ForceMergeResponse(5, 4, 0, List.of()));
+                    forceMergeFailedCount.incrementAndGet();
+                }
+            };
+            dataLifecycleService.run(clusterService.state());
+            assertBusy(() -> {
+                assertThat(forceMergeFailedCount.get(), equalTo(2));
+                assertThat(
+                    clusterService.state().metadata().index(dataStream.getIndices().get(0)).getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY),
+                    nullValue()
+                );
+            });
+        }
+
+        {
+            // For the final DLM run, we let forcemerge run normally
+            clientDelegate = (action, request, listener) -> {
+                if (action.name().equals("indices:admin/forcemerge")) {
+                    listener.onResponse(new ForceMergeResponse(5, 5, 0, List.of()));
+                }
+            };
+            dataLifecycleService.run(clusterService.state());
+            /*
+             * And this time we expect that it will actually run the forcemerge, and update the marker to complete:
+             */
+            assertBusy(() -> {
+                assertThat(
+                    clusterService.state().metadata().index(dataStream.getIndices().get(0)).getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY),
+                    notNullValue()
+                );
+                assertThat(
+                    clusterService.state().metadata().index(dataStream.getIndices().get(1)).getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY),
+                    notNullValue()
+                );
+                assertThat(
+                    clusterService.state()
+                        .metadata()
+                        .index(dataStream.getIndices().get(0))
+                        .getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY)
+                        .get(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY),
+                    notNullValue()
+                );
+                assertThat(
+                    clusterService.state()
+                        .metadata()
+                        .index(dataStream.getIndices().get(1))
+                        .getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY)
+                        .get(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY),
+                    notNullValue()
+                );
+            });
+            assertBusy(() -> { assertThat(clientSeenRequests.size(), is(9)); });
+            assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
+            assertThat(((RolloverRequest) clientSeenRequests.get(0)).getRolloverTarget(), is(dataStreamName));
+            // There will be two more forcemerge requests total now: the six failed ones from before, and now the two successful ones
+            List<ForceMergeRequest> forceMergeRequests = clientSeenRequests.subList(1, 9)
+                .stream()
+                .map(transportRequest -> (ForceMergeRequest) transportRequest)
+                .toList();
+            assertThat(forceMergeRequests.get(0).indices()[0], is(dataStream.getIndices().get(0).getName()));
+            assertThat(forceMergeRequests.get(1).indices()[0], is(dataStream.getIndices().get(1).getName()));
+            assertThat(forceMergeRequests.get(2).indices()[0], is(dataStream.getIndices().get(0).getName()));
+            assertThat(forceMergeRequests.get(3).indices()[0], is(dataStream.getIndices().get(1).getName()));
+            assertThat(forceMergeRequests.get(4).indices()[0], is(dataStream.getIndices().get(0).getName()));
+            assertThat(forceMergeRequests.get(5).indices()[0], is(dataStream.getIndices().get(1).getName()));
+            assertThat(forceMergeRequests.get(6).indices()[0], is(dataStream.getIndices().get(0).getName()));
+            assertThat(forceMergeRequests.get(7).indices()[0], is(dataStream.getIndices().get(1).getName()));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testForceMergeDedup() throws Exception {
+        /*
+         * This test creates a datastream with one index, and then runs DLM repeatedly many times. We assert that the size of the
+         * transportActionsDeduplicator never goes over 1, and is 0 by the end. This is to make sure that the equals/hashcode methods
+         * of ForceMergeRequests are interacting with the deduplicator as expected.
+         */
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 3;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            settings(Version.CURRENT),
+            new DataLifecycle(TimeValue.MAX_VALUE),
+            now
+        );
+        builder.put(dataStream);
+        IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(
+            DataStream.getDefaultBackingIndexName(dataStreamName, numBackingIndices + 1)
+        ).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1).creationDate(now - 3000L);
+        MaxAgeCondition rolloverCondition = new MaxAgeCondition(TimeValue.timeValueMillis(now - 2000L));
+        indexMetaBuilder.putRolloverInfo(new RolloverInfo(dataStreamName, List.of(rolloverCondition), now - 2000L));
+        IndexMetadata newIndexMetadata = indexMetaBuilder.build();
+        builder = Metadata.builder(clusterService.state().metadata()).put(newIndexMetadata, true);
+        ClusterState state = ClusterState.builder(clusterService.state()).metadata(builder).build();
+        setState(clusterService, state);
+        DataStream dataStream2 = dataStream.addBackingIndex(clusterService.state().metadata(), newIndexMetadata.getIndex());
+        builder = Metadata.builder(clusterService.state().metadata());
+        builder.put(dataStream2);
+        state = ClusterState.builder(clusterService.state()).metadata(builder).build();
+        setState(clusterService, state);
+        clientDelegate = (action, request, listener) -> {
+            if (action.name().equals("indices:admin/forcemerge")) {
+                listener.onResponse(new ForceMergeResponse(5, 5, 0, List.of()));
+            }
+        };
+        for (int i = 0; i < 100; i++) {
+            dataLifecycleService.run(clusterService.state());
+            assertThat(dataLifecycleService.transportActionsDeduplicator.size(), lessThanOrEqualTo(1));
+        }
+        assertBusy(() -> assertThat(dataLifecycleService.transportActionsDeduplicator.size(), equalTo(0)));
+    }
+
+    public void testUpdateForceMergeCompleteTask() throws Exception {
+        AtomicInteger onResponseCount = new AtomicInteger(0);
+        AtomicInteger failureCount = new AtomicInteger(0);
+        AtomicReference<Exception> failure = new AtomicReference<>();
+        ActionListener<Void> listener = new ActionListener<>() {
+            @Override
+            public void onResponse(Void unused) {
+                onResponseCount.incrementAndGet();
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                failureCount.incrementAndGet();
+                failure.set(e);
+            }
+        };
+        String targetIndex = randomAlphaOfLength(20);
+        DataLifecycleService.UpdateForceMergeCompleteTask task = new DataLifecycleService.UpdateForceMergeCompleteTask(
+            listener,
+            targetIndex,
+            threadPool
+        );
+        {
+            Exception exception = new RuntimeException("task failed");
+            task.onFailure(exception);
+            assertThat(failureCount.get(), equalTo(1));
+            assertThat(onResponseCount.get(), equalTo(0));
+            assertThat(failure.get(), equalTo(exception));
+            ClusterState clusterState = createClusterState(targetIndex, null);
+            ClusterState newClusterState = task.execute(clusterState);
+            IndexMetadata indexMetadata = newClusterState.metadata().index(targetIndex);
+            assertThat(indexMetadata, notNullValue());
+            Map<String, String> dlmMetadata = indexMetadata.getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY);
+            assertThat(dlmMetadata, notNullValue());
+            assertThat(dlmMetadata.size(), equalTo(1));
+            String forceMergeCompleteTimestampString = dlmMetadata.get(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY);
+            assertThat(forceMergeCompleteTimestampString, notNullValue());
+            long forceMergeCopmleteTimestamp = Long.parseLong(forceMergeCompleteTimestampString);
+            assertThat(forceMergeCopmleteTimestamp, greaterThanOrEqualTo(threadPool.absoluteTimeInMillis()));
+            // The listener's onResponse should not be called by execute():
+            assertThat(onResponseCount.get(), equalTo(0));
+        }
+        {
+            /*
+             * This is the same as the previous block, except that this time we'll have previously-existing DLM custom metadata in the
+             * index's metadata, and make sure that it doesn't get blown away when we set the timestamp.
+             */
+            String preExistingDlmCustomMetadataKey = randomAlphaOfLength(10);
+            String preExistingDlmCustomMetadataValue = randomAlphaOfLength(20);
+            Map<String, String> preExistingDlmCustomMetadata = Map.of(preExistingDlmCustomMetadataKey, preExistingDlmCustomMetadataValue);
+            ClusterState clusterState = createClusterState(targetIndex, preExistingDlmCustomMetadata);
+            ClusterState newClusterState = task.execute(clusterState);
+            IndexMetadata indexMetadata = newClusterState.metadata().index(targetIndex);
+            Map<String, String> dlmMetadata = indexMetadata.getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY);
+            assertThat(dlmMetadata, notNullValue());
+            assertThat(dlmMetadata.size(), equalTo(2));
+            assertThat(dlmMetadata.get(preExistingDlmCustomMetadataKey), equalTo(preExistingDlmCustomMetadataValue));
+            String forceMergeCompleteTimestampString = dlmMetadata.get(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY);
+            assertThat(forceMergeCompleteTimestampString, notNullValue());
+            long forceMergeCopmleteTimestamp = Long.parseLong(forceMergeCompleteTimestampString);
+            assertThat(forceMergeCopmleteTimestamp, greaterThanOrEqualTo(threadPool.absoluteTimeInMillis()));
+            // The listener's onResponse should not be called by execute():
+            assertThat(onResponseCount.get(), equalTo(0));
+        }
+    }
+
+    /*
+     * Creates a test cluster state with the given indexName. If customDlmMetadata is not null, it is added as the value of the index's
+     * custom metadata named "dlm".
+     */
+    private ClusterState createClusterState(String indexName, Map<String, String> customDlmMetadata) {
+        var routingTableBuilder = RoutingTable.builder();
+        Metadata.Builder metadataBuilder = Metadata.builder();
+        Map<String, IndexMetadata> indices = new HashMap<>();
+        Settings indexSettings = Settings.builder()
+            .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 10))
+            .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 3))
+            .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT)
+            .build();
+        IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName).version(randomLong()).settings(indexSettings);
+        if (customDlmMetadata != null) {
+            indexMetadataBuilder.putCustom(DLM_CUSTOM_INDEX_METADATA_KEY, customDlmMetadata);
+        }
+        indices.put(indexName, indexMetadataBuilder.build());
+        return ClusterState.builder(new ClusterName("test-cluster"))
+            .routingTable(routingTableBuilder.build())
+            .metadata(metadataBuilder.indices(indices).build())
+            .build();
+    }
+
     public void testDefaultRolloverRequest() {
         // test auto max_age and another concrete condition
         {
@@ -355,6 +771,75 @@ public class DataLifecycleServiceTests extends ESTestCase {
         }
     }
 
+    public void testForceMergeRequestWrapperEqualsHashCode() {
+        String[] indices = new String[randomIntBetween(0, 10)];
+        for (int i = 0; i < indices.length; i++) {
+            indices[i] = randomAlphaOfLength(20);
+        }
+        ForceMergeRequest originalRequest = new ForceMergeRequest(indices);
+        originalRequest.setRequestId(randomLong());
+        originalRequest.setShouldStoreResult(randomBoolean());
+        originalRequest.maxNumSegments(randomInt(1000));
+        originalRequest.setParentTask(randomAlphaOfLength(10), randomLong());
+        originalRequest.onlyExpungeDeletes(randomBoolean());
+        originalRequest.flush(randomBoolean());
+        EqualsHashCodeTestUtils.checkEqualsAndHashCode(
+            new DataLifecycleService.ForceMergeRequestWrapper(originalRequest),
+            DataLifecycleServiceTests::copyForceMergeRequestWrapperRequest,
+            DataLifecycleServiceTests::mutateForceMergeRequestWrapper
+        );
+    }
+
+    private static DataLifecycleService.ForceMergeRequestWrapper copyForceMergeRequestWrapperRequest(
+        DataLifecycleService.ForceMergeRequestWrapper original
+    ) {
+        return new DataLifecycleService.ForceMergeRequestWrapper(original);
+    }
+
+    private static DataLifecycleService.ForceMergeRequestWrapper mutateForceMergeRequestWrapper(
+        DataLifecycleService.ForceMergeRequestWrapper original
+    ) {
+        switch (randomIntBetween(0, 4)) {
+            case 0 -> {
+                DataLifecycleService.ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original);
+                String[] originalIndices = original.indices();
+                int changedIndexIndex;
+                if (originalIndices.length > 0) {
+                    changedIndexIndex = randomIntBetween(0, originalIndices.length - 1);
+                } else {
+                    originalIndices = new String[1];
+                    changedIndexIndex = 0;
+                }
+                String[] newIndices = new String[originalIndices.length];
+                System.arraycopy(originalIndices, 0, newIndices, 0, originalIndices.length);
+                newIndices[changedIndexIndex] = randomAlphaOfLength(40);
+                copy.indices(newIndices);
+                return copy;
+            }
+            case 1 -> {
+                DataLifecycleService.ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original);
+                copy.onlyExpungeDeletes(original.onlyExpungeDeletes() == false);
+                return copy;
+            }
+            case 2 -> {
+                DataLifecycleService.ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original);
+                copy.flush(original.flush() == false);
+                return copy;
+            }
+            case 3 -> {
+                DataLifecycleService.ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original);
+                copy.maxNumSegments(original.maxNumSegments() + 1);
+                return copy;
+            }
+            case 4 -> {
+                DataLifecycleService.ForceMergeRequestWrapper copy = copyForceMergeRequestWrapperRequest(original);
+                copy.setRequestId(original.getRequestId() + 1);
+                return copy;
+            }
+            default -> throw new AssertionError("Can't get here");
+        }
+    }
+
     private static RolloverConditions randomRolloverConditions(boolean includeMaxAge) {
         ByteSizeValue maxSize = randomBoolean() ? randomByteSizeValue() : null;
         ByteSizeValue maxPrimaryShardSize = randomBoolean() ? randomByteSizeValue() : null;
@@ -402,7 +887,11 @@ public class DataLifecycleServiceTests extends ESTestCase {
         );
     }
 
-    private NoOpClient getTransportRequestsRecordingClient() {
+    /**
+     * This method returns a client that keeps track of the requests it has seen in clientSeenRequests. By default it does nothing else
+     * (it does not even notify the listener), but tests can provide an implementation of clientDelegate to provide any needed behavior.
+     */
+    private Client getTransportRequestsRecordingClient() {
         return new NoOpClient(getTestName()) {
             @Override
             protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
@@ -411,7 +900,15 @@ public class DataLifecycleServiceTests extends ESTestCase {
                 ActionListener<Response> listener
             ) {
                 clientSeenRequests.add(request);
+                if (clientDelegate != null) {
+                    clientDelegate.doExecute(action, request, listener);
+                }
             }
         };
     }
+
+    private interface DoExecuteDelegate {
+        @SuppressWarnings("rawtypes")
+        void doExecute(ActionType action, ActionRequest request, ActionListener listener);
+    }
 }

+ 6 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeResponse.java

@@ -46,7 +46,12 @@ public class ForceMergeResponse extends BroadcastResponse {
         super(in);
     }
 
-    ForceMergeResponse(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures) {
+    public ForceMergeResponse(
+        int totalShards,
+        int successfulShards,
+        int failedShards,
+        List<DefaultShardOperationFailedException> shardFailures
+    ) {
         super(totalShards, successfulShards, failedShards, shardFailures);
     }