1
0
Эх сурвалжийг харах

Merge branch '8.x' of github.com:elastic/elasticsearch into 8.x

Keith Massey 10 сар өмнө
parent
commit
0a53248d42

+ 5 - 4
modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java

@@ -53,7 +53,6 @@ public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExec
             params.startTime(),
             params.totalIndices(),
             params.totalIndicesToBeUpgraded(),
-            threadPool,
             id,
             type,
             action,
@@ -76,9 +75,11 @@ public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExec
                 List<Index> indicesToBeReindexed = indices.stream()
                     .filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion())
                     .toList();
-                reindexDataStreamTask.setPendingIndices(indicesToBeReindexed.stream().map(Index::getName).toList());
+                reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
                 for (Index index : indicesToBeReindexed) {
+                    reindexDataStreamTask.incrementInProgressIndicesCount();
                     // TODO This is just a placeholder. This is where the real data stream reindex logic will go
+                    reindexDataStreamTask.reindexSucceeded();
                 }
 
                 completeSuccessfulPersistentTask(reindexDataStreamTask);
@@ -89,12 +90,12 @@ public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExec
     }
 
     private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {
-        persistentTask.reindexSucceeded();
+        persistentTask.allReindexesCompleted();
         threadPool.schedule(persistentTask::markAsCompleted, getTimeToLive(persistentTask), threadPool.generic());
     }
 
     private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) {
-        persistentTask.reindexFailed(e);
+        persistentTask.taskFailed(e);
         threadPool.schedule(() -> persistentTask.markAsFailed(e), getTimeToLive(persistentTask), threadPool.generic());
     }
 

+ 19 - 16
modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java

@@ -12,29 +12,27 @@ package org.elasticsearch.datastreams.task;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.persistent.AllocatedPersistentTask;
 import org.elasticsearch.tasks.TaskId;
-import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class ReindexDataStreamTask extends AllocatedPersistentTask {
     public static final String TASK_NAME = "reindex-data-stream";
     private final long persistentTaskStartTime;
     private final int totalIndices;
     private final int totalIndicesToBeUpgraded;
-    private final ThreadPool threadPool;
     private boolean complete = false;
     private Exception exception;
-    private List<String> inProgress = new ArrayList<>();
-    private List<String> pending = List.of();
+    private AtomicInteger inProgress = new AtomicInteger(0);
+    private AtomicInteger pending = new AtomicInteger();
     private List<Tuple<String, Exception>> errors = new ArrayList<>();
 
     public ReindexDataStreamTask(
         long persistentTaskStartTime,
         int totalIndices,
         int totalIndicesToBeUpgraded,
-        ThreadPool threadPool,
         long id,
         String type,
         String action,
@@ -46,7 +44,6 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
         this.persistentTaskStartTime = persistentTaskStartTime;
         this.totalIndices = totalIndices;
         this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded;
-        this.threadPool = threadPool;
     }
 
     @Override
@@ -57,30 +54,36 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
             totalIndicesToBeUpgraded,
             complete,
             exception,
-            inProgress.size(),
-            pending.size(),
+            inProgress.get(),
+            pending.get(),
             errors
         );
     }
 
-    public void reindexSucceeded() {
+    public void allReindexesCompleted() {
         this.complete = true;
     }
 
-    public void reindexFailed(Exception e) {
+    public void taskFailed(Exception e) {
         this.complete = true;
         this.exception = e;
     }
 
-    public void setInProgressIndices(List<String> inProgressIndices) {
-        this.inProgress = inProgressIndices;
+    public void reindexSucceeded() {
+        inProgress.decrementAndGet();
+    }
+
+    public void reindexFailed(String index, Exception error) {
+        this.errors.add(Tuple.tuple(index, error));
+        inProgress.decrementAndGet();
     }
 
-    public void setPendingIndices(List<String> pendingIndices) {
-        this.pending = pendingIndices;
+    public void incrementInProgressIndicesCount() {
+        inProgress.incrementAndGet();
+        pending.decrementAndGet();
     }
 
-    public void addErrorIndex(String index, Exception error) {
-        this.errors.add(Tuple.tuple(index, error));
+    public void setPendingIndicesCount(int size) {
+        pending.set(size);
     }
 }