浏览代码

[8.x] Update data stream deprecations warnings to new format and filter sea… (#119097)

* Update data stream deprecations warnings to new format and filter searchable snapshots from response (#118562)
* Update data stream deprecations warnings to new format

* Add reindex_required flag to index version deprecation notice response

* PR Changes

* Move all deprecation checks to use a shared predicate which also excludes snapshots

* Update docs/changelog/118562.yaml

* Tests for excluding snapshots

* PR Changes - Remove leftover comment

(cherry picked from commit 5487927)

* Update docs/changelog/119097.yaml
Luke Whiting 10 月之前
父节点
当前提交
f8adadaa75

+ 6 - 0
docs/changelog/119097.yaml

@@ -0,0 +1,6 @@
+pr: 119097
+summary: "[8.x] Update data stream deprecations warnings to new format and filter\
+  \ sea…"
+area: Data streams
+type: enhancement
+issues: []

+ 47 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/deprecation/DeprecatedIndexPredicate.java

@@ -0,0 +1,47 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.deprecation;
+
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexVersion;
+import org.elasticsearch.index.IndexVersions;
+
+import java.util.function.Predicate;
+
+public class DeprecatedIndexPredicate {
+
+    public static final IndexVersion MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE = IndexVersions.V_8_0_0;
+
+    /*
+     * This predicate allows through only indices that were created with a previous lucene version, meaning that they need to be reindexed
+     * in order to be writable in the _next_ lucene version.
+     *
+     * It ignores searchable snapshots as they are not writable.
+     */
+    public static Predicate<Index> getReindexRequiredPredicate(Metadata metadata) {
+        return index -> {
+            IndexMetadata indexMetadata = metadata.index(index);
+            return reindexRequired(indexMetadata);
+        };
+    }
+
+    public static boolean reindexRequired(IndexMetadata indexMetadata) {
+        return creationVersionBeforeMinimumWritableVersion(indexMetadata) && isNotSearchableSnapshot(indexMetadata);
+    }
+
+    private static boolean isNotSearchableSnapshot(IndexMetadata indexMetadata) {
+        return indexMetadata.isSearchableSnapshot() == false;
+    }
+
+    private static boolean creationVersionBeforeMinimumWritableVersion(IndexMetadata metadata) {
+        return metadata.getCreationVersion().before(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE);
+    }
+
+}

+ 16 - 40
x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecks.java

@@ -10,10 +10,12 @@ package org.elasticsearch.xpack.deprecation;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.index.Index;
-import org.elasticsearch.index.IndexVersions;
+import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
 import org.elasticsearch.xpack.core.deprecation.DeprecationIssue;
 
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static java.util.Map.entry;
 import static java.util.Map.ofEntries;
@@ -21,54 +23,28 @@ import static java.util.Map.ofEntries;
 public class DataStreamDeprecationChecks {
     static DeprecationIssue oldIndicesCheck(DataStream dataStream, ClusterState clusterState) {
         List<Index> backingIndices = dataStream.getIndices();
-        boolean hasOldIndices = backingIndices.stream()
-            .anyMatch(index -> clusterState.metadata().index(index).getCompatibilityVersion().before(IndexVersions.V_8_0_0));
-        if (hasOldIndices) {
-            long totalIndices = backingIndices.size();
-            List<Index> oldIndices = backingIndices.stream()
-                .filter(index -> clusterState.metadata().index(index).getCompatibilityVersion().before(IndexVersions.V_8_0_0))
-                .toList();
-            long totalOldIndices = oldIndices.size();
-            long totalOldSearchableSnapshots = oldIndices.stream()
-                .filter(index -> clusterState.metadata().index(index).isSearchableSnapshot())
-                .count();
-            long totalOldPartiallyMountedSearchableSnapshots = oldIndices.stream()
-                .filter(index -> clusterState.metadata().index(index).isPartialSearchableSnapshot())
-                .count();
-            long totalOldFullyMountedSearchableSnapshots = totalOldSearchableSnapshots - totalOldPartiallyMountedSearchableSnapshots;
+
+        Set<String> indicesNeedingUpgrade = backingIndices.stream()
+            .filter(DeprecatedIndexPredicate.getReindexRequiredPredicate(clusterState.metadata()))
+            .map(Index::getName)
+            .collect(Collectors.toUnmodifiableSet());
+
+        if (indicesNeedingUpgrade.isEmpty() == false) {
             return new DeprecationIssue(
                 DeprecationIssue.Level.CRITICAL,
                 "Old data stream with a compatibility version < 8.0",
-                "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-9.0.html",
+                "https://www.elastic.co/guide/en/elasticsearch/reference/current/migrating-8.0.html#breaking-changes-8.0",
                 "This data stream has backing indices that were created before Elasticsearch 8.0.0",
                 false,
                 ofEntries(
-                    entry(
-                        "backing_indices",
-                        ofEntries(
-                            entry("count", totalIndices),
-                            entry(
-                                "need_upgrading",
-                                ofEntries(
-                                    entry("count", totalOldIndices),
-                                    entry(
-                                        "searchable_snapshots",
-                                        ofEntries(
-                                            entry("count", totalOldSearchableSnapshots),
-                                            entry("fully_mounted", ofEntries(entry("count", totalOldFullyMountedSearchableSnapshots))),
-                                            entry(
-                                                "partially_mounted",
-                                                ofEntries(entry("count", totalOldPartiallyMountedSearchableSnapshots))
-                                            )
-                                        )
-                                    )
-                                )
-                            )
-                        )
-                    )
+                    entry("reindex_required", true),
+                    entry("total_backing_indices", backingIndices.size()),
+                    entry("indices_requiring_upgrade_count", indicesNeedingUpgrade.size()),
+                    entry("indices_requiring_upgrade", indicesNeedingUpgrade)
                 )
             );
         }
+
         return null;
     }
 }

+ 5 - 3
x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecks.java

@@ -17,9 +17,11 @@ import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.IndexVersions;
 import org.elasticsearch.index.engine.frozen.FrozenEngine;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
+import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
 import org.elasticsearch.xpack.core.deprecation.DeprecationIssue;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -36,14 +38,14 @@ public class IndexDeprecationChecks {
         // TODO: this check needs to be revised. It's trivially true right now.
         IndexVersion currentCompatibilityVersion = indexMetadata.getCompatibilityVersion();
         // We intentionally exclude indices that are in data streams because they will be picked up by DataStreamDeprecationChecks
-        if (currentCompatibilityVersion.before(IndexVersions.V_8_0_0) && isNotDataStreamIndex(indexMetadata, clusterState)) {
+        if (DeprecatedIndexPredicate.reindexRequired(indexMetadata) && isNotDataStreamIndex(indexMetadata, clusterState)) {
             return new DeprecationIssue(
                 DeprecationIssue.Level.CRITICAL,
                 "Old index with a compatibility version < 8.0",
-                "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-9.0.html",
+                "https://www.elastic.co/guide/en/elasticsearch/reference/current/migrating-8.0.html#breaking-changes-8.0",
                 "This index has version: " + currentCompatibilityVersion.toReleaseVersion(),
                 false,
-                null
+                Collections.singletonMap("reindex_required", true)
             );
         }
         return null;

+ 34 - 40
x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationChecksTests.java

@@ -17,41 +17,46 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexVersion;
+import org.elasticsearch.snapshots.SearchableSnapshotsSettings;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.deprecation.DeprecationIssue;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static java.util.Collections.singletonList;
+import static java.util.Map.entry;
+import static java.util.Map.ofEntries;
+import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
 import static org.elasticsearch.xpack.deprecation.DeprecationChecks.DATA_STREAM_CHECKS;
 import static org.hamcrest.Matchers.equalTo;
 
 public class DataStreamDeprecationChecksTests extends ESTestCase {
 
     public void testOldIndicesCheck() {
-        long oldIndexCount = randomIntBetween(1, 100);
-        long newIndexCount = randomIntBetween(1, 100);
-        long oldSearchableSnapshotCount = 0;
-        long oldFullyManagedSearchableSnapshotCount = 0;
-        long oldPartiallyManagedSearchableSnapshotCount = 0;
+        int oldIndexCount = randomIntBetween(1, 100);
+        int newIndexCount = randomIntBetween(1, 100);
+
         List<Index> allIndices = new ArrayList<>();
         Map<String, IndexMetadata> nameToIndexMetadata = new HashMap<>();
+        Set<String> expectedIndices = new HashSet<>();
+
         for (int i = 0; i < oldIndexCount; i++) {
-            Settings.Builder settingsBuilder = settings(IndexVersion.fromId(7170099));
-            if (randomBoolean()) {
-                settingsBuilder.put("index.store.type", "snapshot");
-                if (randomBoolean()) {
-                    oldFullyManagedSearchableSnapshotCount++;
-                } else {
-                    settingsBuilder.put("index.store.snapshot.partial", true);
-                    oldPartiallyManagedSearchableSnapshotCount++;
-                }
-                oldSearchableSnapshotCount++;
+            Settings.Builder settings = settings(IndexVersion.fromId(7170099));
+
+            String indexName = "old-data-stream-index-" + i;
+            if (expectedIndices.isEmpty() == false && randomIntBetween(0, 2) == 0) {
+                settings.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE);
+            } else {
+                expectedIndices.add(indexName);
             }
-            IndexMetadata oldIndexMetadata = IndexMetadata.builder("old-data-stream-index-" + i)
+
+            Settings.Builder settingsBuilder = settings;
+            IndexMetadata oldIndexMetadata = IndexMetadata.builder(indexName)
                 .settings(settingsBuilder)
                 .numberOfShards(1)
                 .numberOfReplicas(0)
@@ -59,11 +64,9 @@ public class DataStreamDeprecationChecksTests extends ESTestCase {
             allIndices.add(oldIndexMetadata.getIndex());
             nameToIndexMetadata.put(oldIndexMetadata.getIndex().getName(), oldIndexMetadata);
         }
+
         for (int i = 0; i < newIndexCount; i++) {
             Settings.Builder settingsBuilder = settings(IndexVersion.current());
-            if (randomBoolean()) {
-                settingsBuilder.put("index.store.type", "snapshot");
-            }
             IndexMetadata newIndexMetadata = IndexMetadata.builder("new-data-stream-index-" + i)
                 .settings(settingsBuilder)
                 .numberOfShards(1)
@@ -72,6 +75,7 @@ public class DataStreamDeprecationChecksTests extends ESTestCase {
             allIndices.add(newIndexMetadata.getIndex());
             nameToIndexMetadata.put(newIndexMetadata.getIndex().getName(), newIndexMetadata);
         }
+
         DataStream dataStream = new DataStream(
             randomAlphaOfLength(10),
             allIndices,
@@ -88,37 +92,27 @@ public class DataStreamDeprecationChecksTests extends ESTestCase {
             randomBoolean(),
             null
         );
+
         Metadata metadata = Metadata.builder().indices(nameToIndexMetadata).build();
         ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).build();
+
         DeprecationIssue expected = new DeprecationIssue(
             DeprecationIssue.Level.CRITICAL,
             "Old data stream with a compatibility version < 8.0",
-            "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-9.0.html",
+            "https://www.elastic.co/guide/en/elasticsearch/reference/current/migrating-8.0.html#breaking-changes-8.0",
             "This data stream has backing indices that were created before Elasticsearch 8.0.0",
             false,
-            Map.of(
-                "backing_indices",
-                Map.of(
-                    "count",
-                    oldIndexCount + newIndexCount,
-                    "need_upgrading",
-                    Map.of(
-                        "count",
-                        oldIndexCount,
-                        "searchable_snapshots",
-                        Map.of(
-                            "count",
-                            oldSearchableSnapshotCount,
-                            "fully_mounted",
-                            Map.of("count", oldFullyManagedSearchableSnapshotCount),
-                            "partially_mounted",
-                            Map.of("count", oldPartiallyManagedSearchableSnapshotCount)
-                        )
-                    )
-                )
+            ofEntries(
+                entry("reindex_required", true),
+                entry("total_backing_indices", oldIndexCount + newIndexCount),
+                entry("indices_requiring_upgrade_count", expectedIndices.size()),
+                entry("indices_requiring_upgrade", expectedIndices)
             )
         );
+
         List<DeprecationIssue> issues = DeprecationChecks.filterChecks(DATA_STREAM_CHECKS, c -> c.apply(dataStream, clusterState));
+
         assertThat(issues, equalTo(singletonList(expected)));
     }
+
 }

+ 20 - 4
x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/IndexDeprecationChecksTests.java

@@ -19,8 +19,8 @@ import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.IndexVersion;
-import org.elasticsearch.index.IndexVersions;
 import org.elasticsearch.index.engine.frozen.FrozenEngine;
+import org.elasticsearch.snapshots.SearchableSnapshotsSettings;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.deprecation.DeprecationIssue;
 
@@ -29,6 +29,8 @@ import java.util.List;
 import java.util.Map;
 
 import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
 import static org.elasticsearch.xpack.deprecation.DeprecationChecks.INDEX_SETTINGS_CHECKS;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
@@ -49,10 +51,10 @@ public class IndexDeprecationChecksTests extends ESTestCase {
         DeprecationIssue expected = new DeprecationIssue(
             DeprecationIssue.Level.CRITICAL,
             "Old index with a compatibility version < 8.0",
-            "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-9.0.html",
+            "https://www.elastic.co/guide/en/elasticsearch/reference/current/migrating-8.0.html#breaking-changes-8.0",
             "This index has version: " + createdWith.toReleaseVersion(),
             false,
-            null
+            singletonMap("reindex_required", true)
         );
         List<DeprecationIssue> issues = DeprecationChecks.filterChecks(INDEX_SETTINGS_CHECKS, c -> c.apply(indexMetadata, clusterState));
         assertEquals(singletonList(expected), issues);
@@ -100,6 +102,20 @@ public class IndexDeprecationChecksTests extends ESTestCase {
         assertThat(issues.size(), equalTo(0));
     }
 
+    public void testOldIndicesCheckSnapshotIgnored() {
+        IndexVersion createdWith = IndexVersion.fromId(7170099);
+        Settings.Builder settings = settings(createdWith);
+        settings.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE);
+        IndexMetadata indexMetadata = IndexMetadata.builder("test").settings(settings).numberOfShards(1).numberOfReplicas(0).build();
+        ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
+            .metadata(Metadata.builder().put(indexMetadata, true))
+            .build();
+
+        List<DeprecationIssue> issues = DeprecationChecks.filterChecks(INDEX_SETTINGS_CHECKS, c -> c.apply(indexMetadata, clusterState));
+
+        assertThat(issues, empty());
+    }
+
     public void testTranslogRetentionSettings() {
         Settings.Builder settings = settings(IndexVersion.current());
         settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomPositiveTimeValue());
@@ -229,7 +245,7 @@ public class IndexDeprecationChecksTests extends ESTestCase {
             + "} }";
 
         IndexMetadata simpleIndex = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 10))
-            .settings(settings(IndexVersions.V_7_0_0))
+            .settings(settings(IndexVersion.current()))
             .numberOfShards(1)
             .numberOfReplicas(1)
             .putMapping(simpleMapping)

+ 0 - 18
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java

@@ -13,14 +13,10 @@ import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.FeatureFlag;
 import org.elasticsearch.features.NodeFeature;
-import org.elasticsearch.index.Index;
-import org.elasticsearch.index.IndexVersion;
-import org.elasticsearch.index.IndexVersions;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContent;
@@ -43,24 +39,10 @@ public class ReindexDataStreamAction extends ActionType<ReindexDataStreamAction.
     public static final ParseField SOURCE_FIELD = new ParseField("source");
     public static final ParseField INDEX_FIELD = new ParseField("index");
 
-    /*
-     * The version before which we do not support writes in the _next_ major version of Elasticsearch. For example, Elasticsearch 10.x will
-     * not support writing to indices created before version 9.0.0.
-     */
-    public static final IndexVersion MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE = IndexVersions.V_8_0_0;
-
     public ReindexDataStreamAction() {
         super(NAME);
     }
 
-    /*
-     * This predicate allows through only indices that were created with a previous lucene version, meaning that they need to be reindexed
-     * in order to be writable in the _next_ lucene version.
-     */
-    public static Predicate<Index> getOldIndexVersionPredicate(Metadata metadata) {
-        return index -> metadata.index(index).getCreationVersion().onOrBefore(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE);
-    }
-
     public enum Mode {
         UPGRADE
     }

+ 3 - 2
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

@@ -32,6 +32,7 @@ import org.elasticsearch.injection.guice.Inject;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
 
 import java.util.Locale;
 import java.util.Map;
@@ -78,13 +79,13 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
         IndexMetadata sourceIndex = clusterService.state().getMetadata().index(sourceIndexName);
         Settings settingsBefore = sourceIndex.getSettings();
 
-        var hasOldVersion = ReindexDataStreamAction.getOldIndexVersionPredicate(clusterService.state().metadata());
+        var hasOldVersion = DeprecatedIndexPredicate.getReindexRequiredPredicate(clusterService.state().metadata());
         if (hasOldVersion.test(sourceIndex.getIndex()) == false) {
             logger.warn(
                 "Migrating index [{}] with version [{}] is unnecessary as its version is not before [{}]",
                 sourceIndexName,
                 sourceIndex.getCreationVersion(),
-                ReindexDataStreamAction.MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE
+                DeprecatedIndexPredicate.MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE
             );
         }
 

+ 2 - 2
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java

@@ -26,8 +26,8 @@ import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDat
 import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTask;
 import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTaskParams;
 
+import static org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate.getReindexRequiredPredicate;
 import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.TASK_ID_PREFIX;
-import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate;
 
 /*
  * This transport action creates a new persistent task for reindexing the source data stream given in the request. On successful creation
@@ -68,7 +68,7 @@ public class ReindexDataStreamTransportAction extends HandledTransportAction<Rei
             return;
         }
         int totalIndices = dataStream.getIndices().size();
-        int totalIndicesToBeUpgraded = (int) dataStream.getIndices().stream().filter(getOldIndexVersionPredicate(metadata)).count();
+        int totalIndicesToBeUpgraded = (int) dataStream.getIndices().stream().filter(getReindexRequiredPredicate(metadata)).count();
         ReindexDataStreamTaskParams params = new ReindexDataStreamTaskParams(
             sourceDataStreamName,
             transportService.getThreadPool().absoluteTimeInMillis(),

+ 2 - 2
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

@@ -24,7 +24,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import java.util.List;
 import java.util.Map;
 
-import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate;
+import static org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate.getReindexRequiredPredicate;
 
 public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor<ReindexDataStreamTaskParams> {
     private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1);
@@ -74,7 +74,7 @@ public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExec
             if (dataStreamInfos.size() == 1) {
                 List<Index> indices = dataStreamInfos.get(0).getDataStream().getIndices();
                 List<Index> indicesToBeReindexed = indices.stream()
-                    .filter(getOldIndexVersionPredicate(clusterService.state().metadata()))
+                    .filter(getReindexRequiredPredicate(clusterService.state().metadata()))
                     .toList();
                 reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
                 for (Index index : indicesToBeReindexed) {