Parcourir la source

Correctly release threads from starting gate in o.e.c.ClusterServiceIT

Jason Tedor il y a 9 ans
Parent
commit
84c4ab6c18
1 fichiers modifiés avec 24 ajouts et 16 suppressions
  1. 24 16
      core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java

+ 24 - 16
core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java

@@ -831,30 +831,38 @@ public class ClusterServiceIT extends ESIntegTestCase {
             counts.merge(executor, 1, (previous, one) -> previous + one);
         }
 
-        CountDownLatch startingGun = new CountDownLatch(1 + numberOfThreads);
-        List<Thread> threads = new ArrayList<>();
+        CountDownLatch startGate = new CountDownLatch(1);
+        CountDownLatch endGate = new CountDownLatch(numberOfThreads);
+        AtomicBoolean interrupted = new AtomicBoolean();
         for (int i = 0; i < numberOfThreads; i++) {
             final int index = i;
             Thread thread = new Thread(() -> {
-                startingGun.countDown();
-                for (int j = 0; j < tasksSubmittedPerThread; j++) {
-                    ClusterStateTaskExecutor<Task> executor = assignments.get(index * tasksSubmittedPerThread + j);
-                    clusterService.submitStateUpdateTask(
-                            Thread.currentThread().getName(),
-                            new Task(),
-                            ClusterStateTaskConfig.build(randomFrom(Priority.values())),
-                            executor,
-                            listener);
+                try {
+                    try {
+                        startGate.await();
+                    } catch (InterruptedException e) {
+                        interrupted.set(true);
+                        return;
+                    }
+                    for (int j = 0; j < tasksSubmittedPerThread; j++) {
+                        ClusterStateTaskExecutor<Task> executor = assignments.get(index * tasksSubmittedPerThread + j);
+                        clusterService.submitStateUpdateTask(
+                                Thread.currentThread().getName(),
+                                new Task(),
+                                ClusterStateTaskConfig.build(randomFrom(Priority.values())),
+                                executor,
+                                listener);
+                    }
+                } finally {
+                    endGate.countDown();
                 }
             });
-            threads.add(thread);
             thread.start();
         }
 
-        startingGun.countDown();
-        for (Thread thread : threads) {
-            thread.join();
-        }
+        startGate.countDown();
+        endGate.await();
+        assertFalse(interrupted.get());
 
         // wait until all the cluster state updates have been processed
         updateLatch.await();