1
0
Эх сурвалжийг харах

Avoid restarting data stream reindex when cluster is upgraded (#125587)

Keith Massey 7 сар өмнө
parent
commit
6a74aba04d

+ 23 - 1
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

@@ -108,6 +108,11 @@ public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExec
         ReindexDataStreamTaskParams params,
         PersistentTaskState persistentTaskState
     ) {
+        Long completionTime = getCompletionTime(persistentTaskState);
+        if (completionTime != null && task instanceof ReindexDataStreamTask reindexDataStreamTask) {
+            reindexDataStreamTask.allReindexesCompleted(threadPool, getTimeToLive(completionTime));
+            return;
+        }
         ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTaskState;
         String sourceDataStream = params.getSourceDataStream();
         TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
@@ -316,6 +321,14 @@ public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExec
         persistentTask.taskFailed(threadPool, updateCompletionTimeAndGetTimeToLive(persistentTask, state), e);
     }
 
+    private Long getCompletionTime(PersistentTaskState persistentTaskState) {
+        if (persistentTaskState instanceof ReindexDataStreamPersistentTaskState state) {
+            return state.completionTime();
+        } else {
+            return null;
+        }
+    }
+
     private TimeValue updateCompletionTimeAndGetTimeToLive(
         ReindexDataStreamTask reindexDataStreamTask,
         @Nullable ReindexDataStreamPersistentTaskState state
@@ -345,6 +358,15 @@ public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExec
                 completionTime = state.completionTime();
             }
         }
-        return TimeValue.timeValueMillis(TASK_KEEP_ALIVE_TIME.millis() - (threadPool.absoluteTimeInMillis() - completionTime));
+        return getTimeToLive(completionTime);
+    }
+
+    private TimeValue getTimeToLive(long completionTimeInMillis) {
+        return TimeValue.timeValueMillis(
+            TASK_KEEP_ALIVE_TIME.millis() - Math.min(
+                TASK_KEEP_ALIVE_TIME.millis(),
+                threadPool.absoluteTimeInMillis() - completionTimeInMillis
+            )
+        );
     }
 }

+ 4 - 0
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskState.java

@@ -46,6 +46,10 @@ public record ReindexDataStreamPersistentTaskState(
         this(in.readOptionalInt(), in.readOptionalInt(), in.readOptionalLong());
     }
 
+    public boolean isComplete() {
+        return completionTime != null;
+    }
+
     @Override
     public String getWriteableName() {
         return NAME;

+ 35 - 7
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java

@@ -30,7 +30,7 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
     private final long persistentTaskStartTime;
     private final int initialTotalIndices;
     private final int initialTotalIndicesToBeUpgraded;
-    private volatile boolean complete = false;
+    private boolean isCompleteLocally = false;
     private volatile Exception exception;
     private final Set<String> inProgress = Collections.synchronizedSet(new HashSet<>());
     private final AtomicInteger pending = new AtomicInteger();
@@ -73,18 +73,26 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
             clusterService.state(),
             getPersistentTaskId()
         );
+        boolean isComplete;
         if (persistentTask != null) {
             ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTask.getState();
-            if (state != null && state.totalIndices() != null && state.totalIndicesToBeUpgraded() != null) {
-                totalIndices = Math.toIntExact(state.totalIndices());
-                totalIndicesToBeUpgraded = Math.toIntExact(state.totalIndicesToBeUpgraded());
+            if (state != null) {
+                isComplete = state.isComplete();
+                if (state.totalIndices() != null && state.totalIndicesToBeUpgraded() != null) {
+                    totalIndices = Math.toIntExact(state.totalIndices());
+                    totalIndicesToBeUpgraded = Math.toIntExact(state.totalIndicesToBeUpgraded());
+                }
+            } else {
+                isComplete = false;
             }
+        } else {
+            isComplete = false;
         }
         return new ReindexDataStreamStatus(
             persistentTaskStartTime,
             totalIndices,
             totalIndicesToBeUpgraded,
-            complete,
+            isComplete,
             exception,
             inProgress,
             pending.get(),
@@ -93,7 +101,7 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
     }
 
     public void allReindexesCompleted(ThreadPool threadPool, TimeValue timeToLive) {
-        this.complete = true;
+        isCompleteLocally = true;
         if (isCancelled()) {
             completeTask.run();
         } else {
@@ -120,6 +128,24 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
         pending.decrementAndGet();
     }
 
+    private boolean isCompleteInClusterState() {
+        PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
+            .getMetadata()
+            .getProject()
+            .custom(PersistentTasksCustomMetadata.TYPE);
+        PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = persistentTasksCustomMetadata.getTask(getPersistentTaskId());
+        if (persistentTask != null) {
+            ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTask.getState();
+            if (state != null) {
+                return state.isComplete();
+            } else {
+                return false;
+            }
+        } else {
+            return false;
+        }
+    }
+
     public void setPendingIndicesCount(int size) {
         pending.set(size);
     }
@@ -130,8 +156,10 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
          * If the task is complete, but just waiting for its scheduled removal, we go ahead and call markAsCompleted/markAsFailed
          * immediately. This results in the running task being removed from the task manager. If the task is not complete, then one of
          * allReindexesCompleted or taskFailed will be called in the future, resulting in the same thing.
+         * We check both the cluster state and isCompleteLocally -- it is possible (especially in tests) that the cluster state
+         * update has not happened in between when allReindexesCompleted was called and when this is called.
          */
-        if (complete) {
+        if (isCompleteInClusterState() || isCompleteLocally) {
             completeTask.run();
         }
     }

+ 68 - 5
x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java

@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -208,7 +209,9 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
         } else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
             Map<String, Map<String, Object>> oldIndicesMetadata = getIndicesMetadata(dataStreamName);
             upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0, ilmEnabled);
+            cancelReindexTask(dataStreamName);
             upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 1, 0, ilmEnabled);
+            cancelReindexTask(dataStreamFromNonDataStreamIndices);
             Map<String, Map<String, Object>> upgradedIndicesMetadata = getIndicesMetadata(dataStreamName);
 
             if (ilmEnabled) {
@@ -219,6 +222,38 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
         }
     }
 
+    public void testMigrateDoesNotRestartOnUpgrade() throws Exception {
+        /*
+         * This test makes sure that if reindex is run and completed, then when the cluster is upgraded the task
+         * does not begin running again.
+         */
+        String dataStreamName = "reindex_test_data_stream_ugprade_test";
+        int numRollovers = randomIntBetween(0, 5);
+        boolean hasILMPolicy = randomBoolean();
+        boolean ilmEnabled = hasILMPolicy && randomBoolean();
+        if (CLUSTER_TYPE == ClusterType.OLD) {
+            createAndRolloverDataStream(dataStreamName, numRollovers, hasILMPolicy, ilmEnabled);
+            upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0, ilmEnabled);
+        } else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
+            makeSureNoUpgrade(dataStreamName);
+            cancelReindexTask(dataStreamName);
+        } else {
+            makeSureNoUpgrade(dataStreamName);
+        }
+    }
+
+    private void cancelReindexTask(String dataStreamName) throws IOException {
+        Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
+        String upgradeUser = "upgrade_user";
+        String upgradeUserPassword = "x-pack-test-password";
+        createRole("upgrade_role", dataStreamName);
+        createUser(upgradeUser, upgradeUserPassword, "upgrade_role");
+        try (RestClient upgradeUserClient = getClient(upgradeUser, upgradeUserPassword)) {
+            Response cancelResponse = upgradeUserClient.performRequest(cancelRequest);
+            assertOK(cancelResponse);
+        }
+    }
+
     private void compareIndexMetadata(
         Map<String, Map<String, Object>> oldIndicesMetadata,
         Map<String, Map<String, Object>> upgradedIndicesMetadata
@@ -422,7 +457,10 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
                 "data_stream": {
                 }
             }""";
-        var putIndexTemplateRequest = new Request("POST", "/_index_template/reindex_test_data_stream_template");
+        var putIndexTemplateRequest = new Request(
+            "POST",
+            "/_index_template/reindex_test_data_stream_template" + randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT)
+        );
         putIndexTemplateRequest.setJsonEntity(indexTemplate.replace("$TEMPLATE", template).replace("$PATTERN", dataStreamName));
         assertOK(client().performRequest(putIndexTemplateRequest));
         bulkLoadData(dataStreamName);
@@ -651,7 +689,7 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
                 assertOK(statusResponse);
                 assertThat(statusResponseString, statusResponseMap.get("complete"), equalTo(true));
                 final int originalWriteIndex = 1;
-                if (isOriginalClusterSameMajorVersionAsCurrent()) {
+                if (isOriginalClusterSameMajorVersionAsCurrent() || CLUSTER_TYPE == ClusterType.OLD) {
                     assertThat(
                         statusResponseString,
                         statusResponseMap.get("total_indices_in_data_stream"),
@@ -698,10 +736,35 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
             // Verify it's possible to reindex again after a successful reindex
             reindexResponse = upgradeUserClient.performRequest(reindexRequest);
             assertOK(reindexResponse);
+        }
+    }
 
-            Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
-            Response cancelResponse = upgradeUserClient.performRequest(cancelRequest);
-            assertOK(cancelResponse);
+    private void makeSureNoUpgrade(String dataStreamName) throws Exception {
+        String upgradeUser = "upgrade_user";
+        String upgradeUserPassword = "x-pack-test-password";
+        createRole("upgrade_role", dataStreamName);
+        createUser(upgradeUser, upgradeUserPassword, "upgrade_role");
+        try (RestClient upgradeUserClient = getClient(upgradeUser, upgradeUserPassword)) {
+            assertBusy(() -> {
+                try {
+                    Request statusRequest = new Request("GET", "_migration/reindex/" + dataStreamName + "/_status");
+                    Response statusResponse = upgradeUserClient.performRequest(statusRequest);
+                    Map<String, Object> statusResponseMap = XContentHelper.convertToMap(
+                        JsonXContent.jsonXContent,
+                        statusResponse.getEntity().getContent(),
+                        false
+                    );
+                    String statusResponseString = statusResponseMap.keySet()
+                        .stream()
+                        .map(key -> key + "=" + statusResponseMap.get(key))
+                        .collect(Collectors.joining(", ", "{", "}"));
+                    assertOK(statusResponse);
+                    assertThat(statusResponseString, statusResponseMap.get("complete"), equalTo(true));
+                    assertThat(statusResponseString, statusResponseMap.get("successes"), equalTo(0));
+                } catch (Exception e) {
+                    fail(e);
+                }
+            }, 60, TimeUnit.SECONDS);
         }
     }