
Data stream lifecycle configures tail forcemerging (#97037)

This adds support for tail forcemerging in data stream lifecycle.
Before issuing the force merge (without specifying a target number of
segments), the lifecycle service will configure the merge policy floor
segment to `100mb` (configurable via a new cluster setting,
`data_streams.lifecycle.target.merge.policy.floor_segment`) and the merge
factor to `16` (configurable via a new cluster setting,
`data_streams.lifecycle.target.merge.policy.merge_factor`).
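
Since both settings are dynamic and node-scoped, they can be overridden at
runtime through the cluster settings API. As a hypothetical illustration
(the values below are examples, not the defaults, which remain `100mb`
and `16`):

PUT _cluster/settings
{
  "persistent": {
    "data_streams.lifecycle.target.merge.policy.floor_segment": "250mb",
    "data_streams.lifecycle.target.merge.policy.merge_factor": 20
  }
}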
Andrei Dan committed 2 years ago
commit 3db25bdf63

+ 92 - 1
modules/dlm/src/internalClusterTest/java/org/elasticsearch/dlm/DataLifecycleServiceIT.java

@@ -11,10 +11,14 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.IndicesRequest;
+import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsAction;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.flush.FlushResponse;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
+import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
+import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
 import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -34,11 +38,13 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.datastreams.DataStreamsPlugin;
 import org.elasticsearch.dlm.action.PutDataLifecycleAction;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.MergePolicyConfig;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.RestStatus;
@@ -60,6 +66,10 @@ import java.util.stream.Collectors;
 import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
 import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
+import static org.elasticsearch.dlm.DataLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING;
+import static org.elasticsearch.dlm.DataLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING;
+import static org.elasticsearch.dlm.DataLifecycleService.ONE_HUNDRED_MB;
+import static org.elasticsearch.dlm.DataLifecycleService.TARGET_MERGE_FACTOR_VALUE;
 import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
 import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
 import static org.hamcrest.Matchers.containsString;
@@ -274,7 +284,12 @@ public class DataLifecycleServiceIT extends ESIntegTestCase {
             "id1",
             null,
             List.of(dataStreamName + "*"),
-            Settings.builder().put("index.number_of_replicas", 1).put("index.number_of_shards", 1).build(),
+            Settings.builder()
+                .put("index.number_of_replicas", 1)
+                .put("index.number_of_shards", 1)
+                .put(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), ONE_HUNDRED_MB)
+                .put(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), TARGET_MERGE_FACTOR_VALUE)
+                .build(),
             null,
             lifecycle
         );
@@ -545,6 +560,82 @@ public class DataLifecycleServiceIT extends ESIntegTestCase {
         }
     }
 
+    public void testDataLifecycleServiceConfiguresTheMergePolicy() throws Exception {
+        DataLifecycle lifecycle = new DataLifecycle();
+
+        putComposableIndexTemplate(
+            "id1",
+            null,
+            List.of("metrics-foo*"),
+            Settings.builder().put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1").build(),
+            null,
+            lifecycle
+        );
+
+        String dataStreamName = "metrics-foo";
+        CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
+        client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+        indexDocs(dataStreamName, 1);
+
+        // let's allow one rollover to go through
+        assertBusy(() -> {
+            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
+            GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
+                .actionGet();
+            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
+            assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
+            List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
+            assertThat(backingIndices.size(), equalTo(2));
+            String backingIndex = backingIndices.get(0).getName();
+            assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
+            String writeIndex = backingIndices.get(1).getName();
+            assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
+        });
+
+        String firstGenerationIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1L);
+        ClusterGetSettingsAction.Response response = client().execute(
+            ClusterGetSettingsAction.INSTANCE,
+            new ClusterGetSettingsAction.Request()
+        ).get();
+        Settings clusterSettings = response.persistentSettings();
+
+        Integer targetFactor = DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING.get(clusterSettings);
+        ByteSizeValue targetFloor = DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING.get(clusterSettings);
+
+        assertBusy(() -> {
+            GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(firstGenerationIndex).includeDefaults(true);
+            GetSettingsResponse getSettingsResponse = client().execute(GetSettingsAction.INSTANCE, getSettingsRequest).actionGet();
+            assertThat(
+                getSettingsResponse.getSetting(firstGenerationIndex, MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey()),
+                is(targetFactor.toString())
+            );
+            assertThat(
+                getSettingsResponse.getSetting(firstGenerationIndex, MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey()),
+                is(targetFloor.getStringRep())
+            );
+        });
+
+        // let's configure the data lifecycle service to apply a different merge policy to indices
+        updateClusterSettings(
+            Settings.builder()
+                .put(DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING.getKey(), 5)
+                .put(DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING.getKey(), ByteSizeValue.ofMb(5))
+        );
+
+        assertBusy(() -> {
+            GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(firstGenerationIndex).includeDefaults(true);
+            GetSettingsResponse getSettingsResponse = client().execute(GetSettingsAction.INSTANCE, getSettingsRequest).actionGet();
+            assertThat(
+                getSettingsResponse.getSetting(firstGenerationIndex, MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey()),
+                is("5")
+            );
+            assertThat(
+                getSettingsResponse.getSetting(firstGenerationIndex, MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey()),
+                is(ByteSizeValue.ofMb(5).getStringRep())
+            );
+        });
+    }
+
     static void indexDocs(String dataStream, int numDocs) {
         BulkRequest bulkRequest = new BulkRequest();
         for (int i = 0; i < numDocs; i++) {

+ 5 - 1
modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecyclePlugin.java

@@ -125,7 +125,11 @@ public class DataLifecyclePlugin extends Plugin implements ActionPlugin {
             return List.of();
         }
 
-        return List.of(DataLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING);
+        return List.of(
+            DataLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING,
+            DataLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING,
+            DataLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING
+        );
     }
 
     @Override

+ 100 - 7
modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecycleService.java

@@ -20,6 +20,7 @@ import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
 import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
 import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
+import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
@@ -40,6 +41,7 @@ import org.elasticsearch.common.scheduler.SchedulerEngine;
 import org.elasticsearch.common.scheduler.TimeValueSchedule;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.core.TimeValue;
@@ -47,6 +49,7 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.MergePolicyConfig;
 import org.elasticsearch.snapshots.SnapshotInProgressException;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportRequest;
@@ -79,6 +82,24 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
         Setting.Property.Dynamic,
         Setting.Property.NodeScope
     );
+    public static final ByteSizeValue ONE_HUNDRED_MB = ByteSizeValue.ofMb(100);
+
+    public static final int TARGET_MERGE_FACTOR_VALUE = 16;
+
+    public static final Setting<Integer> DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING = Setting.intSetting(
+        "data_streams.lifecycle.target.merge.policy.merge_factor",
+        TARGET_MERGE_FACTOR_VALUE,
+        2,
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
+    public static final Setting<ByteSizeValue> DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING = Setting.byteSizeSetting(
+        "data_streams.lifecycle.target.merge.policy.floor_segment",
+        ONE_HUNDRED_MB,
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
 
     private static final Logger logger = LogManager.getLogger(DataLifecycleService.class);
     /**
@@ -90,7 +111,6 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
      */
     static final String LIFECYCLE_CUSTOM_INDEX_METADATA_KEY = "data_stream_lifecycle";
     static final String FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY = "force_merge_completed_timestamp";
-
     private final Settings settings;
     private final Client client;
     private final ClusterService clusterService;
@@ -105,6 +125,8 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
     private SchedulerEngine.Job scheduledJob;
     private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
     private final MasterServiceTaskQueue<UpdateForceMergeCompleteTask> forceMergeClusterStateUpdateTaskQueue;
+    private volatile ByteSizeValue targetMergePolicyFloorSegment;
+    private volatile int targetMergePolicyFactor;
 
     private static final SimpleBatchedExecutor<UpdateForceMergeCompleteTask, Void> FORCE_MERGE_STATE_UPDATE_TASK_EXECUTOR =
         new SimpleBatchedExecutor<>() {
@@ -139,6 +161,8 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
         this.errorStore = errorStore;
         this.scheduledJob = null;
         this.pollInterval = DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
+        this.targetMergePolicyFloorSegment = DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING.get(settings);
+        this.targetMergePolicyFactor = DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING.get(settings);
         this.rolloverConfiguration = clusterService.getClusterSettings().get(DataLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING);
         this.forceMergeClusterStateUpdateTaskQueue = clusterService.createTaskQueue(
             "dlm-forcemerge-state-update",
@@ -156,6 +180,10 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
             .addSettingsUpdateConsumer(DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING, this::updatePollInterval);
         clusterService.getClusterSettings()
             .addSettingsUpdateConsumer(DataLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING, this::updateRolloverConfiguration);
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING, this::updateMergePolicyFactor);
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING, this::updateMergePolicyFloorSegment);
     }
 
     @Override
@@ -353,13 +381,35 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
                     logger.trace("Already force merged {}", indexName);
                     continue;
                 }
-                ForceMergeRequest forceMergeRequest = new ForceMergeRequest(indexName);
-                // time to force merge the index
-                transportActionsDeduplicator.executeOnce(
-                    new ForceMergeRequestWrapper(forceMergeRequest),
-                    new ErrorRecordingActionListener(indexName, errorStore),
-                    (req, reqListener) -> forceMergeIndex(forceMergeRequest, reqListener)
+
+                ByteSizeValue configuredFloorSegmentMerge = MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.get(
+                    backingIndex.getSettings()
                 );
+                Integer configuredMergeFactor = MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.get(backingIndex.getSettings());
+                if ((configuredFloorSegmentMerge == null || configuredFloorSegmentMerge.equals(targetMergePolicyFloorSegment) == false)
+                    || (configuredMergeFactor == null || configuredMergeFactor.equals(targetMergePolicyFactor) == false)) {
+                    UpdateSettingsRequest updateMergePolicySettingsRequest = new UpdateSettingsRequest();
+                    updateMergePolicySettingsRequest.indices(indexName);
+                    updateMergePolicySettingsRequest.settings(
+                        Settings.builder()
+                            .put(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), targetMergePolicyFloorSegment)
+                            .put(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), targetMergePolicyFactor)
+                    );
+                    updateMergePolicySettingsRequest.masterNodeTimeout(TimeValue.MAX_VALUE);
+                    transportActionsDeduplicator.executeOnce(
+                        updateMergePolicySettingsRequest,
+                        new ErrorRecordingActionListener(indexName, errorStore),
+                        (req, reqListener) -> updateIndexSetting(updateMergePolicySettingsRequest, reqListener)
+                    );
+                } else {
+                    ForceMergeRequest forceMergeRequest = new ForceMergeRequest(indexName);
+                    // time to force merge the index
+                    transportActionsDeduplicator.executeOnce(
+                        new ForceMergeRequestWrapper(forceMergeRequest),
+                        new ErrorRecordingActionListener(indexName, errorStore),
+                        (req, reqListener) -> forceMergeIndex(forceMergeRequest, reqListener)
+                    );
+                }
             }
         }
     }
@@ -409,6 +459,41 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
         });
     }
 
+    private void updateIndexSetting(UpdateSettingsRequest updateSettingsRequest, ActionListener<Void> listener) {
+        assert updateSettingsRequest.indices() != null && updateSettingsRequest.indices().length == 1
+            : "Data stream lifecycle service updates the settings for one index at a time";
+        // "saving" the index name here so we don't capture the entire request
+        String targetIndex = updateSettingsRequest.indices()[0];
+        logger.trace(
+            "Data stream lifecycle service issues request to update settings [{}] for index [{}]",
+            updateSettingsRequest.settings().keySet(),
+            targetIndex
+        );
+        client.admin().indices().updateSettings(updateSettingsRequest, new ActionListener<>() {
+            @Override
+            public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                logger.info(
+                    "Data stream lifecycle service successfully updated settings [{}] for index index [{}]",
+                    updateSettingsRequest.settings().keySet(),
+                    targetIndex
+                );
+                listener.onResponse(null);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                if (e instanceof IndexNotFoundException) {
+                    // index was already deleted, treat this as a success
+                    errorStore.clearRecordedError(targetIndex);
+                    listener.onResponse(null);
+                    return;
+                }
+
+                listener.onFailure(e);
+            }
+        });
+    }
+
     private void deleteIndex(DeleteIndexRequest deleteIndexRequest, TimeValue retention, ActionListener<Void> listener) {
         assert deleteIndexRequest.indices() != null && deleteIndexRequest.indices().length == 1 : "DLM deletes one index at a time";
         // "saving" the index name here so we don't capture the entire request
@@ -576,6 +661,14 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
         this.rolloverConfiguration = newRolloverConfiguration;
     }
 
+    private void updateMergePolicyFloorSegment(ByteSizeValue newFloorSegment) {
+        this.targetMergePolicyFloorSegment = newFloorSegment;
+    }
+
+    private void updateMergePolicyFactor(int newFactor) {
+        this.targetMergePolicyFactor = newFactor;
+    }
+
     private void cancelJob() {
         if (scheduler.get() != null) {
             scheduler.get().remove(LIFECYCLE_JOB_NAME);

+ 128 - 28
modules/dlm/src/test/java/org/elasticsearch/dlm/DataLifecycleServiceTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
 import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
 import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterName;
@@ -43,6 +44,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.MergePolicyConfig;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.EqualsHashCodeTestUtils;
 import org.elasticsearch.test.client.NoOpClient;
@@ -69,8 +71,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.dlm.DLMFixtures.createDataStream;
+import static org.elasticsearch.dlm.DataLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING;
+import static org.elasticsearch.dlm.DataLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING;
 import static org.elasticsearch.dlm.DataLifecycleService.FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY;
 import static org.elasticsearch.dlm.DataLifecycleService.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY;
+import static org.elasticsearch.dlm.DataLifecycleService.ONE_HUNDRED_MB;
+import static org.elasticsearch.dlm.DataLifecycleService.TARGET_MERGE_FACTOR_VALUE;
 import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
 import static org.elasticsearch.test.ClusterServiceUtils.setState;
 import static org.hamcrest.Matchers.equalTo;
@@ -95,6 +101,8 @@ public class DataLifecycleServiceTests extends ESTestCase {
         threadPool = new TestThreadPool(getTestName());
         Set<Setting<?>> builtInClusterSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
         builtInClusterSettings.add(DataLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING);
+        builtInClusterSettings.add(DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING);
+        builtInClusterSettings.add(DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING);
         ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, builtInClusterSettings);
         clusterService = createClusterService(threadPool, clusterSettings);
 
@@ -338,7 +346,8 @@ public class DataLifecycleServiceTests extends ESTestCase {
             builder,
             dataStreamName,
             numBackingIndices,
-            settings(Version.CURRENT),
+            settings(Version.CURRENT).put(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), ONE_HUNDRED_MB)
+                .put(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), TARGET_MERGE_FACTOR_VALUE),
             new DataLifecycle(TimeValue.MAX_VALUE),
             now
         );
@@ -396,7 +405,14 @@ public class DataLifecycleServiceTests extends ESTestCase {
         // Add another index backing, and make sure that the only thing that happens is another force merge
         IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(
             DataStream.getDefaultBackingIndexName(dataStreamName, numBackingIndices + 1)
-        ).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1).creationDate(now - 3000L);
+        )
+            .settings(
+                settings(Version.CURRENT).put(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), ONE_HUNDRED_MB)
+                    .put(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), TARGET_MERGE_FACTOR_VALUE)
+            )
+            .numberOfShards(1)
+            .numberOfReplicas(1)
+            .creationDate(now - 3000L);
         MaxAgeCondition rolloverCondition = new MaxAgeCondition(TimeValue.timeValueMillis(now - 2000L));
         indexMetaBuilder.putRolloverInfo(new RolloverInfo(dataStreamName, List.of(rolloverCondition), now - 2000L));
         IndexMetadata newIndexMetadata = indexMetaBuilder.build();
@@ -441,7 +457,8 @@ public class DataLifecycleServiceTests extends ESTestCase {
             builder,
             dataStreamName,
             numBackingIndices,
-            settings(Version.CURRENT),
+            settings(Version.CURRENT).put(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), ONE_HUNDRED_MB)
+                .put(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), TARGET_MERGE_FACTOR_VALUE),
             new DataLifecycle(TimeValue.MAX_VALUE),
             now
         );
@@ -620,7 +637,14 @@ public class DataLifecycleServiceTests extends ESTestCase {
         builder.put(dataStream);
         IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(
             DataStream.getDefaultBackingIndexName(dataStreamName, numBackingIndices + 1)
-        ).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1).creationDate(now - 3000L);
+        )
+            .settings(
+                settings(Version.CURRENT).put(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), ONE_HUNDRED_MB)
+                    .put(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), TARGET_MERGE_FACTOR_VALUE)
+            )
+            .numberOfShards(1)
+            .numberOfReplicas(1)
+            .creationDate(now - 3000L);
         MaxAgeCondition rolloverCondition = new MaxAgeCondition(TimeValue.timeValueMillis(now - 2000L));
         indexMetaBuilder.putRolloverInfo(new RolloverInfo(dataStreamName, List.of(rolloverCondition), now - 2000L));
         IndexMetadata newIndexMetadata = indexMetaBuilder.build();
@@ -710,30 +734,6 @@ public class DataLifecycleServiceTests extends ESTestCase {
         }
     }
 
-    /*
-     * Creates a test cluster state with the given indexName. If customDlmMetadata is not null, it is added as the value of the index's
-     * custom metadata named "dlm".
-     */
-    private ClusterState createClusterState(String indexName, Map<String, String> customDlmMetadata) {
-        var routingTableBuilder = RoutingTable.builder();
-        Metadata.Builder metadataBuilder = Metadata.builder();
-        Map<String, IndexMetadata> indices = new HashMap<>();
-        Settings indexSettings = Settings.builder()
-            .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 10))
-            .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 3))
-            .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT)
-            .build();
-        IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName).version(randomLong()).settings(indexSettings);
-        if (customDlmMetadata != null) {
-            indexMetadataBuilder.putCustom(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY, customDlmMetadata);
-        }
-        indices.put(indexName, indexMetadataBuilder.build());
-        return ClusterState.builder(new ClusterName("test-cluster"))
-            .routingTable(routingTableBuilder.build())
-            .metadata(metadataBuilder.indices(indices).build())
-            .build();
-    }
-
     public void testDefaultRolloverRequest() {
         // test auto max_age and another concrete condition
         {
@@ -804,6 +804,106 @@ public class DataLifecycleServiceTests extends ESTestCase {
         );
     }
 
+    public void testMergePolicySettingsAreConfiguredBeforeForcemerge() throws Exception {
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 3;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            settings(Version.CURRENT),
+            new DataLifecycle(TimeValue.MAX_VALUE),
+            now
+        );
+        builder.put(dataStream);
+
+        String nodeId = "localNode";
+        DiscoveryNodes.Builder nodesBuilder = buildNodes(nodeId);
+        // we are the master node
+        nodesBuilder.masterNodeId(nodeId);
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).nodes(nodesBuilder).build();
+        setState(clusterService, state);
+        dataLifecycleService.run(clusterService.state());
+
+        // There are 3 backing indices. One gets rolled over. The other two will need to have their merge policy updated:
+        assertBusy(() -> assertThat(clientSeenRequests.size(), is(3)), 30, TimeUnit.SECONDS);
+        assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
+        assertThat(((RolloverRequest) clientSeenRequests.get(0)).getRolloverTarget(), is(dataStreamName));
+        List<UpdateSettingsRequest> updateSettingsRequests = clientSeenRequests.subList(1, 3)
+            .stream()
+            .map(transportRequest -> (UpdateSettingsRequest) transportRequest)
+            .toList();
+        assertThat(updateSettingsRequests.get(0).indices()[0], is(dataStream.getIndices().get(0).getName()));
+        assertThat(updateSettingsRequests.get(1).indices()[0], is(dataStream.getIndices().get(1).getName()));
+
+        for (UpdateSettingsRequest settingsRequest : updateSettingsRequests) {
+            assertThat(
+                settingsRequest.settings()
+                    .getAsBytesSize(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), ByteSizeValue.MINUS_ONE),
+                is(ONE_HUNDRED_MB)
+            );
+            assertThat(
+                settingsRequest.settings().getAsInt(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), -1),
+                is(TARGET_MERGE_FACTOR_VALUE)
+            );
+        }
+        // No changes, so running should not create any more requests
+        dataLifecycleService.run(clusterService.state());
+        assertThat(clientSeenRequests.size(), is(3));
+
+        // let's add one more backing index that has the expected merge policy and check that the data stream lifecycle issues a
+        // request for it
+        IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(
+            DataStream.getDefaultBackingIndexName(dataStreamName, numBackingIndices + 1)
+        )
+            .settings(
+                settings(Version.CURRENT).put(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), ONE_HUNDRED_MB)
+                    .put(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), TARGET_MERGE_FACTOR_VALUE)
+            )
+            .numberOfShards(1)
+            .numberOfReplicas(1)
+            .creationDate(now - 3000L);
+        MaxAgeCondition rolloverCondition = new MaxAgeCondition(TimeValue.timeValueMillis(now - 2000L));
+        indexMetaBuilder.putRolloverInfo(new RolloverInfo(dataStreamName, List.of(rolloverCondition), now - 2000L));
+        IndexMetadata newIndexMetadata = indexMetaBuilder.build();
+        builder = Metadata.builder(clusterService.state().metadata()).put(newIndexMetadata, true);
+        state = ClusterState.builder(clusterService.state()).metadata(builder).build();
+        setState(clusterService, state);
+        DataStream modifiedDataStream = dataStream.addBackingIndex(clusterService.state().metadata(), newIndexMetadata.getIndex());
+        builder = Metadata.builder(clusterService.state().metadata());
+        builder.put(modifiedDataStream);
+        state = ClusterState.builder(clusterService.state()).metadata(builder).build();
+        setState(clusterService, state);
+        dataLifecycleService.run(clusterService.state());
+        assertBusy(() -> assertThat(clientSeenRequests.size(), is(4)));
+        assertThat(((ForceMergeRequest) clientSeenRequests.get(3)).indices().length, is(1));
+    }
+
+    /*
+     * Creates a test cluster state with the given indexName. If customDlmMetadata is not null, it is added as the value of the index's
+     * custom metadata named "dlm".
+     */
+    private ClusterState createClusterState(String indexName, Map<String, String> customDlmMetadata) {
+        var routingTableBuilder = RoutingTable.builder();
+        Metadata.Builder metadataBuilder = Metadata.builder();
+        Map<String, IndexMetadata> indices = new HashMap<>();
+        Settings indexSettings = Settings.builder()
+            .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 10))
+            .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 3))
+            .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT)
+            .build();
+        IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName).version(randomLong()).settings(indexSettings);
+        if (customDlmMetadata != null) {
+            indexMetadataBuilder.putCustom(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY, customDlmMetadata);
+        }
+        indices.put(indexName, indexMetadataBuilder.build());
+        return ClusterState.builder(new ClusterName("test-cluster"))
+            .routingTable(routingTableBuilder.build())
+            .metadata(metadataBuilder.indices(indices).build())
+            .build();
+    }
+
     private static DataLifecycleService.ForceMergeRequestWrapper copyForceMergeRequestWrapperRequest(
         DataLifecycleService.ForceMergeRequestWrapper original
     ) {

+ 3 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

@@ -10,6 +10,7 @@ package org.elasticsearch.xpack.core.security.user;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
 import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
 import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
+import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
 import org.elasticsearch.xpack.core.XPackPlugin;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
@@ -141,7 +142,8 @@ public class InternalUsers {
                         RolloverAction.NAME,
                         ForceMergeAction.NAME + "*",
                         // indices stats is used by rollover, so we need to grant it here
-                        IndicesStatsAction.NAME + "*"
+                        IndicesStatsAction.NAME + "*",
+                        UpdateSettingsAction.NAME
                     )
                     .allowRestrictedIndices(false)
                     .build(),