浏览代码

Retry ILM async action after reindexing data stream (#124149)

When reindexing a data stream, the ILM metadata is copied from the index metadata of the source index to the destination index. But the ILM state of the new index can be stuck if the source index was in an AsyncAction at the time of reindexing. To un-stick the new index, we call TransportRetryAction to retry the AsyncAction. In the past this action would only run if the index were in the error phase. This change includes an update to TransportRetryAction, which allows it to be run when the index is not in an error phase, if the parameter requireError is set to false.
Parker Timmins 7 月之前
父节点
当前提交
10a8dcf0fb

+ 5 - 0
docs/changelog/124149.yaml

@@ -0,0 +1,5 @@
+pr: 124149
+summary: Retry ILM async action after reindexing data stream
+area: Data streams
+type: enhancement
+issues: []

+ 4 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -145,6 +145,7 @@ public class TransportVersions {
     public static final TransportVersion INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING = def(8_839_0_00);
     public static final TransportVersion ML_INFERENCE_IBM_WATSONX_RERANK_ADDED = def(8_840_0_00);
     public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_BACKPORT_8_18 = def(8_840_0_01);
+    public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18 = def(8_840_0_02);
     public static final TransportVersion INITIAL_ELASTICSEARCH_8_19 = def(8_841_0_00);
     public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_X = def(8_841_0_01);
     public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_BACKPORT_8_19 = def(8_841_0_02);
@@ -152,6 +153,7 @@ public class TransportVersions {
     public static final TransportVersion ESQL_SUPPORT_PARTIAL_RESULTS_BACKPORT_8_19 = def(8_841_0_04);
     public static final TransportVersion VOYAGE_AI_INTEGRATION_ADDED_BACKPORT_8_X = def(8_841_0_05);
     public static final TransportVersion JINA_AI_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_19 = def(8_841_0_06);
+    public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_19 = def(8_841_0_07);
     public static final TransportVersion INITIAL_ELASTICSEARCH_9_0 = def(9_000_0_00);
     public static final TransportVersion REMOVE_SNAPSHOT_FAILURES_90 = def(9_000_0_01);
     public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED_90 = def(9_000_0_02);
@@ -159,6 +161,7 @@ public class TransportVersions {
     public static final TransportVersion ESQL_DRIVER_TASK_DESCRIPTION_90 = def(9_000_0_04);
     public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_9_0 = def(9_000_0_05);
     public static final TransportVersion BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_90 = def(9_000_0_06);
+    public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_90 = def(9_000_0_07);
     public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED = def(9_001_0_00);
     public static final TransportVersion REMOVE_SNAPSHOT_FAILURES = def(9_002_0_00);
     public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED = def(9_003_0_00);
@@ -183,6 +186,7 @@ public class TransportVersions {
     public static final TransportVersion UNASSIGENEDINFO_RESHARD_ADDED = def(9_022_0_00);
     public static final TransportVersion INCLUDE_INDEX_MODE_IN_GET_DATA_STREAM = def(9_023_0_00);
     public static final TransportVersion MAX_OPERATION_SIZE_REJECTIONS_ADDED = def(9_024_0_00);
+    public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR = def(9_025_0_00);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 111 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/action/RetryActionRequest.java

@@ -0,0 +1,111 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ilm.action;
+
+import org.elasticsearch.TransportVersions;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.IndicesRequest;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.TimeValue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+public class RetryActionRequest extends AcknowledgedRequest<RetryActionRequest> implements IndicesRequest.Replaceable {
+    private String[] indices;
+    private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
+    private boolean requireError = true;
+
+    public RetryActionRequest(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... indices) {
+        super(masterNodeTimeout, ackTimeout);
+        this.indices = indices;
+    }
+
+    public RetryActionRequest(StreamInput in) throws IOException {
+        super(in);
+        this.indices = in.readStringArray();
+        this.indicesOptions = IndicesOptions.readIndicesOptions(in);
+        if (in.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR)
+            || in.getTransportVersion().isPatchFrom(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_90)
+            || in.getTransportVersion().isPatchFrom(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_19)
+            || in.getTransportVersion().isPatchFrom(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18)) {
+            this.requireError = in.readBoolean();
+        }
+    }
+
+    @Override
+    public RetryActionRequest indices(String... indices) {
+        this.indices = indices;
+        return this;
+    }
+
+    @Override
+    public String[] indices() {
+        return indices;
+    }
+
+    @Override
+    public IndicesOptions indicesOptions() {
+        return indicesOptions;
+    }
+
+    public RetryActionRequest indicesOptions(IndicesOptions indicesOptions) {
+        this.indicesOptions = indicesOptions;
+        return this;
+    }
+
+    public void requireError(boolean requireError) {
+        this.requireError = requireError;
+    }
+
+    public boolean requireError() {
+        return requireError;
+    }
+
+    @Override
+    public ActionRequestValidationException validate() {
+        return null;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeStringArray(indices);
+        indicesOptions.writeIndicesOptions(out);
+        if (out.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR)
+            || out.getTransportVersion().isPatchFrom(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_90)
+            || out.getTransportVersion().isPatchFrom(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_19)
+            || out.getTransportVersion().isPatchFrom(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18)) {
+            out.writeBoolean(requireError);
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(Arrays.hashCode(indices), indicesOptions, requireError);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj.getClass() != getClass()) {
+            return false;
+        }
+        RetryActionRequest other = (RetryActionRequest) obj;
+        return Objects.deepEquals(indices, other.indices)
+            && Objects.equals(indicesOptions, other.indicesOptions)
+            && requireError == other.requireError;
+    }
+
+}

+ 3 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

@@ -30,6 +30,7 @@ import org.elasticsearch.action.search.TransportSearchAction;
 import org.elasticsearch.action.search.TransportSearchScrollAction;
 import org.elasticsearch.index.reindex.ReindexAction;
 import org.elasticsearch.xpack.core.XPackPlugin;
+import org.elasticsearch.xpack.core.ilm.action.ILMActions;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
 import org.elasticsearch.xpack.core.security.support.MetadataUtils;
 
@@ -222,7 +223,8 @@ public class InternalUsers {
                         TransportBulkAction.NAME,
                         TransportIndexAction.NAME,
                         TransportSearchScrollAction.TYPE.name(),
-                        ModifyDataStreamsAction.NAME
+                        ModifyDataStreamsAction.NAME,
+                        ILMActions.RETRY.name()
                     )
                     .allowRestrictedIndices(false)
                     .build() },

+ 5 - 1
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java

@@ -322,7 +322,11 @@ class IndexLifecycleRunner {
             logger.warn("current step [{}] for index [{}] with policy [{}] is not recognized", currentStepKey, index, policy);
             return;
         }
-
+        if (expectedStepKey.phase() == null && expectedStepKey.name() == null && expectedStepKey.action() == null) {
+            // ILM is stopped, so do not try to run async action
+            logger.debug("expected step for index [{}] with policy [{}] is [{}], not running async action", index, policy, expectedStepKey);
+            return;
+        }
         logger.trace(
             "[{}] maybe running async action step ({}) with current step {}",
             index,

+ 2 - 1
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/RestRetryAction.java

@@ -15,6 +15,7 @@ import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.core.ilm.action.ILMActions;
+import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest;
 
 import java.util.List;
 
@@ -37,7 +38,7 @@ public class RestRetryAction extends BaseRestHandler {
     @Override
     protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
         final var indices = Strings.splitStringByCommaToArray(restRequest.param("index"));
-        final var request = new TransportRetryAction.Request(getMasterNodeTimeout(restRequest), getAckTimeout(restRequest), indices);
+        final var request = new RetryActionRequest(getMasterNodeTimeout(restRequest), getAckTimeout(restRequest), indices);
         request.indices(indices);
         request.indicesOptions(IndicesOptions.fromRequest(restRequest, IndicesOptions.strictExpandOpen()));
         return channel -> client.execute(ILMActions.RETRY, request, new RestToXContentListener<>(channel));

+ 30 - 98
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRetryAction.java

@@ -10,11 +10,7 @@ package org.elasticsearch.xpack.ilm.action;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionRequestValidationException;
-import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.action.support.master.AcknowledgedRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@@ -25,24 +21,18 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.SuppressForbidden;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.injection.guice.Inject;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ilm.Step.StepKey;
 import org.elasticsearch.xpack.core.ilm.action.ILMActions;
+import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest;
 import org.elasticsearch.xpack.ilm.IndexLifecycleService;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Objects;
-
-public class TransportRetryAction extends TransportMasterNodeAction<TransportRetryAction.Request, AcknowledgedResponse> {
+public class TransportRetryAction extends TransportMasterNodeAction<RetryActionRequest, AcknowledgedResponse> {
 
     private static final Logger logger = LogManager.getLogger(TransportRetryAction.class);
 
@@ -62,7 +52,7 @@ public class TransportRetryAction extends TransportMasterNodeAction<TransportRet
             clusterService,
             threadPool,
             actionFilters,
-            Request::new,
+            RetryActionRequest::new,
             AcknowledgedResponse::readFrom,
             EsExecutors.DIRECT_EXECUTOR_SERVICE
         );
@@ -70,7 +60,17 @@ public class TransportRetryAction extends TransportMasterNodeAction<TransportRet
     }
 
     @Override
-    protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
+    protected void masterOperation(
+        Task task,
+        RetryActionRequest request,
+        ClusterState state,
+        ActionListener<AcknowledgedResponse> listener
+    ) {
+        if (request.requireError() == false) {
+            maybeRunAsyncAction(state, request.indices());
+            listener.onResponse(AcknowledgedResponse.TRUE);
+            return;
+        }
         submitUnbatchedTask("ilm-re-run", new AckedClusterStateUpdateTask(request, listener) {
             @Override
             public ClusterState execute(ClusterState currentState) {
@@ -79,101 +79,33 @@ public class TransportRetryAction extends TransportMasterNodeAction<TransportRet
 
             @Override
             public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
-                for (String index : request.indices()) {
-                    IndexMetadata idxMeta = newState.metadata().getProject().index(index);
-                    LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
-                    StepKey retryStep = new StepKey(lifecycleState.phase(), lifecycleState.action(), lifecycleState.step());
-                    if (idxMeta == null) {
-                        // The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
-                        logger.debug(
-                            "index ["
-                                + index
-                                + "] has been deleted after moving to step ["
-                                + lifecycleState.step()
-                                + "], skipping async action check"
-                        );
-                        return;
-                    }
-                    indexLifecycleService.maybeRunAsyncAction(newState, idxMeta, retryStep);
-                }
+                maybeRunAsyncAction(newState, request.indices());
             }
         });
     }
 
+    private void maybeRunAsyncAction(ClusterState state, String[] indices) {
+        for (String index : indices) {
+            IndexMetadata idxMeta = state.metadata().getProject().index(index);
+            if (idxMeta == null) {
+                // The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
+                logger.debug("index [" + index + "] has been deleted, skipping async action check");
+                return;
+            }
+            LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
+            StepKey retryStep = new StepKey(lifecycleState.phase(), lifecycleState.action(), lifecycleState.step());
+            indexLifecycleService.maybeRunAsyncAction(state, idxMeta, retryStep);
+        }
+    }
+
     @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
     private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
         clusterService.submitUnbatchedStateUpdateTask(source, task);
     }
 
     @Override
-    protected ClusterBlockException checkBlock(Request request, ClusterState state) {
+    protected ClusterBlockException checkBlock(RetryActionRequest request, ClusterState state) {
         return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
     }
 
-    public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest.Replaceable {
-        private String[] indices;
-        private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
-
-        public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... indices) {
-            super(masterNodeTimeout, ackTimeout);
-            this.indices = indices;
-        }
-
-        public Request(StreamInput in) throws IOException {
-            super(in);
-            this.indices = in.readStringArray();
-            this.indicesOptions = IndicesOptions.readIndicesOptions(in);
-        }
-
-        @Override
-        public Request indices(String... indices) {
-            this.indices = indices;
-            return this;
-        }
-
-        @Override
-        public String[] indices() {
-            return indices;
-        }
-
-        @Override
-        public IndicesOptions indicesOptions() {
-            return indicesOptions;
-        }
-
-        public Request indicesOptions(IndicesOptions indicesOptions) {
-            this.indicesOptions = indicesOptions;
-            return this;
-        }
-
-        @Override
-        public ActionRequestValidationException validate() {
-            return null;
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-            out.writeStringArray(indices);
-            indicesOptions.writeIndicesOptions(out);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(Arrays.hashCode(indices), indicesOptions);
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (obj == null) {
-                return false;
-            }
-            if (obj.getClass() != getClass()) {
-                return false;
-            }
-            Request other = (Request) obj;
-            return Objects.deepEquals(indices, other.indices) && Objects.equals(indicesOptions, other.indicesOptions);
-        }
-
-    }
 }

+ 15 - 8
x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/RetryRequestTests.java

@@ -11,14 +11,15 @@ import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest;
 
 import java.util.Arrays;
 
-public class RetryRequestTests extends AbstractWireSerializingTestCase<TransportRetryAction.Request> {
+public class RetryRequestTests extends AbstractWireSerializingTestCase<RetryActionRequest> {
 
     @Override
-    protected TransportRetryAction.Request createTestInstance() {
-        final var request = new TransportRetryAction.Request(
+    protected RetryActionRequest createTestInstance() {
+        final var request = new RetryActionRequest(
             TEST_REQUEST_TIMEOUT,
             TEST_REQUEST_TIMEOUT,
             randomBoolean() ? Strings.EMPTY_ARRAY : generateRandomStringArray(20, 20, false)
@@ -36,19 +37,23 @@ public class RetryRequestTests extends AbstractWireSerializingTestCase<Transport
             );
             request.indicesOptions(indicesOptions);
         }
+        if (randomBoolean()) {
+            request.requireError(randomBoolean());
+        }
         return request;
     }
 
     @Override
-    protected Writeable.Reader<TransportRetryAction.Request> instanceReader() {
-        return TransportRetryAction.Request::new;
+    protected Writeable.Reader<RetryActionRequest> instanceReader() {
+        return RetryActionRequest::new;
     }
 
     @Override
-    protected TransportRetryAction.Request mutateInstance(TransportRetryAction.Request instance) {
+    protected RetryActionRequest mutateInstance(RetryActionRequest instance) {
         String[] indices = instance.indices();
         IndicesOptions indicesOptions = instance.indicesOptions();
-        switch (between(0, 1)) {
+        boolean requireError = instance.requireError();
+        switch (between(0, 2)) {
             case 0 -> indices = randomValueOtherThanMany(
                 i -> Arrays.equals(i, instance.indices()),
                 () -> generateRandomStringArray(20, 10, false, true)
@@ -66,10 +71,12 @@ public class RetryRequestTests extends AbstractWireSerializingTestCase<Transport
                     randomBoolean()
                 )
             );
+            case 2 -> requireError = requireError == false;
             default -> throw new AssertionError("Illegal randomisation branch");
         }
-        final var newRequest = new TransportRetryAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, indices);
+        final var newRequest = new RetryActionRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, indices);
         newRequest.indicesOptions(indicesOptions);
+        newRequest.requireError(requireError);
         return newRequest;
     }
 }

+ 24 - 1
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

@@ -40,6 +40,8 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.persistent.PersistentTasksExecutor;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.ilm.action.ILMActions;
+import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest;
 import org.elasticsearch.xpack.migrate.action.ReindexDataStreamIndexAction;
 
 import java.util.ArrayList;
@@ -265,13 +267,34 @@ public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExec
                 var settings = Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, lifecycleName).build();
                 var updateSettingsRequest = new UpdateSettingsRequest(settings, newIndex);
                 updateSettingsRequest.setParentTask(parentTaskId);
-                client.execute(TransportUpdateSettingsAction.TYPE, updateSettingsRequest, delegate);
+                client.execute(
+                    TransportUpdateSettingsAction.TYPE,
+                    updateSettingsRequest,
+                    delegate.delegateFailure((delegate2, response2) -> {
+                        maybeRunILMAsyncAction(newIndex, delegate2, parentTaskId);
+                    })
+                );
             } else {
                 delegate.onResponse(null);
             }
         }));
     }
 
+    /**
+     * If ILM runs an async action on the source index shortly before reindexing, the results of the async action
+     * may not yet be in the source index. For example, if a force merge has just been started by ILM, the reindex
+     * will see the un-force-merged index. But the ILM state will be copied to destination index saying that an
+     * async action was started, and so ILM won't force merge the destination index. To be sure that the async
+     * action is run on the destination index, we force a retry on async actions after adding the ILM policy
+     * to the destination index.
+     */
+    private void maybeRunILMAsyncAction(String newIndex, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
+        var retryActionRequest = new RetryActionRequest(TimeValue.MAX_VALUE, TimeValue.MAX_VALUE, newIndex);
+        retryActionRequest.setParentTask(parentTaskId);
+        retryActionRequest.requireError(false);
+        client.execute(ILMActions.RETRY, retryActionRequest, listener);
+    }
+
     private void deleteIndex(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {
         DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
         deleteIndexRequest.setParentTask(parentTaskId);

+ 81 - 25
x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java

@@ -193,15 +193,29 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
         String dataStreamName = "reindex_test_data_stream";
         String dataStreamFromNonDataStreamIndices = "index_first_reindex_test_data_stream";
         int numRollovers = randomIntBetween(0, 5);
+        boolean hasILMPolicy = randomBoolean();
+        boolean ilmEnabled = hasILMPolicy && randomBoolean();
+
+        if (ilmEnabled) {
+            startILM();
+        } else {
+            stopILM();
+        }
+
         if (CLUSTER_TYPE == ClusterType.OLD) {
-            createAndRolloverDataStream(dataStreamName, numRollovers);
+            createAndRolloverDataStream(dataStreamName, numRollovers, hasILMPolicy, ilmEnabled);
             createDataStreamFromNonDataStreamIndices(dataStreamFromNonDataStreamIndices);
         } else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
             Map<String, Map<String, Object>> oldIndicesMetadata = getIndicesMetadata(dataStreamName);
-            upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0);
-            upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 1, 0);
+            upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0, ilmEnabled);
+            upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 1, 0, ilmEnabled);
             Map<String, Map<String, Object>> upgradedIndicesMetadata = getIndicesMetadata(dataStreamName);
-            compareIndexMetadata(oldIndicesMetadata, upgradedIndicesMetadata);
+
+            if (ilmEnabled) {
+                checkILMPhase(dataStreamName, upgradedIndicesMetadata);
+            } else {
+                compareIndexMetadata(oldIndicesMetadata, upgradedIndicesMetadata);
+            }
         }
     }
 
@@ -231,6 +245,28 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private void checkILMPhase(String dataStreamName, Map<String, Map<String, Object>> upgradedIndicesMetadata) throws Exception {
+        var writeIndex = getWriteIndexFromDataStreamIndexMetadata(upgradedIndicesMetadata);
+        assertBusy(() -> {
+
+            Request request = new Request("GET", dataStreamName + "/_ilm/explain");
+            Response response = client().performRequest(request);
+            Map<String, Object> responseMap = XContentHelper.convertToMap(
+                JsonXContent.jsonXContent,
+                response.getEntity().getContent(),
+                false
+            );
+            Map<String, Object> indices = (Map<String, Object>) responseMap.get("indices");
+            for (var index : indices.keySet()) {
+                if (index.equals(writeIndex) == false) {
+                    Map<String, Object> ilmInfo = (Map<String, Object>) indices.get(index);
+                    assertThat("Index has not moved to cold ILM phase", ilmInfo.get("phase"), equalTo("cold"));
+                }
+            }
+        }, 30, TimeUnit.SECONDS);
+    }
+
     private String getWriteIndexFromDataStreamIndexMetadata(Map<String, Map<String, Object>> indexMetadataForDataStream) {
         return indexMetadataForDataStream.entrySet()
             .stream()
@@ -240,6 +276,25 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
             .get();
     }
 
+    private void startILM() throws IOException {
+        setILMInterval();
+        var request = new Request("POST", "/_ilm/start");
+        assertOK(client().performRequest(request));
+    }
+
+    private void stopILM() throws IOException {
+        var request = new Request("POST", "/_ilm/stop");
+        assertOK(client().performRequest(request));
+    }
+
+    private void setILMInterval() throws IOException {
+        Request request = new Request("PUT", "/_cluster/settings");
+        request.setJsonEntity("""
+            { "persistent": {"indices.lifecycle.poll_interval": "1s"} }
+            """);
+        assertOK(client().performRequest(request));
+    }
+
     @SuppressWarnings("unchecked")
     long getCreationDate(Map<String, Object> indexMetadata) {
         return Long.parseLong(
@@ -288,9 +343,9 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
         return (Map<String, Object>) ((Map<String, Object>) indexMetadata.get("settings")).get("index");
     }
 
-    private void createAndRolloverDataStream(String dataStreamName, int numRollovers) throws IOException {
-        boolean useIlm = randomBoolean();
-        if (useIlm) {
+    private void createAndRolloverDataStream(String dataStreamName, int numRollovers, boolean hasILMPolicy, boolean ilmEnabled)
+        throws IOException {
+        if (hasILMPolicy) {
             createIlmPolicy();
         }
         // We want to create a data stream and roll it over several times so that we have several indices to upgrade
@@ -299,7 +354,7 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
                 "settings":{
                     "index": {
                         $ILM_SETTING
-                        "mode": "time_series"
+                        "mode": "standard"
                     }
                 },
                 $DSL_TEMPLATE
@@ -320,8 +375,7 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
                             "type": "date"
                         },
                         "metricset": {
-                            "type": "keyword",
-                            "time_series_dimension": true
+                            "type": "keyword"
                         },
                         "k8s": {
                             "properties": {
@@ -348,7 +402,7 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
                 }
             }
             """;
-        if (useIlm) {
+        if (hasILMPolicy) {
             template = template.replace("$ILM_SETTING", """
                 "lifecycle.name": "test-lifecycle-policy",
                 """);
@@ -374,7 +428,7 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
         bulkLoadData(dataStreamName);
         for (int i = 0; i < numRollovers; i++) {
             String oldIndexName = rollover(dataStreamName);
-            if (randomBoolean()) {
+            if (ilmEnabled == false && randomBoolean()) {
                 closeIndex(oldIndexName);
             }
             bulkLoadData(dataStreamName);
@@ -386,21 +440,18 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
             {
               "policy": {
                 "phases": {
-                  "hot": {
+                  "warm": {
+                    "min_age": "1s",
                     "actions": {
-                      "rollover": {
-                        "max_primary_shard_size": "50kb"
+                      "forcemerge": {
+                        "max_num_segments": 1
                       }
                     }
                   },
-                  "warm": {
-                    "min_age": "30d",
+                  "cold": {
                     "actions": {
-                      "shrink": {
-                        "number_of_shards": 1
-                      },
-                      "forcemerge": {
-                        "max_num_segments": 1
+                      "set_priority" : {
+                        "priority": 50
                       }
                     }
                   }
@@ -554,13 +605,18 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
     }
 
     @SuppressWarnings("unchecked")
-    private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster, int expectedSuccessesCount, int expectedErrorCount)
-        throws Exception {
+    private void upgradeDataStream(
+        String dataStreamName,
+        int numRolloversOnOldCluster,
+        int expectedSuccessesCount,
+        int expectedErrorCount,
+        boolean ilmEnabled
+    ) throws Exception {
         Set<String> indicesNeedingUpgrade = getDataStreamIndices(dataStreamName);
         final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2);
         for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) {
             String oldIndexName = rollover(dataStreamName);
-            if (randomBoolean()) {
+            if (ilmEnabled == false && randomBoolean()) {
                 closeIndex(oldIndexName);
             }
         }