Browse Source

ILM: add support for rolling over data streams (#57295)

As the datastream information is stored in the `ClusterState.Metadata` we exposed
the `Metadata` to the `AsyncWaitStep#evaluateCondition` method in order for
the steps to be able to identify when a managed index is part of a DataStream.

If a managed index is part of a DataStream the rollover target is the DataStream
name and the highest generation index is the write index (ie. the rolled index).
Andrei Dan 5 years ago
parent
commit
6b410dfb78
28 changed files with 819 additions and 448 deletions
  1. 0 6
      client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java
  2. 0 6
      qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java
  3. 6 0
      test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
  4. 4 3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncWaitStep.java
  5. 29 19
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java
  6. 7 6
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java
  7. 23 9
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStep.java
  8. 52 32
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsStep.java
  9. 5 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForFollowShardTasksStep.java
  10. 5 5
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForNoFollowersStep.java
  11. 68 55
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStep.java
  12. 6 4
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AbstractStepMasterTimeoutTestCase.java
  13. 102 20
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverStepTests.java
  14. 9 4
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SegmentCountStepTests.java
  15. 33 2
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStepTests.java
  16. 40 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsTests.java
  17. 40 36
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForFollowShardTasksStepTests.java
  18. 5 4
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForNoFollowersStepTests.java
  19. 72 32
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStepTests.java
  20. 127 0
      x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java
  21. 2 12
      x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/ChangePolicyforIndexIT.java
  22. 40 0
      x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java
  23. 130 179
      x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java
  24. 2 2
      x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java
  25. 3 2
      x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java
  26. 2 2
      x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java
  27. 1 0
      x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/package-info.java
  28. 6 6
      x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java

+ 0 - 6
client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java

@@ -1245,12 +1245,6 @@ public class IndicesClientIT extends ESRestHighLevelClientTestCase {
                 + "reason=final index setting [index.number_of_shards], not updateable"));
     }
 
-    @SuppressWarnings("unchecked")
-    private Map<String, Object> getIndexSettingsAsMap(String index) throws IOException {
-        Map<String, Object> indexSettings = getIndexSettings(index);
-        return (Map<String, Object>)((Map<String, Object>) indexSettings.get(index)).get("settings");
-    }
-
     public void testIndexPutSettingNonExistent() throws IOException {
 
         String index = "index";

+ 0 - 6
qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

@@ -731,10 +731,4 @@ public class RecoveryIT extends AbstractRollingTestCase {
         ensureGreen(indexName);
         indexDocs(indexName, randomInt(100), randomInt(100));
     }
-
-    @SuppressWarnings("unchecked")
-    private Map<String, Object> getIndexSettingsAsMap(String index) throws IOException {
-        Map<String, Object> indexSettings = getIndexSettings(index);
-        return (Map<String, Object>)((Map<String, Object>) indexSettings.get(index)).get("settings");
-    }
 }

+ 6 - 0
test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

@@ -1102,6 +1102,12 @@ public abstract class ESRestTestCase extends ESTestCase {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    protected Map<String, Object> getIndexSettingsAsMap(String index) throws IOException {
+        Map<String, Object> indexSettings = getIndexSettings(index);
+        return (Map<String, Object>)((Map<String, Object>) indexSettings.get(index)).get("settings");
+    }
+
     protected static boolean indexExists(String index) throws IOException {
         Response response = client().performRequest(new Request("HEAD", "/" + index));
         return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();

+ 4 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncWaitStep.java

@@ -6,14 +6,15 @@
 package org.elasticsearch.xpack.core.ilm;
 
 import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.index.Index;
 
 /**
  * A step which will be called periodically, waiting for some condition to become true.
  * Called asynchronously, as the condition may take time to check.
- *
+ * <p>
  * If checking something based on the current cluster state which does not take time to check, use {@link ClusterStateWaitStep}.
  */
 public abstract class AsyncWaitStep extends Step {
@@ -29,7 +30,7 @@ public abstract class AsyncWaitStep extends Step {
         return client;
     }
 
-    public abstract void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout);
+    public abstract void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout);
 
     public interface Listener {
 

+ 29 - 19
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateObserver;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.Strings;
 
@@ -39,38 +40,47 @@ public class RolloverStep extends AsyncActionStep {
     @Override
     public void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
                               ClusterStateObserver observer, Listener listener) {
+        String indexName = indexMetadata.getIndex().getName();
         boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(indexMetadata.getSettings());
         if (indexingComplete) {
             logger.trace(indexMetadata.getIndex() + " has lifecycle complete set, skipping " + RolloverStep.NAME);
             listener.onResponse(true);
             return;
         }
+        IndexAbstraction indexAbstraction = currentClusterState.metadata().getIndicesLookup().get(indexName);
+        assert indexAbstraction != null : "expected the index " + indexName + " to exist in the lookup but it didn't";
+        final String rolloverTarget;
+        if (indexAbstraction.getParentDataStream() != null) {
+            rolloverTarget = indexAbstraction.getParentDataStream().getName();
+        } else {
+            String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
 
-        String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
+            if (Strings.isNullOrEmpty(rolloverAlias)) {
+                listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
+                    "setting [%s] for index [%s] is empty or not defined, it must be set to the name of the alias pointing to the group " +
+                        "of indices being rolled over", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, indexName)));
+                return;
+            }
 
-        if (Strings.isNullOrEmpty(rolloverAlias)) {
-            listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
-                "setting [%s] for index [%s] is empty or not defined", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS,
-                indexMetadata.getIndex().getName())));
-            return;
-        }
+            if (indexMetadata.getRolloverInfos().get(rolloverAlias) != null) {
+                logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again",
+                    indexName, rolloverAlias);
+                listener.onResponse(true);
+                return;
+            }
 
-        if (indexMetadata.getRolloverInfos().get(rolloverAlias) != null) {
-            logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again",
-                indexMetadata.getIndex().getName(), rolloverAlias);
-            listener.onResponse(true);
-            return;
-        }
+            if (indexMetadata.getAliases().containsKey(rolloverAlias) == false) {
+                listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
+                    "%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias,
+                    indexName)));
+                return;
+            }
 
-        if (indexMetadata.getAliases().containsKey(rolloverAlias) == false) {
-            listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
-                "%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias,
-                indexMetadata.getIndex().getName())));
-            return;
+            rolloverTarget = rolloverAlias;
         }
 
         // Calling rollover with no conditions will always roll over the index
-        RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null)
+        RolloverRequest rolloverRequest = new RolloverRequest(rolloverTarget, null)
             .masterNodeTimeout(getMasterTimeout(currentClusterState));
         // We don't wait for active shards when we perform the rollover because the
         // {@link org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep} step will do so

+ 7 - 6
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java

@@ -13,7 +13,7 @@ import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
 import org.elasticsearch.action.admin.indices.segments.ShardSegments;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Strings;
@@ -21,6 +21,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.Index;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -49,16 +50,16 @@ public class SegmentCountStep extends AsyncWaitStep {
     }
 
     @Override
-    public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) {
-        getClient().admin().indices().segments(new IndicesSegmentsRequest(indexMetadata.getIndex().getName()),
+    public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
+        getClient().admin().indices().segments(new IndicesSegmentsRequest(index.getName()),
             ActionListener.wrap(response -> {
-                IndexSegments idxSegments = response.getIndices().get(indexMetadata.getIndex().getName());
+                IndexSegments idxSegments = response.getIndices().get(index.getName());
                 if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) {
                     final DefaultShardOperationFailedException[] failures = response.getShardFailures();
                     logger.info("[{}] retrieval of segment counts after force merge did not succeed, " +
                             "there were {} shard failures. " +
                             "failures: {}",
-                        indexMetadata.getIndex().getName(),
+                        index.getName(),
                         response.getFailedShards(),
                         failures == null ? "n/a" : Strings.collectionToDelimitedString(Arrays.stream(failures)
                             .map(Strings::toString)
@@ -73,7 +74,7 @@ public class SegmentCountStep extends AsyncWaitStep {
                         Map<ShardRouting, Integer> unmergedShardCounts = unmergedShards.stream()
                             .collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size()));
                         logger.info("[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}",
-                            indexMetadata.getIndex().getName(), maxNumSegments, unmergedShards.size(), unmergedShardCounts);
+                            index.getName(), maxNumSegments, unmergedShards.size(), unmergedShardCounts);
                     }
                     // Force merging is best effort, so always return true that the condition has been met.
                     listener.onResponse(true, new Info(unmergedShards.size()));

+ 23 - 9
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStep.java

@@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.Strings;
@@ -52,16 +53,11 @@ public class UpdateRolloverLifecycleDateStep extends ClusterStateActionStep {
             // so just use the current time.
             newIndexTime = fallbackTimeSupplier.getAsLong();
         } else {
-            // find the newly created index from the rollover and fetch its index.creation_date
-            String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
-            if (Strings.isNullOrEmpty(rolloverAlias)) {
-                throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
-                    + "] is not set on index [" + indexMetadata.getIndex().getName() + "]");
-            }
-            RolloverInfo rolloverInfo = indexMetadata.getRolloverInfos().get(rolloverAlias);
+            final String rolloverTarget = getRolloverTarget(index, currentState);
+            RolloverInfo rolloverInfo = indexMetadata.getRolloverInfos().get(rolloverTarget);
             if (rolloverInfo == null) {
-                throw new IllegalStateException("no rollover info found for [" + indexMetadata.getIndex().getName() + "] with alias [" +
-                    rolloverAlias + "], the index has not yet rolled over with that alias");
+                throw new IllegalStateException("no rollover info found for [" + indexMetadata.getIndex().getName() +
+                    "] with rollover target [" + rolloverTarget + "], the index has not yet rolled over with that target");
             }
             newIndexTime = rolloverInfo.getTime();
         }
@@ -76,6 +72,24 @@ public class UpdateRolloverLifecycleDateStep extends ClusterStateActionStep {
             .put(newIndexMetadata)).build();
     }
 
+    private static String getRolloverTarget(Index index, ClusterState currentState) {
+        IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(index.getName());
+        final String rolloverTarget;
+        if (indexAbstraction.getParentDataStream() != null) {
+            rolloverTarget = indexAbstraction.getParentDataStream().getName();
+        } else {
+            // find the newly created index from the rollover and fetch its index.creation_date
+            IndexMetadata indexMetadata = currentState.metadata().index(index);
+            String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
+            if (Strings.isNullOrEmpty(rolloverAlias)) {
+                throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
+                    + "] is not set on index [" + indexMetadata.getIndex().getName() + "]");
+            }
+            rolloverTarget = rolloverAlias;
+        }
+        return rolloverTarget;
+    }
+
     @Override
     public int hashCode() {
         return super.hashCode();

+ 52 - 32
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsStep.java

@@ -10,8 +10,10 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.common.ParseField;
@@ -46,7 +48,8 @@ public class WaitForActiveShardsStep extends ClusterStateWaitStep {
 
     @Override
     public Result isConditionMet(Index index, ClusterState clusterState) {
-        IndexMetadata originalIndexMeta = clusterState.metadata().index(index);
+        Metadata metadata = clusterState.metadata();
+        IndexMetadata originalIndexMeta = metadata.index(index);
 
         if (originalIndexMeta == null) {
             String errorMessage = String.format(Locale.ROOT, "[%s] lifecycle action for index [%s] executed but index no longer exists",
@@ -64,43 +67,50 @@ public class WaitForActiveShardsStep extends ClusterStateWaitStep {
             return new Result(true, new Info(message));
         }
 
-        String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(originalIndexMeta.getSettings());
-        if (Strings.isNullOrEmpty(rolloverAlias)) {
-            throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
-                + "] is not set on index [" + originalIndexMeta.getIndex().getName() + "]");
-        }
-
-        IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(rolloverAlias);
-        assert indexAbstraction.getType() == IndexAbstraction.Type.ALIAS : rolloverAlias + " must be an alias but it is not";
-
-        IndexMetadata aliasWriteIndex = indexAbstraction.getWriteIndex();
+        IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(index.getName());
         final String rolledIndexName;
         final String waitForActiveShardsSettingValue;
-        if (aliasWriteIndex != null) {
-            rolledIndexName = aliasWriteIndex.getIndex().getName();
-            waitForActiveShardsSettingValue = aliasWriteIndex.getSettings().get("index.write.wait_for_active_shards");
+        if (indexAbstraction.getParentDataStream() != null) {
+            DataStream dataStream = indexAbstraction.getParentDataStream().getDataStream();
+            IndexAbstraction dataStreamAbstraction = metadata.getIndicesLookup().get(dataStream.getName());
+            assert dataStreamAbstraction != null : dataStream.getName() + " datastream is not present in the metadata indices lookup";
+            IndexMetadata rolledIndexMeta = dataStreamAbstraction.getWriteIndex();
+            if (rolledIndexMeta == null) {
+                return getErrorResultOnNullMetadata(getKey(), index);
+            }
+            rolledIndexName = rolledIndexMeta.getIndex().getName();
+            waitForActiveShardsSettingValue = rolledIndexMeta.getSettings().get(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey());
         } else {
-            List<IndexMetadata> indices = indexAbstraction.getIndices();
-            int maxIndexCounter = -1;
-            IndexMetadata rolledIndexMeta = null;
-            for (IndexMetadata indexMetadata : indices) {
-                int indexNameCounter = parseIndexNameCounter(indexMetadata.getIndex().getName());
-                if (maxIndexCounter < indexNameCounter) {
-                    maxIndexCounter = indexNameCounter;
-                    rolledIndexMeta = indexMetadata;
-                }
+            String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(originalIndexMeta.getSettings());
+            if (Strings.isNullOrEmpty(rolloverAlias)) {
+                throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
+                    + "] is not set on index [" + originalIndexMeta.getIndex().getName() + "]");
             }
-            if (rolledIndexMeta == null) {
-                String errorMessage = String.format(Locale.ROOT,
-                    "unable to find the index that was rolled over from [%s] as part of lifecycle action [%s]", index.getName(),
-                    getKey().getAction());
 
-                // Index must have been since deleted
-                logger.debug(errorMessage);
-                return new Result(false, new Info(errorMessage));
+            IndexAbstraction aliasAbstraction = metadata.getIndicesLookup().get(rolloverAlias);
+            assert aliasAbstraction.getType() == IndexAbstraction.Type.ALIAS : rolloverAlias + " must be an alias but it is not";
+
+            IndexMetadata aliasWriteIndex = aliasAbstraction.getWriteIndex();
+            if (aliasWriteIndex != null) {
+                rolledIndexName = aliasWriteIndex.getIndex().getName();
+                waitForActiveShardsSettingValue = aliasWriteIndex.getSettings().get(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey());
+            } else {
+                List<IndexMetadata> indices = aliasAbstraction.getIndices();
+                int maxIndexCounter = -1;
+                IndexMetadata rolledIndexMeta = null;
+                for (IndexMetadata indexMetadata : indices) {
+                    int indexNameCounter = parseIndexNameCounter(indexMetadata.getIndex().getName());
+                    if (maxIndexCounter < indexNameCounter) {
+                        maxIndexCounter = indexNameCounter;
+                        rolledIndexMeta = indexMetadata;
+                    }
+                }
+                if (rolledIndexMeta == null) {
+                    return getErrorResultOnNullMetadata(getKey(), index);
+                }
+                rolledIndexName = rolledIndexMeta.getIndex().getName();
+                waitForActiveShardsSettingValue = rolledIndexMeta.getSettings().get("index.write.wait_for_active_shards");
             }
-            rolledIndexName = rolledIndexMeta.getIndex().getName();
-            waitForActiveShardsSettingValue = rolledIndexMeta.getSettings().get("index.write.wait_for_active_shards");
         }
 
         ActiveShardCount activeShardCount = ActiveShardCount.parseString(waitForActiveShardsSettingValue);
@@ -114,6 +124,16 @@ public class WaitForActiveShardsStep extends ClusterStateWaitStep {
         return new Result(enoughShardsActive, new ActiveShardsInfo(currentActiveShards, activeShardCount.toString(), enoughShardsActive));
     }
 
+    private static Result getErrorResultOnNullMetadata(StepKey key, Index originalIndex) {
+        String errorMessage = String.format(Locale.ROOT,
+            "unable to find the index that was rolled over from [%s] as part of lifecycle action [%s]", originalIndex.getName(),
+            key.getAction());
+
+        // Index must have been since deleted
+        logger.debug(errorMessage);
+        return new Result(false, new Info(errorMessage));
+    }
+
     /**
      * Parses the number from the rolled over index name. It also supports the date-math format (ie. index name is wrapped in &lt; and &gt;)
      * <p>

+ 5 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForFollowShardTasksStep.java

@@ -8,11 +8,13 @@ package org.elasticsearch.xpack.core.ilm;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
 import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
 
@@ -33,7 +35,8 @@ final class WaitForFollowShardTasksStep extends AsyncWaitStep {
     }
 
     @Override
-    public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) {
+    public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
+        IndexMetadata indexMetadata = metadata.index(index);
         Map<String, String> customIndexMetadata = indexMetadata.getCustomData(CCR_METADATA_KEY);
         if (customIndexMetadata == null) {
             listener.onResponse(true, null);
@@ -41,7 +44,7 @@ final class WaitForFollowShardTasksStep extends AsyncWaitStep {
         }
 
         FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest();
-        request.setIndices(new String[]{indexMetadata.getIndex().getName()});
+        request.setIndices(new String[]{index.getName()});
         getClient().execute(FollowStatsAction.INSTANCE, request,
             ActionListener.wrap(r -> handleResponse(r, listener), listener::onFailure));
     }

+ 5 - 5
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForNoFollowersStep.java

@@ -13,11 +13,12 @@ import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.Index;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -43,17 +44,16 @@ public class WaitForNoFollowersStep extends AsyncWaitStep {
     }
 
     @Override
-    public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) {
+    public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
         IndicesStatsRequest request = new IndicesStatsRequest();
         request.clear();
-        String indexName = indexMetadata.getIndex().getName();
+        String indexName = index.getName();
         request.indices(indexName);
         getClient().admin().indices().stats(request, ActionListener.wrap((response) -> {
             IndexStats indexStats = response.getIndex(indexName);
             if (indexStats == null) {
                 // Index was probably deleted
-                logger.debug("got null shard stats for index {}, proceeding on the assumption it has been deleted",
-                    indexMetadata.getIndex());
+                logger.debug("got null shard stats for index {}, proceeding on the assumption it has been deleted", indexName);
                 listener.onResponse(true, null);
                 return;
             }

+ 68 - 55
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStep.java

@@ -11,12 +11,15 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.Index;
 
 import java.io.IOException;
 import java.util.Locale;
@@ -48,73 +51,83 @@ public class WaitForRolloverReadyStep extends AsyncWaitStep {
     }
 
     @Override
-    public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) {
-        String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
-
-        if (Strings.isNullOrEmpty(rolloverAlias)) {
-            listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
-                "setting [%s] for index [%s] is empty or not defined", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS,
-                indexMetadata.getIndex().getName())));
-            return;
-        }
+    public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
+        IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(index.getName());
+        assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found";
+        final String rolloverTarget;
+        if (indexAbstraction.getParentDataStream() != null) {
+            rolloverTarget = indexAbstraction.getParentDataStream().getName();
+        } else {
+            IndexMetadata indexMetadata = metadata.index(index);
+            String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
+
+            if (Strings.isNullOrEmpty(rolloverAlias)) {
+                listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
+                    "setting [%s] for index [%s] is empty or not defined", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS,
+                    index.getName())));
+                return;
+            }
 
-        if (indexMetadata.getRolloverInfos().get(rolloverAlias) != null) {
-            logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again",
-                indexMetadata.getIndex().getName(), rolloverAlias);
-            listener.onResponse(true, new WaitForRolloverReadyStep.EmptyInfo());
-            return;
-        }
+            if (indexMetadata.getRolloverInfos().get(rolloverAlias) != null) {
+                logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again",
+                    index.getName(), rolloverAlias);
+                listener.onResponse(true, new WaitForRolloverReadyStep.EmptyInfo());
+                return;
+            }
 
-        // The order of the following checks is important in ways which may not be obvious.
+            // The order of the following checks is important in ways which may not be obvious.
 
-        // First, figure out if 1) The configured alias points to this index, and if so,
-        // whether this index is the write alias for this index
-        boolean aliasPointsToThisIndex = indexMetadata.getAliases().containsKey(rolloverAlias);
+            // First, figure out if 1) The configured alias points to this index, and if so,
+            // whether this index is the write alias for this index
+            boolean aliasPointsToThisIndex = indexMetadata.getAliases().containsKey(rolloverAlias);
 
-        Boolean isWriteIndex = null;
-        if (aliasPointsToThisIndex) {
-            // The writeIndex() call returns a tri-state boolean:
-            // true  -> this index is the write index for this alias
-            // false -> this index is not the write index for this alias
-            // null  -> this alias is a "classic-style" alias and does not have a write index configured, but only points to one index
-            //          and is thus the write index by default
-            isWriteIndex = indexMetadata.getAliases().get(rolloverAlias).writeIndex();
-        }
+            Boolean isWriteIndex = null;
+            if (aliasPointsToThisIndex) {
+                // The writeIndex() call returns a tri-state boolean:
+                // true  -> this index is the write index for this alias
+                // false -> this index is not the write index for this alias
+                // null  -> this alias is a "classic-style" alias and does not have a write index configured, but only points to one index
+                //          and is thus the write index by default
+                isWriteIndex = indexMetadata.getAliases().get(rolloverAlias).writeIndex();
+            }
 
-        boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(indexMetadata.getSettings());
-        if (indexingComplete) {
-            logger.trace(indexMetadata.getIndex() + " has lifecycle complete set, skipping " + WaitForRolloverReadyStep.NAME);
-            // If this index is still the write index for this alias, skipping rollover and continuing with the policy almost certainly
-            // isn't what we want, as something likely still expects to be writing to this index.
-            // If the alias doesn't point to this index, that's okay as that will be the result if this index is using a
-            // "classic-style" alias and has already rolled over, and we want to continue with the policy.
-            if (aliasPointsToThisIndex && Boolean.TRUE.equals(isWriteIndex)) {
-                listener.onFailure(new IllegalStateException(String.format(Locale.ROOT,
-                    "index [%s] has [%s] set to [true], but is still the write index for alias [%s]",
-                    indexMetadata.getIndex().getName(), LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, rolloverAlias)));
+            boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(indexMetadata.getSettings());
+            if (indexingComplete) {
+                logger.trace(index + " has lifecycle complete set, skipping " + WaitForRolloverReadyStep.NAME);
+                // If this index is still the write index for this alias, skipping rollover and continuing with the policy almost certainly
+                // isn't what we want, as something likely still expects to be writing to this index.
+                // If the alias doesn't point to this index, that's okay as that will be the result if this index is using a
+                // "classic-style" alias and has already rolled over, and we want to continue with the policy.
+                if (aliasPointsToThisIndex && Boolean.TRUE.equals(isWriteIndex)) {
+                    listener.onFailure(new IllegalStateException(String.format(Locale.ROOT,
+                        "index [%s] has [%s] set to [true], but is still the write index for alias [%s]",
+                        index.getName(), LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, rolloverAlias)));
+                    return;
+                }
+
+                listener.onResponse(true, new WaitForRolloverReadyStep.EmptyInfo());
                 return;
             }
 
-            listener.onResponse(true, new WaitForRolloverReadyStep.EmptyInfo());
-            return;
-        }
+            // If indexing_complete is *not* set, and the alias does not point to this index, we can't roll over this index, so error out.
+            if (aliasPointsToThisIndex == false) {
+                listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
+                    "%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias,
+                    index.getName())));
+                return;
+            }
 
-        // If indexing_complete is *not* set, and the alias does not point to this index, we can't roll over this index, so error out.
-        if (aliasPointsToThisIndex == false) {
-            listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
-                "%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias,
-                indexMetadata.getIndex().getName())));
-            return;
-        }
+            // Similarly, if isWriteIndex is false (see note above on false vs. null), we can't roll over this index, so error out.
+            if (Boolean.FALSE.equals(isWriteIndex)) {
+                listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
+                    "index [%s] is not the write index for alias [%s]", index.getName(), rolloverAlias)));
+                return;
+            }
 
-        // Similarly, if isWriteIndex is false (see note above on false vs. null), we can't roll over this index, so error out.
-        if (Boolean.FALSE.equals(isWriteIndex)) {
-            listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
-                "index [%s] is not the write index for alias [%s]", indexMetadata.getIndex().getName(), rolloverAlias)));
-            return;
+            rolloverTarget = rolloverAlias;
         }
 
-        RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null).masterNodeTimeout(masterTimeout);
+        RolloverRequest rolloverRequest = new RolloverRequest(rolloverTarget, null).masterNodeTimeout(masterTimeout);
         rolloverRequest.dryRun(true);
         if (maxAge != null) {
             rolloverRequest.addMaxIndexAgeCondition(maxAge);

+ 6 - 4
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AbstractStepMasterTimeoutTestCase.java

@@ -42,17 +42,19 @@ public abstract class AbstractStepMasterTimeoutTestCase<T extends AsyncActionSte
     }
 
     public void testMasterTimeout() {
+        IndexMetadata indexMetadata = getIndexMetadata();
         checkMasterTimeout(TimeValue.timeValueSeconds(30),
-            ClusterState.builder(ClusterName.DEFAULT).metadata(Metadata.builder().build()).build());
+            ClusterState.builder(ClusterName.DEFAULT).metadata(Metadata.builder().put(indexMetadata, true).build()).build(), indexMetadata);
         checkMasterTimeout(TimeValue.timeValueSeconds(10),
             ClusterState.builder(ClusterName.DEFAULT)
                 .metadata(Metadata.builder()
                     .persistentSettings(Settings.builder().put(LIFECYCLE_STEP_MASTER_TIMEOUT, "10s").build())
+                    .put(indexMetadata, true)
                     .build())
-                .build());
+                .build(), indexMetadata);
     }
 
-    private void checkMasterTimeout(TimeValue timeValue, ClusterState currentClusterState) {
+    private void checkMasterTimeout(TimeValue timeValue, ClusterState currentClusterState, IndexMetadata indexMetadata) {
         AtomicBoolean timeoutChecked = new AtomicBoolean();
         client = new NoOpClient(pool) {
             @Override
@@ -65,7 +67,7 @@ public abstract class AbstractStepMasterTimeoutTestCase<T extends AsyncActionSte
                 }
             }
         };
-        createRandomInstance().performAction(getIndexMetadata(), currentClusterState, null, new AsyncActionStep.Listener() {
+        createRandomInstance().performAction(indexMetadata, currentClusterState, null, new AsyncActionStep.Listener() {
             @Override
             public void onResponse(boolean complete) {
 

+ 102 - 20
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverStepTests.java

@@ -12,13 +12,18 @@ import org.elasticsearch.action.admin.indices.rollover.MaxSizeCondition;
 import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
 import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.xpack.core.ilm.Step.StepKey;
 import org.mockito.Mockito;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Locale;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -71,11 +76,11 @@ public class RolloverStepTests extends AbstractStepMasterTimeoutTestCase<Rollove
         return getIndexMetadata(randomAlphaOfLength(5));
     }
 
-    private static void assertRolloverIndexRequest(RolloverRequest request, String alias) {
+    private static void assertRolloverIndexRequest(RolloverRequest request, String rolloverTarget) {
         assertNotNull(request);
         assertEquals(1, request.indices().length);
-        assertEquals(alias, request.indices()[0]);
-        assertEquals(alias, request.getRolloverTarget());
+        assertEquals(rolloverTarget, request.indices()[0]);
+        assertEquals(rolloverTarget, request.getRolloverTarget());
         assertFalse(request.isDryRun());
         assertEquals(0, request.getConditions().size());
     }
@@ -86,17 +91,16 @@ public class RolloverStepTests extends AbstractStepMasterTimeoutTestCase<Rollove
 
         RolloverStep step = createRandomInstance();
 
-        Mockito.doAnswer(invocation -> {
-            RolloverRequest request = (RolloverRequest) invocation.getArguments()[0];
-            @SuppressWarnings("unchecked")
-            ActionListener<RolloverResponse> listener = (ActionListener<RolloverResponse>) invocation.getArguments()[1];
-            assertRolloverIndexRequest(request, alias);
-            listener.onResponse(new RolloverResponse(null, null, Collections.emptyMap(), request.isDryRun(), true, true, true));
-            return null;
-        }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
+        mockClientRolloverCall(alias);
 
         SetOnce<Boolean> actionCompleted = new SetOnce<>();
-        step.performAction(indexMetadata, emptyClusterState(), null, new AsyncActionStep.Listener() {
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(
+                Metadata.builder()
+                    .put(indexMetadata, true)
+            )
+            .build();
+        step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() {
 
             @Override
             public void onResponse(boolean complete) {
@@ -116,6 +120,55 @@ public class RolloverStepTests extends AbstractStepMasterTimeoutTestCase<Rollove
         Mockito.verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any());
     }
 
+    public void testPerformActionOnDataStream() {
+        String dataStreamName = "test-datastream";
+        IndexMetadata indexMetadata = IndexMetadata.builder(dataStreamName + "-000001")
+            .settings(settings(Version.CURRENT))
+            .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
+
+        RolloverStep step = createRandomInstance();
+
+        mockClientRolloverCall(dataStreamName);
+
+        SetOnce<Boolean> actionCompleted = new SetOnce<>();
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(
+                Metadata.builder()
+                    .put(new DataStream(dataStreamName, "timestamp", List.of(indexMetadata.getIndex()), 1L))
+                    .put(indexMetadata, true)
+            )
+            .build();
+        step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() {
+
+            @Override
+            public void onResponse(boolean complete) {
+                actionCompleted.set(complete);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                throw new AssertionError("Unexpected method call", e);
+            }
+        });
+
+        assertEquals(true, actionCompleted.get());
+
+        Mockito.verify(client, Mockito.only()).admin();
+        Mockito.verify(adminClient, Mockito.only()).indices();
+        Mockito.verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any());
+    }
+
+    private void mockClientRolloverCall(String rolloverTarget) {
+        Mockito.doAnswer(invocation -> {
+            RolloverRequest request = (RolloverRequest) invocation.getArguments()[0];
+            @SuppressWarnings("unchecked")
+            ActionListener<RolloverResponse> listener = (ActionListener<RolloverResponse>) invocation.getArguments()[1];
+            assertRolloverIndexRequest(request, rolloverTarget);
+            listener.onResponse(new RolloverResponse(null, null, Collections.emptyMap(), request.isDryRun(), true, true, true));
+            return null;
+        }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
+    }
+
     public void testPerformActionWithIndexingComplete() {
         String alias = randomAlphaOfLength(5);
         IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLength(10))
@@ -128,7 +181,13 @@ public class RolloverStepTests extends AbstractStepMasterTimeoutTestCase<Rollove
         RolloverStep step = createRandomInstance();
 
         SetOnce<Boolean> actionCompleted = new SetOnce<>();
-        step.performAction(indexMetadata, null, null, new AsyncActionStep.Listener() {
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(
+                Metadata.builder()
+                    .put(indexMetadata, true)
+            )
+            .build();
+        step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() {
 
             @Override
             public void onResponse(boolean complete) {
@@ -156,8 +215,13 @@ public class RolloverStepTests extends AbstractStepMasterTimeoutTestCase<Rollove
             .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
 
         RolloverStep step = createRandomInstance();
-
-        step.performAction(indexMetadata, null, null, new AsyncActionStep.Listener() {
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(
+                Metadata.builder()
+                    .put(indexMetadata, true)
+            )
+            .build();
+        step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() {
 
             @Override
             public void onResponse(boolean complete) {
@@ -189,7 +253,13 @@ public class RolloverStepTests extends AbstractStepMasterTimeoutTestCase<Rollove
         }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
 
         SetOnce<Boolean> exceptionThrown = new SetOnce<>();
-        step.performAction(indexMetadata, emptyClusterState(), null, new AsyncActionStep.Listener() {
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(
+                Metadata.builder()
+                    .put(indexMetadata, true)
+            )
+            .build();
+        step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() {
 
             @Override
             public void onResponse(boolean complete) {
@@ -218,7 +288,13 @@ public class RolloverStepTests extends AbstractStepMasterTimeoutTestCase<Rollove
         RolloverStep step = createRandomInstance();
 
         SetOnce<Exception> exceptionThrown = new SetOnce<>();
-        step.performAction(indexMetadata, null, null, new AsyncActionStep.Listener() {
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(
+                Metadata.builder()
+                    .put(indexMetadata, true)
+            )
+            .build();
+        step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() {
             @Override
             public void onResponse(boolean complete) {
                 throw new AssertionError("Unexpected method call");
@@ -231,8 +307,8 @@ public class RolloverStepTests extends AbstractStepMasterTimeoutTestCase<Rollove
         });
         assertThat(exceptionThrown.get().getClass(), equalTo(IllegalArgumentException.class));
         assertThat(exceptionThrown.get().getMessage(), equalTo(String.format(Locale.ROOT,
-            "setting [%s] for index [%s] is empty or not defined", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS,
-            indexMetadata.getIndex().getName())));
+            "setting [%s] for index [%s] is empty or not defined, it must be set to the name of the alias pointing to the group of " +
+                "indices being rolled over", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, indexMetadata.getIndex().getName())));
     }
 
     public void testPerformActionAliasDoesNotPointToIndex() {
@@ -243,7 +319,13 @@ public class RolloverStepTests extends AbstractStepMasterTimeoutTestCase<Rollove
         RolloverStep step = createRandomInstance();
 
         SetOnce<Exception> exceptionThrown = new SetOnce<>();
-        step.performAction(indexMetadata, null, null, new AsyncActionStep.Listener() {
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(
+                Metadata.builder()
+                    .put(indexMetadata, true)
+            )
+            .build();
+        step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() {
             @Override
             public void onResponse(boolean complete) {
                 throw new AssertionError("Unexpected method call");

+ 9 - 4
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SegmentCountStepTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
 import org.elasticsearch.action.admin.indices.segments.ShardSegments;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.index.Index;
@@ -113,7 +114,8 @@ public class SegmentCountStepTests extends AbstractStepTestCase<SegmentCountStep
         SetOnce<ToXContentObject> conditionInfo = new SetOnce<>();
 
         SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments);
-        step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() {
+        IndexMetadata indexMetadata = makeMeta(index);
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
             @Override
             public void onResponse(boolean conditionMet, ToXContentObject info) {
                 conditionMetResult.set(conditionMet);
@@ -165,7 +167,8 @@ public class SegmentCountStepTests extends AbstractStepTestCase<SegmentCountStep
         SetOnce<ToXContentObject> conditionInfo = new SetOnce<>();
 
         SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments);
-        step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() {
+        IndexMetadata indexMetadata = makeMeta(index);
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
             @Override
             public void onResponse(boolean conditionMet, ToXContentObject info) {
                 conditionMetResult.set(conditionMet);
@@ -220,7 +223,8 @@ public class SegmentCountStepTests extends AbstractStepTestCase<SegmentCountStep
         SetOnce<ToXContentObject> conditionInfo = new SetOnce<>();
 
         SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments);
-        step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() {
+        IndexMetadata indexMetadata = makeMeta(index);
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
             @Override
             public void onResponse(boolean conditionMet, ToXContentObject info) {
                 conditionMetResult.set(conditionMet);
@@ -256,7 +260,8 @@ public class SegmentCountStepTests extends AbstractStepTestCase<SegmentCountStep
         SetOnce<Boolean> exceptionThrown = new SetOnce<>();
 
         SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments);
-        step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() {
+        IndexMetadata indexMetadata = makeMeta(index);
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
             @Override
             public void onResponse(boolean conditionMet, ToXContentObject info) {
                 throw new AssertionError("unexpected method call");

+ 33 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStepTests.java

@@ -11,11 +11,13 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.xpack.core.ilm.Step.StepKey;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.function.LongSupplier;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -78,6 +80,35 @@ public class UpdateRolloverLifecycleDateStepTests extends AbstractStepTestCase<U
         assertThat(actualRolloverTime, equalTo(rolloverTime));
     }
 
+    public void testPerformActionOnDataStream() {
+        long creationDate = randomLongBetween(0, 1000000);
+        long rolloverTime = randomValueOtherThan(creationDate, () -> randomNonNegativeLong());
+        String dataStreamName = "test-datastream";
+        IndexMetadata originalIndexMeta = IndexMetadata.builder(dataStreamName + "-000001")
+            .putRolloverInfo(new RolloverInfo(dataStreamName, Collections.emptyList(), rolloverTime))
+            .settings(settings(Version.CURRENT))
+            .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
+
+        IndexMetadata rolledIndexMeta= IndexMetadata.builder(dataStreamName + "-000002")
+            .settings(settings(Version.CURRENT))
+            .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
+
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(
+                Metadata.builder()
+                    .put(new DataStream(dataStreamName, "timestamp", List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex()), 2L))
+                    .put(originalIndexMeta, true)
+                    .put(rolledIndexMeta, true)
+            ).build();
+
+        UpdateRolloverLifecycleDateStep step = createRandomInstance();
+        ClusterState newState = step.performAction(originalIndexMeta.getIndex(), clusterState);
+        long actualRolloverTime = LifecycleExecutionState
+            .fromIndexMetadata(newState.metadata().index(originalIndexMeta.getIndex()))
+            .getLifecycleDate();
+        assertThat(actualRolloverTime, equalTo(rolloverTime));
+    }
+
     public void testPerformActionBeforeRolloverHappened() {
         String alias = randomAlphaOfLength(3);
         long creationDate = randomLongBetween(0, 1000000);
@@ -92,8 +123,8 @@ public class UpdateRolloverLifecycleDateStepTests extends AbstractStepTestCase<U
         IllegalStateException exceptionThrown = expectThrows(IllegalStateException.class,
             () -> step.performAction(indexMetadata.getIndex(), clusterState));
         assertThat(exceptionThrown.getMessage(),
-            equalTo("no rollover info found for [" + indexMetadata.getIndex().getName() + "] with alias [" + alias + "], the index " +
-                "has not yet rolled over with that alias"));
+            equalTo("no rollover info found for [" + indexMetadata.getIndex().getName() + "] with rollover target [" + alias + "], the " +
+                "index has not yet rolled over with that target"));
     }
 
     public void testPerformActionWithNoRolloverAliasSetting() {

+ 40 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForActiveShardsTests.java

@@ -9,6 +9,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -23,6 +24,7 @@ import org.elasticsearch.index.Index;
 import org.elasticsearch.xpack.core.ilm.Step.StepKey;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.UUID;
 
 import static org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep.parseIndexNameCounter;
@@ -147,6 +149,44 @@ public class WaitForActiveShardsTests extends AbstractStepTestCase<WaitForActive
             " met", createRandomInstance().isConditionMet(originalIndex.getIndex(), clusterState).isComplete(), is(true));
     }
 
+    public void testResultEvaluatedOnDataStream() throws IOException {
+        String dataStreamName = "test-datastream";
+        IndexMetadata originalIndexMeta = IndexMetadata.builder(dataStreamName + "-000001")
+            .settings(settings(Version.CURRENT))
+            .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
+
+        IndexMetadata rolledIndexMeta= IndexMetadata.builder(dataStreamName + "-000002")
+            .settings(settings(Version.CURRENT).put("index.write.wait_for_active_shards", "3"))
+            .numberOfShards(1).numberOfReplicas(3).build();
+
+        IndexRoutingTable.Builder routingTable = new IndexRoutingTable.Builder(rolledIndexMeta.getIndex());
+        routingTable.addShard(TestShardRouting.newShardRouting(rolledIndexMeta.getIndex().getName(), 0, "node", null, true,
+            ShardRoutingState.STARTED));
+        routingTable.addShard(TestShardRouting.newShardRouting(rolledIndexMeta.getIndex().getName(), 0, "node2", null, false,
+            ShardRoutingState.STARTED));
+
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(
+                Metadata.builder()
+                    .put(new DataStream(dataStreamName, "timestamp", List.of(originalIndexMeta.getIndex(), rolledIndexMeta.getIndex()), 2L))
+                    .put(originalIndexMeta, true)
+                    .put(rolledIndexMeta, true)
+            )
+            .routingTable(RoutingTable.builder().add(routingTable.build()).build())
+            .build();
+
+        WaitForActiveShardsStep waitForActiveShardsStep = createRandomInstance();
+
+        ClusterStateWaitStep.Result result = waitForActiveShardsStep.isConditionMet(originalIndexMeta.getIndex(), clusterState);
+        assertThat(result.isComplete(), is(false));
+
+        XContentBuilder expected = new WaitForActiveShardsStep.ActiveShardsInfo(2, "3", false).toXContent(JsonXContent.contentBuilder(),
+            ToXContent.EMPTY_PARAMS);
+        String actualResultAsString = Strings.toString(result.getInfomationContext());
+        assertThat(actualResultAsString, is(Strings.toString(expected)));
+        assertThat(actualResultAsString, containsString("waiting for [3] shards to become active, but only [2] are active"));
+    }
+
     public void testResultReportsMeaningfulMessage() throws IOException {
         String alias = randomAlphaOfLength(5);
         IndexMetadata originalIndex = IndexMetadata.builder("index-000000")

+ 40 - 36
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForFollowShardTasksStepTests.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.ilm;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
 import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
@@ -68,18 +69,19 @@ public class WaitForFollowShardTasksStepTests extends AbstractStepTestCase<WaitF
         final boolean[] conditionMetHolder = new boolean[1];
         final ToXContentObject[] informationContextHolder = new ToXContentObject[1];
         final Exception[] exceptionHolder = new Exception[1];
-        createRandomInstance().evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
-            @Override
-            public void onResponse(boolean conditionMet, ToXContentObject informationContext) {
-                conditionMetHolder[0] = conditionMet;
-                informationContextHolder[0] = informationContext;
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                exceptionHolder[0] = e;
-            }
-        }, MASTER_TIMEOUT);
+        createRandomInstance().evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(),
+            new AsyncWaitStep.Listener() {
+                @Override
+                public void onResponse(boolean conditionMet, ToXContentObject informationContext) {
+                    conditionMetHolder[0] = conditionMet;
+                    informationContextHolder[0] = informationContext;
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    exceptionHolder[0] = e;
+                }
+            }, MASTER_TIMEOUT);
 
         assertThat(conditionMetHolder[0], is(true));
         assertThat(informationContextHolder[0], nullValue());
@@ -102,18 +104,19 @@ public class WaitForFollowShardTasksStepTests extends AbstractStepTestCase<WaitF
         final boolean[] conditionMetHolder = new boolean[1];
         final ToXContentObject[] informationContextHolder = new ToXContentObject[1];
         final Exception[] exceptionHolder = new Exception[1];
-        createRandomInstance().evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
-            @Override
-            public void onResponse(boolean conditionMet, ToXContentObject informationContext) {
-                conditionMetHolder[0] = conditionMet;
-                informationContextHolder[0] = informationContext;
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                exceptionHolder[0] = e;
-            }
-        }, MASTER_TIMEOUT);
+        createRandomInstance().evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(),
+            new AsyncWaitStep.Listener() {
+                @Override
+                public void onResponse(boolean conditionMet, ToXContentObject informationContext) {
+                    conditionMetHolder[0] = conditionMet;
+                    informationContextHolder[0] = informationContext;
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    exceptionHolder[0] = e;
+                }
+            }, MASTER_TIMEOUT);
 
         assertThat(conditionMetHolder[0], is(false));
         assertThat(informationContextHolder[0], notNullValue());
@@ -135,18 +138,19 @@ public class WaitForFollowShardTasksStepTests extends AbstractStepTestCase<WaitF
         final boolean[] conditionMetHolder = new boolean[1];
         final ToXContentObject[] informationContextHolder = new ToXContentObject[1];
         final Exception[] exceptionHolder = new Exception[1];
-        createRandomInstance().evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
-            @Override
-            public void onResponse(boolean conditionMet, ToXContentObject informationContext) {
-                conditionMetHolder[0] = conditionMet;
-                informationContextHolder[0] = informationContext;
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                exceptionHolder[0] = e;
-            }
-        }, MASTER_TIMEOUT);
+        createRandomInstance().evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(),
+            new AsyncWaitStep.Listener() {
+                @Override
+                public void onResponse(boolean conditionMet, ToXContentObject informationContext) {
+                    conditionMetHolder[0] = conditionMet;
+                    informationContextHolder[0] = informationContext;
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    exceptionHolder[0] = e;
+                }
+            }, MASTER_TIMEOUT);
 
         assertThat(conditionMetHolder[0], is(true));
         assertThat(informationContextHolder[0], nullValue());

+ 5 - 4
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForNoFollowersStepTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.index.seqno.RetentionLease;
@@ -78,7 +79,7 @@ public class WaitForNoFollowersStepTests extends AbstractStepTestCase<WaitForNoF
 
         final SetOnce<Boolean> conditionMetHolder = new SetOnce<>();
         final SetOnce<ToXContentObject> stepInfoHolder = new SetOnce<>();
-        step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
             @Override
             public void onResponse(boolean conditionMet, ToXContentObject infomationContext) {
                 conditionMetHolder.set(conditionMet);
@@ -111,7 +112,7 @@ public class WaitForNoFollowersStepTests extends AbstractStepTestCase<WaitForNoF
 
         final SetOnce<Boolean> conditionMetHolder = new SetOnce<>();
         final SetOnce<ToXContentObject> stepInfoHolder = new SetOnce<>();
-        step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
             @Override
             public void onResponse(boolean conditionMet, ToXContentObject infomationContext) {
                 conditionMetHolder.set(conditionMet);
@@ -148,7 +149,7 @@ public class WaitForNoFollowersStepTests extends AbstractStepTestCase<WaitForNoF
 
         final SetOnce<Boolean> conditionMetHolder = new SetOnce<>();
         final SetOnce<ToXContentObject> stepInfoHolder = new SetOnce<>();
-        step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
             @Override
             public void onResponse(boolean conditionMet, ToXContentObject infomationContext) {
                 conditionMetHolder.set(conditionMet);
@@ -187,7 +188,7 @@ public class WaitForNoFollowersStepTests extends AbstractStepTestCase<WaitForNoF
         }).when(indicesClient).stats(any(), any());
 
         final SetOnce<Exception> exceptionHolder = new SetOnce<>();
-        step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
             @Override
             public void onResponse(boolean conditionMet, ToXContentObject infomationContext) {
                 fail("onResponse should not be called in this test, called with conditionMet: " + conditionMet

+ 72 - 32
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStepTests.java

@@ -17,7 +17,9 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
 import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
@@ -26,6 +28,7 @@ import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
@@ -90,11 +93,11 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
             instance.getMaxSize(), instance.getMaxAge(), instance.getMaxDocs());
     }
 
-    private static void assertRolloverIndexRequest(RolloverRequest request, String alias, Set<Condition<?>> expectedConditions) {
+    private static void assertRolloverIndexRequest(RolloverRequest request, String rolloverTarget, Set<Condition<?>> expectedConditions) {
         assertNotNull(request);
         assertEquals(1, request.indices().length);
-        assertEquals(alias, request.indices()[0]);
-        assertEquals(alias, request.getRolloverTarget());
+        assertEquals(rolloverTarget, request.indices()[0]);
+        assertEquals(rolloverTarget, request.getRolloverTarget());
         assertEquals(expectedConditions.size(), request.getConditions().size());
         assertTrue(request.isDryRun());
         Set<Object> expectedConditionValues = expectedConditions.stream().map(Condition::value).collect(Collectors.toSet());
@@ -103,7 +106,6 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
         assertEquals(expectedConditionValues, actualConditionValues);
     }
 
-
     public void testEvaluateCondition() {
         String alias = randomAlphaOfLength(5);
         IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLength(10))
@@ -113,29 +115,44 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
 
         WaitForRolloverReadyStep step = createRandomInstance();
 
-        Mockito.doAnswer(invocation -> {
-            RolloverRequest request = (RolloverRequest) invocation.getArguments()[0];
-            @SuppressWarnings("unchecked")
-            ActionListener<RolloverResponse> listener = (ActionListener<RolloverResponse>) invocation.getArguments()[1];
-            Set<Condition<?>> expectedConditions = new HashSet<>();
-            if (step.getMaxAge() != null) {
-                expectedConditions.add(new MaxAgeCondition(step.getMaxAge()));
-            }
-            if (step.getMaxSize() != null) {
-                expectedConditions.add(new MaxSizeCondition(step.getMaxSize()));
+        mockRolloverIndexCall(alias, step);
+
+        SetOnce<Boolean> conditionsMet = new SetOnce<>();
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
+
+            @Override
+            public void onResponse(boolean complete, ToXContentObject infomationContext) {
+                conditionsMet.set(complete);
             }
-            if (step.getMaxDocs() != null) {
-                expectedConditions.add(new MaxDocsCondition(step.getMaxDocs()));
+
+            @Override
+            public void onFailure(Exception e) {
+                throw new AssertionError("Unexpected method call", e);
             }
-            assertRolloverIndexRequest(request, alias, expectedConditions);
-            Map<String, Boolean> conditionResults = expectedConditions.stream()
-                .collect(Collectors.toMap(Condition::toString, condition -> true));
-            listener.onResponse(new RolloverResponse(null, null, conditionResults, request.isDryRun(), false, false, false));
-            return null;
-        }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
+        }, MASTER_TIMEOUT);
+
+        assertEquals(true, conditionsMet.get());
+
+        verify(client, Mockito.only()).admin();
+        verify(adminClient, Mockito.only()).indices();
+        verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any());
+    }
+
+    public void testEvaluateConditionOnDataStreamTarget() {
+        String dataStreamName = "test-datastream";
+        IndexMetadata indexMetadata = IndexMetadata.builder(dataStreamName + "-000001")
+            .settings(settings(Version.CURRENT))
+            .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
+
+        WaitForRolloverReadyStep step = createRandomInstance();
+
+        mockRolloverIndexCall(dataStreamName, step);
 
         SetOnce<Boolean> conditionsMet = new SetOnce<>();
-        step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
+        Metadata metadata = Metadata.builder().put(indexMetadata, true)
+            .put(new DataStream(dataStreamName, "timestamp", List.of(indexMetadata.getIndex()), 1L))
+            .build();
+        step.evaluateCondition(metadata, indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
 
             @Override
             public void onResponse(boolean complete, ToXContentObject infomationContext) {
@@ -155,6 +172,29 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
         verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any());
     }
 
+    private void mockRolloverIndexCall(String rolloverTarget, WaitForRolloverReadyStep step) {
+        Mockito.doAnswer(invocation -> {
+            RolloverRequest request = (RolloverRequest) invocation.getArguments()[0];
+            @SuppressWarnings("unchecked")
+            ActionListener<RolloverResponse> listener = (ActionListener<RolloverResponse>) invocation.getArguments()[1];
+            Set<Condition<?>> expectedConditions = new HashSet<>();
+            if (step.getMaxAge() != null) {
+                expectedConditions.add(new MaxAgeCondition(step.getMaxAge()));
+            }
+            if (step.getMaxSize() != null) {
+                expectedConditions.add(new MaxSizeCondition(step.getMaxSize()));
+            }
+            if (step.getMaxDocs() != null) {
+                expectedConditions.add(new MaxDocsCondition(step.getMaxDocs()));
+            }
+            assertRolloverIndexRequest(request, rolloverTarget, expectedConditions);
+            Map<String, Boolean> conditionResults = expectedConditions.stream()
+                .collect(Collectors.toMap(Condition::toString, condition -> true));
+            listener.onResponse(new RolloverResponse(null, null, conditionResults, request.isDryRun(), false, false, false));
+            return null;
+        }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
+    }
+
     public void testEvaluateDoesntTriggerRolloverForIndexManuallyRolledOnLifecycleRolloverAlias() {
         String rolloverAlias = randomAlphaOfLength(5);
         IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLength(10))
@@ -166,7 +206,7 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
 
         WaitForRolloverReadyStep step = createRandomInstance();
 
-        step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
 
             @Override
             public void onResponse(boolean complete, ToXContentObject informationContext) {
@@ -195,7 +235,7 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
 
         WaitForRolloverReadyStep step = createRandomInstance();
 
-        step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
 
             @Override
             public void onResponse(boolean complete, ToXContentObject informationContext) {
@@ -220,7 +260,7 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
             .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
 
         WaitForRolloverReadyStep step = createRandomInstance();
-        step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
 
             @Override
             public void onResponse(boolean complete, ToXContentObject infomationContext) {
@@ -249,7 +289,7 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
         WaitForRolloverReadyStep step = createRandomInstance();
 
         SetOnce<Boolean> conditionsMet = new SetOnce<>();
-        step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
 
             @Override
             public void onResponse(boolean complete, ToXContentObject infomationContext) {
@@ -277,7 +317,7 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
         WaitForRolloverReadyStep step = createRandomInstance();
 
         SetOnce<Boolean> correctFailureCalled = new SetOnce<>();
-        step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
 
             @Override
             public void onResponse(boolean complete, ToXContentObject infomationContext) {
@@ -324,7 +364,7 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
         }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
 
         SetOnce<Boolean> actionCompleted = new SetOnce<>();
-        step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
 
             @Override
             public void onResponse(boolean complete, ToXContentObject infomationContext) {
@@ -373,7 +413,7 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
         }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
 
         SetOnce<Boolean> exceptionThrown = new SetOnce<>();
-        step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
 
             @Override
             public void onResponse(boolean complete, ToXContentObject infomationContext) {
@@ -402,7 +442,7 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
         WaitForRolloverReadyStep step = createRandomInstance();
 
         SetOnce<Exception> exceptionThrown = new SetOnce<>();
-        step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
             @Override
             public void onResponse(boolean complete, ToXContentObject infomationContext) {
                 throw new AssertionError("Unexpected method call");
@@ -427,7 +467,7 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
         WaitForRolloverReadyStep step = createRandomInstance();
 
         SetOnce<Exception> exceptionThrown = new SetOnce<>();
-        step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
+        step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
             @Override
             public void onResponse(boolean complete, ToXContentObject infomationContext) {
                 throw new AssertionError("Unexpected method call");

+ 127 - 0
x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java

@@ -0,0 +1,127 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack;
+
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.xpack.core.ilm.LifecycleAction;
+import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
+import org.elasticsearch.xpack.core.ilm.Phase;
+import org.elasticsearch.xpack.core.ilm.Step;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Locale;
+import java.util.Map;
+
+import static java.util.Collections.singletonMap;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+/**
+ * This class provides the operational REST functions needed to control an ILM time series lifecycle.
+ */
+public final class TimeSeriesRestDriver {
+
+    private static final Logger logger = LogManager.getLogger(TimeSeriesRestDriver.class);
+
+    private TimeSeriesRestDriver() {
+    }
+
+    public static Step.StepKey getStepKeyForIndex(RestClient client, String indexName) throws IOException {
+        Map<String, Object> indexResponse = explainIndex(client, indexName);
+        if (indexResponse == null) {
+            return new Step.StepKey(null, null, null);
+        }
+
+        return getStepKey(indexResponse);
+    }
+
+    private static Step.StepKey getStepKey(Map<String, Object> explainIndexResponse) {
+        String phase = (String) explainIndexResponse.get("phase");
+        String action = (String) explainIndexResponse.get("action");
+        String step = (String) explainIndexResponse.get("step");
+        return new Step.StepKey(phase, action, step);
+    }
+
+    public static Map<String, Object> explainIndex(RestClient client, String indexName) throws IOException {
+        return explain(client, indexName, false, false).get(indexName);
+    }
+
+    public static Map<String, Map<String, Object>> explain(RestClient client, String indexPattern, boolean onlyErrors,
+                                                           boolean onlyManaged) throws IOException {
+        Request explainRequest = new Request("GET", indexPattern + "/_ilm/explain");
+        explainRequest.addParameter("only_errors", Boolean.toString(onlyErrors));
+        explainRequest.addParameter("only_managed", Boolean.toString(onlyManaged));
+        Response response = client.performRequest(explainRequest);
+        Map<String, Object> responseMap;
+        try (InputStream is = response.getEntity().getContent()) {
+            responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
+        }
+
+        @SuppressWarnings("unchecked") Map<String, Map<String, Object>> indexResponse =
+            ((Map<String, Map<String, Object>>) responseMap.get("indices"));
+        return indexResponse;
+    }
+
+    public static void indexDocument(RestClient client, String indexAbstractionName) throws IOException {
+        indexDocument(client, indexAbstractionName, false);
+    }
+
+    public static void indexDocument(RestClient client, String indexAbstractionName, boolean refresh) throws IOException {
+        Request indexRequest = new Request("POST", indexAbstractionName + "/_doc" + (refresh ? "?refresh" : ""));
+        indexRequest.setEntity(new StringEntity("{\"a\": \"test\"}", ContentType.APPLICATION_JSON));
+        Response response = client.performRequest(indexRequest);
+        logger.info(response.getStatusLine());
+    }
+
+    public static void createNewSingletonPolicy(RestClient client, String policyName, String phaseName, LifecycleAction action)
+        throws IOException {
+        createNewSingletonPolicy(client, policyName, phaseName, action, TimeValue.ZERO);
+    }
+
+    public static void createNewSingletonPolicy(RestClient client, String policyName, String phaseName, LifecycleAction action,
+                                                TimeValue after) throws IOException {
+        Phase phase = new Phase(phaseName, after, singletonMap(action.getWriteableName(), action));
+        LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policyName, singletonMap(phase.getName(), phase));
+        XContentBuilder builder = jsonBuilder();
+        lifecyclePolicy.toXContent(builder, null);
+        final StringEntity entity = new StringEntity(
+            "{ \"policy\":" + Strings.toString(builder) + "}", ContentType.APPLICATION_JSON);
+        Request request = new Request("PUT", "_ilm/policy/" + policyName);
+        request.setEntity(entity);
+        client.performRequest(request);
+    }
+
+    public static void createComposableTemplate(RestClient client, String templateName, String indexPattern, Template template)
+        throws IOException {
+        XContentBuilder builder = jsonBuilder();
+        template.toXContent(builder, ToXContent.EMPTY_PARAMS);
+        StringEntity templateJSON = new StringEntity(
+            String.format(Locale.ROOT, "{\n" +
+                "  \"index_patterns\": \"%s\",\n" +
+                "  \"data_stream\": { \"timestamp_field\": \"@timestamp\" },\n" +
+                "  \"template\": %s\n" +
+                "}", indexPattern, Strings.toString(builder)),
+            ContentType.APPLICATION_JSON);
+        Request createIndexTemplateRequest = new Request("PUT", "_index_template/" + templateName);
+        createIndexTemplateRequest.setEntity(templateJSON);
+        client.performRequest(createIndexTemplateRequest);
+    }
+
+}

+ 2 - 12
x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/ChangePolicyforIndexIT.java

@@ -116,18 +116,8 @@ public class ChangePolicyforIndexIT extends ESRestTestCase {
         assertBusy(() -> assertStep(indexName, PhaseCompleteStep.finalStep("warm").getKey()), 30, TimeUnit.SECONDS);
 
         // Check index is allocated on integTest-1 and integTest-2 as per policy_2
-        Request getSettingsRequest = new Request("GET", "/" + indexName + "/_settings");
-        Response getSettingsResponse = client().performRequest(getSettingsRequest);
-        assertOK(getSettingsResponse);
-        Map<String, Object> getSettingsResponseMap = entityAsMap(getSettingsResponse);
-        @SuppressWarnings("unchecked")
-        Map<String, Object> indexSettings = (Map<String, Object>) ((Map<String, Object>) getSettingsResponseMap.get(indexName))
-                .get("settings");
-        @SuppressWarnings("unchecked")
-        Map<String, Object> routingSettings = (Map<String, Object>) ((Map<String, Object>) indexSettings.get("index")).get("routing");
-        @SuppressWarnings("unchecked")
-        String includesAllocation = (String) ((Map<String, Object>) ((Map<String, Object>) routingSettings.get("allocation"))
-                .get("include")).get("_name");
+        Map<String, Object> indexSettings = getIndexSettingsAsMap(indexName);
+        String includesAllocation = (String) indexSettings.get("index.routing.allocation.include._name");
         assertEquals("integTest-1,integTest-2", includesAllocation);
     }
 

+ 40 - 0
x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java

@@ -0,0 +1,40 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ilm;
+
+import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
+import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
+import org.elasticsearch.xpack.core.ilm.RolloverAction;
+
+import static org.elasticsearch.xpack.TimeSeriesRestDriver.createComposableTemplate;
+import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy;
+import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
+import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;
+import static org.hamcrest.Matchers.equalTo;
+
+public class TimeSeriesDataStreamsIT extends ESRestTestCase {
+
+    public void testRolloverAction() throws Exception {
+        String policyName = "logs-policy";
+        createNewSingletonPolicy(client(), policyName, "hot", new RolloverAction(null, null, 1L));
+
+        Settings lifecycleNameSetting = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyName).build();
+        Template template = new Template(lifecycleNameSetting, null, null);
+        createComposableTemplate(client(), "logs-template", "logs-foo*", template);
+
+        String dataStream = "logs-foo";
+        indexDocument(client(), dataStream, true);
+
+        assertBusy(() -> assertTrue(indexExists("logs-foo-000002")));
+        assertBusy(() -> assertTrue(Boolean.parseBoolean((String) getIndexSettingsAsMap("logs-foo-000002").get("index.hidden"))));
+        assertBusy(() -> assertThat(getStepKeyForIndex(client(), "logs-foo-000001"), equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
+    }
+
+}

File diff suppressed because it is too large
+ 130 - 179
x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java


+ 2 - 2
x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleRestIT.java

@@ -51,7 +51,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.CREATE_OPERATION;
 import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.DELETE_OPERATION;
 import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore.SLM_HISTORY_INDEX_PREFIX;
-import static org.elasticsearch.xpack.ilm.TimeSeriesLifecycleActionsIT.getStepKeyForIndex;
+import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -624,7 +624,7 @@ public class SnapshotLifecycleRestIT extends ESRestTestCase {
     }
 
     private void assertHistoryIndexWaitingForRollover() throws IOException {
-        Step.StepKey stepKey = getStepKeyForIndex(SLM_HISTORY_INDEX_PREFIX + "000001");
+        Step.StepKey stepKey = getStepKeyForIndex(client(), SLM_HISTORY_INDEX_PREFIX + "000001");
         assertEquals("hot", stepKey.getPhase());
         assertEquals(RolloverAction.NAME, stepKey.getAction());
         assertEquals(WaitForRolloverReadyStep.NAME, stepKey.getName());

+ 3 - 2
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java

@@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateObserver;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.unit.TimeValue;
@@ -118,7 +119,7 @@ class IndexLifecycleRunner {
      * Run the current step, only if it is an asynchronous wait step. These
      * wait criteria are checked periodically from the ILM scheduler
      */
-    void runPeriodicStep(String policy, IndexMetadata indexMetadata) {
+    void runPeriodicStep(String policy, Metadata metadata, IndexMetadata indexMetadata) {
         String index = indexMetadata.getIndex().getName();
         LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetadata);
         final Step currentStep;
@@ -170,7 +171,7 @@ class IndexLifecycleRunner {
             }
         } else if (currentStep instanceof AsyncWaitStep) {
             logger.debug("[{}] running periodic policy with current-step [{}]", index, currentStep.getKey());
-            ((AsyncWaitStep) currentStep).evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() {
+            ((AsyncWaitStep) currentStep).evaluateCondition(metadata, indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
 
                 @Override
                 public void onResponse(boolean conditionMet, ToXContentObject stepInfo) {

+ 2 - 2
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java

@@ -314,7 +314,7 @@ public class IndexLifecycleService
                             if (fromClusterStateChange) {
                                 lifecycleRunner.runPolicyAfterStateChange(policyName, idxMeta);
                             } else {
-                                lifecycleRunner.runPeriodicStep(policyName, idxMeta);
+                                lifecycleRunner.runPeriodicStep(policyName, clusterState.metadata(), idxMeta);
                             }
                             // ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
                             safeToStop = false;
@@ -326,7 +326,7 @@ public class IndexLifecycleService
                         if (fromClusterStateChange) {
                             lifecycleRunner.runPolicyAfterStateChange(policyName, idxMeta);
                         } else {
-                            lifecycleRunner.runPeriodicStep(policyName, idxMeta);
+                            lifecycleRunner.runPeriodicStep(policyName, clusterState.metadata(), idxMeta);
                         }
                     }
                 } catch (Exception e) {

+ 1 - 0
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/package-info.java

@@ -61,6 +61,7 @@
  *     <li>
  *        {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner#runPeriodicStep(
  *                      java.lang.String,
+ *                      org.elasticsearch.cluster.metadata.Metadata,
  *                      org.elasticsearch.cluster.metadata.IndexMetadata
  *                 )}
  *        handles the execution of async {@link org.elasticsearch.xpack.core.ilm.AsyncWaitStep}

+ 6 - 6
x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java

@@ -142,7 +142,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
             .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
 
         runner.runPolicyAfterStateChange(policyName, indexMetadata);
-        runner.runPeriodicStep(policyName, indexMetadata);
+        runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata);
 
         Mockito.verifyZeroInteractions(clusterService);
     }
@@ -158,7 +158,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
             .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
 
         runner.runPolicyAfterStateChange(policyName, indexMetadata);
-        runner.runPeriodicStep(policyName, indexMetadata);
+        runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata);
 
         Mockito.verify(clusterService, times(2)).submitStateUpdateTask(any(), any());
 
@@ -231,7 +231,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
             .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5))
             .build();
 
-        runner.runPeriodicStep(policyName, indexMetadata);
+        runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata);
 
         Mockito.verify(clusterService, times(1)).submitStateUpdateTask(any(), any());
     }
@@ -414,7 +414,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
         if (asyncAction) {
             runner.maybeRunAsyncAction(before, indexMetadata, policyName, stepKey);
         } else if (periodicAction) {
-            runner.runPeriodicStep(policyName, indexMetadata);
+            runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata);
         } else {
             runner.runPolicyAfterStateChange(policyName, indexMetadata);
         }
@@ -598,7 +598,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
         ClusterState before = clusterService.state();
         CountDownLatch latch = new CountDownLatch(1);
         step.setLatch(latch);
-        runner.runPeriodicStep(policyName, indexMetadata);
+        runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata);
         awaitLatch(latch, 5, TimeUnit.SECONDS);
 
         ClusterState after = clusterService.state();
@@ -947,7 +947,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
         }
 
         @Override
-        public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) {
+        public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
             executeCount++;
             if (latch != null) {
                 latch.countDown();

Some files were not shown because too many files changed in this diff