Browse Source

Rest high level ReindexIT fix (#60834)

ReindexIT would rethrottle any delete or update by query task, fixed to
more precisely match the task started by the test.

Closes #60811
Henning Andersen 5 years ago
parent
commit
00fc0d2066

+ 10 - 6
client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java

@@ -50,6 +50,7 @@ import org.elasticsearch.tasks.TaskId;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -283,7 +284,7 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
                 }
             });
 
-            TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME);
+            TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME, updateByQueryRequest.getDescription());
             float requestsPerSecond = 1000f;
             ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
                 highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
@@ -414,7 +415,7 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
                 }
             });
 
-            TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME);
+            TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME, deleteByQueryRequest.getDescription());
             float requestsPerSecond = 1000f;
             ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
                 highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
@@ -477,7 +478,7 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
         }
     }
 
-    private static TaskId findTaskToRethrottle(String actionName) throws IOException {
+    private static TaskId findTaskToRethrottle(String actionName, String description) throws IOException {
         long start = System.nanoTime();
         ListTasksRequest request = new ListTasksRequest();
         request.setActions(actionName);
@@ -485,13 +486,16 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
         do {
             ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
             list.rethrowFailures("Finding tasks to rethrottle");
+            List<TaskGroup> taskGroups =
+                list.getTaskGroups().stream()
+                    .filter(taskGroup -> taskGroup.getTaskInfo().getDescription().equals(description)).collect(Collectors.toList());
             assertThat("tasks are left over from the last execution of this test",
-                list.getTaskGroups(), hasSize(lessThan(2)));
-            if (0 == list.getTaskGroups().size()) {
+                taskGroups, hasSize(lessThan(2)));
+            if (0 == taskGroups.size()) {
                 // The parent task hasn't started yet
                 continue;
             }
-            TaskGroup taskGroup = list.getTaskGroups().get(0);
+            TaskGroup taskGroup = taskGroups.get(0);
             assertThat(taskGroup.getChildTasks(), empty());
             return taskGroup.getTaskInfo().getTaskId();
         } while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));