瀏覽代碼

Use new runInParallel and startInParallel test util in more spots (#110853)

It's in the title. Use the utility in a couple more spots to save some code and thread creation.
Armin Braun 1 年之前
父節點
當前提交
2dc28ac2f1

+ 60 - 76
server/src/internalClusterTest/java/org/elasticsearch/versioning/SimpleVersioningIT.java

@@ -29,7 +29,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@@ -585,85 +584,70 @@ public class SimpleVersioningIT extends ESIntegTestCase {
         }
 
         final AtomicInteger upto = new AtomicInteger();
-        final CountDownLatch startingGun = new CountDownLatch(1);
-        Thread[] threads = new Thread[TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 20 : 5)];
         final long startTime = System.nanoTime();
-        for (int i = 0; i < threads.length; i++) {
-            final int threadID = i;
-            threads[i] = new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        // final Random threadRandom = RandomizedContext.current().getRandom();
-                        final Random threadRandom = random();
-                        startingGun.await();
-                        while (true) {
-
-                            // TODO: sometimes use bulk:
-
-                            int index = upto.getAndIncrement();
-                            if (index >= idVersions.length) {
-                                break;
-                            }
-                            if (index % 100 == 0) {
-                                logger.trace("{}: index={}", Thread.currentThread().getName(), index);
-                            }
-                            IDAndVersion idVersion = idVersions[index];
-
-                            String id = idVersion.id;
-                            idVersion.threadID = threadID;
-                            idVersion.indexStartTime = System.nanoTime() - startTime;
-                            long version = idVersion.version;
-                            if (idVersion.delete) {
-                                try {
-                                    idVersion.response = client().prepareDelete("test", id)
-                                        .setVersion(version)
-                                        .setVersionType(VersionType.EXTERNAL)
-                                        .get();
-                                } catch (VersionConflictEngineException vcee) {
-                                    // OK: our version is too old
-                                    assertThat(version, lessThanOrEqualTo(truth.get(id).version));
-                                    idVersion.versionConflict = true;
-                                }
-                            } else {
-                                try {
-                                    idVersion.response = prepareIndex("test").setId(id)
-                                        .setSource("foo", "bar")
-                                        .setVersion(version)
-                                        .setVersionType(VersionType.EXTERNAL)
-                                        .get();
-
-                                } catch (VersionConflictEngineException vcee) {
-                                    // OK: our version is too old
-                                    assertThat(version, lessThanOrEqualTo(truth.get(id).version));
-                                    idVersion.versionConflict = true;
-                                }
-                            }
-                            idVersion.indexFinishTime = System.nanoTime() - startTime;
-
-                            if (threadRandom.nextInt(100) == 7) {
-                                logger.trace("--> {}: TEST: now refresh at {}", threadID, System.nanoTime() - startTime);
-                                refresh();
-                                logger.trace("--> {}: TEST: refresh done at {}", threadID, System.nanoTime() - startTime);
-                            }
-                            if (threadRandom.nextInt(100) == 7) {
-                                logger.trace("--> {}: TEST: now flush at {}", threadID, System.nanoTime() - startTime);
-                                flush();
-                                logger.trace("--> {}: TEST: flush done at {}", threadID, System.nanoTime() - startTime);
-                            }
+        startInParallel(TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 20 : 5), threadID -> {
+            try {
+                // final Random threadRandom = RandomizedContext.current().getRandom();
+                final Random threadRandom = random();
+                while (true) {
+
+                    // TODO: sometimes use bulk:
+
+                    int index = upto.getAndIncrement();
+                    if (index >= idVersions.length) {
+                        break;
+                    }
+                    if (index % 100 == 0) {
+                        logger.trace("{}: index={}", Thread.currentThread().getName(), index);
+                    }
+                    IDAndVersion idVersion = idVersions[index];
+
+                    String id = idVersion.id;
+                    idVersion.threadID = threadID;
+                    idVersion.indexStartTime = System.nanoTime() - startTime;
+                    long v = idVersion.version;
+                    if (idVersion.delete) {
+                        try {
+                            idVersion.response = client().prepareDelete("test", id)
+                                .setVersion(v)
+                                .setVersionType(VersionType.EXTERNAL)
+                                .get();
+                        } catch (VersionConflictEngineException vcee) {
+                            // OK: our version is too old
+                            assertThat(v, lessThanOrEqualTo(truth.get(id).version));
+                            idVersion.versionConflict = true;
+                        }
+                    } else {
+                        try {
+                            idVersion.response = prepareIndex("test").setId(id)
+                                .setSource("foo", "bar")
+                                .setVersion(v)
+                                .setVersionType(VersionType.EXTERNAL)
+                                .get();
+
+                        } catch (VersionConflictEngineException vcee) {
+                            // OK: our version is too old
+                            assertThat(v, lessThanOrEqualTo(truth.get(id).version));
+                            idVersion.versionConflict = true;
                         }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
                     }
-                }
-            };
-            threads[i].start();
-        }
+                    idVersion.indexFinishTime = System.nanoTime() - startTime;
 
-        startingGun.countDown();
-        for (Thread thread : threads) {
-            thread.join();
-        }
+                    if (threadRandom.nextInt(100) == 7) {
+                        logger.trace("--> {}: TEST: now refresh at {}", threadID, System.nanoTime() - startTime);
+                        refresh();
+                        logger.trace("--> {}: TEST: refresh done at {}", threadID, System.nanoTime() - startTime);
+                    }
+                    if (threadRandom.nextInt(100) == 7) {
+                        logger.trace("--> {}: TEST: now flush at {}", threadID, System.nanoTime() - startTime);
+                        flush();
+                        logger.trace("--> {}: TEST: flush done at {}", threadID, System.nanoTime() - startTime);
+                    }
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
 
         // Verify against truth:
         boolean failed = false;

+ 76 - 101
server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java

@@ -748,43 +748,34 @@ public class SearchPhaseControllerTests extends ESTestCase {
             )
         ) {
             AtomicInteger max = new AtomicInteger();
-            Thread[] threads = new Thread[expectedNumResults];
             CountDownLatch latch = new CountDownLatch(expectedNumResults);
-            for (int i = 0; i < expectedNumResults; i++) {
-                int id = i;
-                threads[i] = new Thread(() -> {
-                    int number = randomIntBetween(1, 1000);
-                    max.updateAndGet(prev -> Math.max(prev, number));
-                    QuerySearchResult result = new QuerySearchResult(
-                        new ShardSearchContextId("", id),
-                        new SearchShardTarget("node", new ShardId("a", "b", id), null),
-                        null
+            runInParallel(expectedNumResults, id -> {
+                int number = randomIntBetween(1, 1000);
+                max.updateAndGet(prev -> Math.max(prev, number));
+                QuerySearchResult result = new QuerySearchResult(
+                    new ShardSearchContextId("", id),
+                    new SearchShardTarget("node", new ShardId("a", "b", id), null),
+                    null
+                );
+                try {
+                    result.topDocs(
+                        new TopDocsAndMaxScore(
+                            new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }),
+                            number
+                        ),
+                        new DocValueFormat[0]
                     );
-                    try {
-                        result.topDocs(
-                            new TopDocsAndMaxScore(
-                                new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }),
-                                number
-                            ),
-                            new DocValueFormat[0]
-                        );
-                        InternalAggregations aggs = InternalAggregations.from(
-                            Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap()))
-                        );
-                        result.aggregations(aggs);
-                        result.setShardIndex(id);
-                        result.size(1);
-                        consumer.consumeResult(result, latch::countDown);
-                    } finally {
-                        result.decRef();
-                    }
-
-                });
-                threads[i].start();
-            }
-            for (int i = 0; i < expectedNumResults; i++) {
-                threads[i].join();
-            }
+                    InternalAggregations aggs = InternalAggregations.from(
+                        Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap()))
+                    );
+                    result.aggregations(aggs);
+                    result.setShardIndex(id);
+                    result.size(1);
+                    consumer.consumeResult(result, latch::countDown);
+                } finally {
+                    result.decRef();
+                }
+            });
             latch.await();
 
             SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
@@ -1264,42 +1255,34 @@ public class SearchPhaseControllerTests extends ESTestCase {
                 )
             ) {
                 AtomicInteger max = new AtomicInteger();
-                Thread[] threads = new Thread[expectedNumResults];
                 CountDownLatch latch = new CountDownLatch(expectedNumResults);
-                for (int i = 0; i < expectedNumResults; i++) {
-                    int id = i;
-                    threads[i] = new Thread(() -> {
-                        int number = randomIntBetween(1, 1000);
-                        max.updateAndGet(prev -> Math.max(prev, number));
-                        QuerySearchResult result = new QuerySearchResult(
-                            new ShardSearchContextId("", id),
-                            new SearchShardTarget("node", new ShardId("a", "b", id), null),
-                            null
+                runInParallel(expectedNumResults, id -> {
+                    int number = randomIntBetween(1, 1000);
+                    max.updateAndGet(prev -> Math.max(prev, number));
+                    QuerySearchResult result = new QuerySearchResult(
+                        new ShardSearchContextId("", id),
+                        new SearchShardTarget("node", new ShardId("a", "b", id), null),
+                        null
+                    );
+                    try {
+                        result.topDocs(
+                            new TopDocsAndMaxScore(
+                                new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }),
+                                number
+                            ),
+                            new DocValueFormat[0]
                         );
-                        try {
-                            result.topDocs(
-                                new TopDocsAndMaxScore(
-                                    new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }),
-                                    number
-                                ),
-                                new DocValueFormat[0]
-                            );
-                            InternalAggregations aggs = InternalAggregations.from(
-                                Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap()))
-                            );
-                            result.aggregations(aggs);
-                            result.setShardIndex(id);
-                            result.size(1);
-                            consumer.consumeResult(result, latch::countDown);
-                        } finally {
-                            result.decRef();
-                        }
-                    });
-                    threads[i].start();
-                }
-                for (int i = 0; i < expectedNumResults; i++) {
-                    threads[i].join();
-                }
+                        InternalAggregations aggs = InternalAggregations.from(
+                            Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap()))
+                        );
+                        result.aggregations(aggs);
+                        result.setShardIndex(id);
+                        result.size(1);
+                        consumer.consumeResult(result, latch::countDown);
+                    } finally {
+                        result.decRef();
+                    }
+                });
                 latch.await();
                 SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
                 assertAggReduction(request);
@@ -1354,39 +1337,31 @@ public class SearchPhaseControllerTests extends ESTestCase {
             )
         ) {
             CountDownLatch latch = new CountDownLatch(numShards);
-            Thread[] threads = new Thread[numShards];
-            for (int i = 0; i < numShards; i++) {
-                final int index = i;
-                threads[index] = new Thread(() -> {
-                    QuerySearchResult result = new QuerySearchResult(
-                        new ShardSearchContextId(UUIDs.randomBase64UUID(), index),
-                        new SearchShardTarget("node", new ShardId("a", "b", index), null),
-                        null
+            runInParallel(numShards, index -> {
+                QuerySearchResult result = new QuerySearchResult(
+                    new ShardSearchContextId(UUIDs.randomBase64UUID(), index),
+                    new SearchShardTarget("node", new ShardId("a", "b", index), null),
+                    null
+                );
+                try {
+                    result.topDocs(
+                        new TopDocsAndMaxScore(
+                            new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS),
+                            Float.NaN
+                        ),
+                        new DocValueFormat[0]
                     );
-                    try {
-                        result.topDocs(
-                            new TopDocsAndMaxScore(
-                                new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS),
-                                Float.NaN
-                            ),
-                            new DocValueFormat[0]
-                        );
-                        InternalAggregations aggs = InternalAggregations.from(
-                            Collections.singletonList(new Max("test", 0d, DocValueFormat.RAW, Collections.emptyMap()))
-                        );
-                        result.aggregations(aggs);
-                        result.setShardIndex(index);
-                        result.size(1);
-                        consumer.consumeResult(result, latch::countDown);
-                    } finally {
-                        result.decRef();
-                    }
-                });
-                threads[index].start();
-            }
-            for (int i = 0; i < numShards; i++) {
-                threads[i].join();
-            }
+                    InternalAggregations aggs = InternalAggregations.from(
+                        Collections.singletonList(new Max("test", 0d, DocValueFormat.RAW, Collections.emptyMap()))
+                    );
+                    result.aggregations(aggs);
+                    result.setShardIndex(index);
+                    result.size(1);
+                    consumer.consumeResult(result, latch::countDown);
+                } finally {
+                    result.decRef();
+                }
+            });
             latch.await();
             if (shouldFail) {
                 if (shouldFailPartial == false) {

+ 27 - 34
server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java

@@ -430,9 +430,9 @@ public class ShardStateActionTests extends ESTestCase {
         for (int i = 0; i < failedShards.length; i++) {
             failedShards[i] = getRandomShardRouting(index);
         }
-        Thread[] clientThreads = new Thread[between(1, 6)];
+        final int clientThreads = between(1, 6);
         int iterationsPerThread = scaledRandomIntBetween(50, 500);
-        Phaser barrier = new Phaser(clientThreads.length + 2); // one for master thread, one for the main thread
+        Phaser barrier = new Phaser(clientThreads + 1); // +1 for the master thread
         Thread masterThread = new Thread(() -> {
             barrier.arriveAndAwaitAdvance();
             while (shutdown.get() == false) {
@@ -448,39 +448,32 @@ public class ShardStateActionTests extends ESTestCase {
         masterThread.start();
 
         AtomicInteger notifiedResponses = new AtomicInteger();
-        for (int t = 0; t < clientThreads.length; t++) {
-            clientThreads[t] = new Thread(() -> {
-                barrier.arriveAndAwaitAdvance();
-                for (int i = 0; i < iterationsPerThread; i++) {
-                    ShardRouting failedShard = randomFrom(failedShards);
-                    shardStateAction.remoteShardFailed(
-                        failedShard.shardId(),
-                        failedShard.allocationId().getId(),
-                        randomLongBetween(1, Long.MAX_VALUE),
-                        randomBoolean(),
-                        "test",
-                        getSimulatedFailure(),
-                        new ActionListener<Void>() {
-                            @Override
-                            public void onResponse(Void aVoid) {
-                                notifiedResponses.incrementAndGet();
-                            }
-
-                            @Override
-                            public void onFailure(Exception e) {
-                                notifiedResponses.incrementAndGet();
-                            }
+        runInParallel(clientThreads, t -> {
+            barrier.arriveAndAwaitAdvance();
+            for (int i = 0; i < iterationsPerThread; i++) {
+                ShardRouting failedShard = randomFrom(failedShards);
+                shardStateAction.remoteShardFailed(
+                    failedShard.shardId(),
+                    failedShard.allocationId().getId(),
+                    randomLongBetween(1, Long.MAX_VALUE),
+                    randomBoolean(),
+                    "test",
+                    getSimulatedFailure(),
+                    new ActionListener<>() {
+                        @Override
+                        public void onResponse(Void aVoid) {
+                            notifiedResponses.incrementAndGet();
                         }
-                    );
-                }
-            });
-            clientThreads[t].start();
-        }
-        barrier.arriveAndAwaitAdvance();
-        for (Thread t : clientThreads) {
-            t.join();
-        }
-        assertBusy(() -> assertThat(notifiedResponses.get(), equalTo(clientThreads.length * iterationsPerThread)));
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            notifiedResponses.incrementAndGet();
+                        }
+                    }
+                );
+            }
+        });
+        assertBusy(() -> assertThat(notifiedResponses.get(), equalTo(clientThreads * iterationsPerThread)));
         shutdown.set(true);
         masterThread.join();
     }

+ 7 - 21
server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ContinuousComputationTests.java

@@ -17,7 +17,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
 import java.util.Arrays;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -60,26 +59,13 @@ public class ContinuousComputationTests extends ESTestCase {
             }
         };
 
-        final Thread[] threads = new Thread[between(1, 5)];
-        final int[] valuePerThread = new int[threads.length];
-        final CountDownLatch startLatch = new CountDownLatch(1);
-        for (int i = 0; i < threads.length; i++) {
-            final int threadIndex = i;
-            valuePerThread[threadIndex] = randomInt();
-            threads[threadIndex] = new Thread(() -> {
-                safeAwait(startLatch);
-                for (int j = 1000; j >= 0; j--) {
-                    computation.onNewInput(valuePerThread[threadIndex] = valuePerThread[threadIndex] + j);
-                }
-            }, "submit-thread-" + threadIndex);
-            threads[threadIndex].start();
-        }
-
-        startLatch.countDown();
-
-        for (Thread thread : threads) {
-            thread.join();
-        }
+        final int threads = between(1, 5);
+        final int[] valuePerThread = new int[threads];
+        startInParallel(threads, threadIndex -> {
+            for (int j = 1000; j >= 0; j--) {
+                computation.onNewInput(valuePerThread[threadIndex] = valuePerThread[threadIndex] + j);
+            }
+        });
 
         assertBusy(() -> assertFalse(computation.isActive()));
 

+ 72 - 141
server/src/test/java/org/elasticsearch/common/cache/CacheTests.java

@@ -326,32 +326,19 @@ public class CacheTests extends ESTestCase {
         assertEquals(numberOfEntries, cache.stats().getEvictions());
     }
 
-    public void testComputeIfAbsentDeadlock() {
-        final int numberOfThreads = randomIntBetween(2, 32);
+    public void testComputeIfAbsentDeadlock() throws InterruptedException {
         final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder()
             .setExpireAfterAccess(TimeValue.timeValueNanos(1))
             .build();
-
-        final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
-        for (int i = 0; i < numberOfThreads; i++) {
-            final Thread thread = new Thread(() -> {
-                safeAwait(barrier);
-                for (int j = 0; j < numberOfEntries; j++) {
-                    try {
-                        cache.computeIfAbsent(0, k -> Integer.toString(k));
-                    } catch (final ExecutionException e) {
-                        throw new AssertionError(e);
-                    }
+        startInParallel(randomIntBetween(2, 32), i -> {
+            for (int j = 0; j < numberOfEntries; j++) {
+                try {
+                    cache.computeIfAbsent(0, k -> Integer.toString(k));
+                } catch (final ExecutionException e) {
+                    throw new AssertionError(e);
                 }
-                safeAwait(barrier);
-            });
-            thread.start();
-        }
-
-        // wait for all threads to be ready
-        safeAwait(barrier);
-        // wait for all threads to finish
-        safeAwait(barrier);
+            }
+        });
     }
 
     // randomly promote some entries, step the clock forward, then check that the promoted entries remain and the
@@ -596,41 +583,26 @@ public class CacheTests extends ESTestCase {
         }
     }
 
-    public void testComputeIfAbsentCallsOnce() {
-        int numberOfThreads = randomIntBetween(2, 32);
+    public void testComputeIfAbsentCallsOnce() throws InterruptedException {
         final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
         AtomicReferenceArray<Object> flags = new AtomicReferenceArray<>(numberOfEntries);
         for (int j = 0; j < numberOfEntries; j++) {
             flags.set(j, false);
         }
-
         CopyOnWriteArrayList<ExecutionException> failures = new CopyOnWriteArrayList<>();
-
-        CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
-        for (int i = 0; i < numberOfThreads; i++) {
-            Thread thread = new Thread(() -> {
-                safeAwait(barrier);
-                for (int j = 0; j < numberOfEntries; j++) {
-                    try {
-                        cache.computeIfAbsent(j, key -> {
-                            assertTrue(flags.compareAndSet(key, false, true));
-                            return Integer.toString(key);
-                        });
-                    } catch (ExecutionException e) {
-                        failures.add(e);
-                        break;
-                    }
+        startInParallel(randomIntBetween(2, 32), i -> {
+            for (int j = 0; j < numberOfEntries; j++) {
+                try {
+                    cache.computeIfAbsent(j, key -> {
+                        assertTrue(flags.compareAndSet(key, false, true));
+                        return Integer.toString(key);
+                    });
+                } catch (ExecutionException e) {
+                    failures.add(e);
+                    break;
                 }
-                safeAwait(barrier);
-            });
-            thread.start();
-        }
-
-        // wait for all threads to be ready
-        safeAwait(barrier);
-        // wait for all threads to finish
-        safeAwait(barrier);
-
+            }
+        });
         assertThat(failures, is(empty()));
     }
 
@@ -751,111 +723,70 @@ public class CacheTests extends ESTestCase {
         assertFalse("deadlock", deadlock.get());
     }
 
-    public void testCachePollution() {
+    public void testCachePollution() throws InterruptedException {
         int numberOfThreads = randomIntBetween(2, 32);
         final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
-
-        CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
-
-        for (int i = 0; i < numberOfThreads; i++) {
-            Thread thread = new Thread(() -> {
-                safeAwait(barrier);
-                Random random = new Random(random().nextLong());
-                for (int j = 0; j < numberOfEntries; j++) {
-                    Integer key = random.nextInt(numberOfEntries);
-                    boolean first;
-                    boolean second;
-                    do {
-                        first = random.nextBoolean();
-                        second = random.nextBoolean();
-                    } while (first && second);
-                    if (first) {
-                        try {
-                            cache.computeIfAbsent(key, k -> {
-                                if (random.nextBoolean()) {
-                                    return Integer.toString(k);
-                                } else {
-                                    throw new Exception("testCachePollution");
-                                }
-                            });
-                        } catch (ExecutionException e) {
-                            assertNotNull(e.getCause());
-                            assertThat(e.getCause(), instanceOf(Exception.class));
-                            assertEquals(e.getCause().getMessage(), "testCachePollution");
-                        }
-                    } else if (second) {
-                        cache.invalidate(key);
-                    } else {
-                        cache.get(key);
+        startInParallel(numberOfThreads, i -> {
+            Random random = new Random(random().nextLong());
+            for (int j = 0; j < numberOfEntries; j++) {
+                Integer key = random.nextInt(numberOfEntries);
+                boolean first;
+                boolean second;
+                do {
+                    first = random.nextBoolean();
+                    second = random.nextBoolean();
+                } while (first && second);
+                if (first) {
+                    try {
+                        cache.computeIfAbsent(key, k -> {
+                            if (random.nextBoolean()) {
+                                return Integer.toString(k);
+                            } else {
+                                throw new Exception("testCachePollution");
+                            }
+                        });
+                    } catch (ExecutionException e) {
+                        assertNotNull(e.getCause());
+                        assertThat(e.getCause(), instanceOf(Exception.class));
+                        assertEquals(e.getCause().getMessage(), "testCachePollution");
                     }
+                } else if (second) {
+                    cache.invalidate(key);
+                } else {
+                    cache.get(key);
                 }
-                safeAwait(barrier);
-            });
-            thread.start();
-        }
-
-        // wait for all threads to be ready
-        safeAwait(barrier);
-        // wait for all threads to finish
-        safeAwait(barrier);
+            }
+        });
     }
 
-    public void testExceptionThrownDuringConcurrentComputeIfAbsent() {
-        int numberOfThreads = randomIntBetween(2, 32);
+    public void testExceptionThrownDuringConcurrentComputeIfAbsent() throws InterruptedException {
         final Cache<String, String> cache = CacheBuilder.<String, String>builder().build();
-
-        CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
-
         final String key = randomAlphaOfLengthBetween(2, 32);
-        for (int i = 0; i < numberOfThreads; i++) {
-            Thread thread = new Thread(() -> {
-                safeAwait(barrier);
-                for (int j = 0; j < numberOfEntries; j++) {
-                    try {
-                        String value = cache.computeIfAbsent(key, k -> { throw new RuntimeException("failed to load"); });
-                        fail("expected exception but got: " + value);
-                    } catch (ExecutionException e) {
-                        assertNotNull(e.getCause());
-                        assertThat(e.getCause(), instanceOf(RuntimeException.class));
-                        assertEquals(e.getCause().getMessage(), "failed to load");
-                    }
+        startInParallel(randomIntBetween(2, 32), i -> {
+            for (int j = 0; j < numberOfEntries; j++) {
+                try {
+                    String value = cache.computeIfAbsent(key, k -> { throw new RuntimeException("failed to load"); });
+                    fail("expected exception but got: " + value);
+                } catch (ExecutionException e) {
+                    assertNotNull(e.getCause());
+                    assertThat(e.getCause(), instanceOf(RuntimeException.class));
+                    assertEquals(e.getCause().getMessage(), "failed to load");
                 }
-                safeAwait(barrier);
-            });
-            thread.start();
-        }
-
-        // wait for all threads to be ready
-        safeAwait(barrier);
-        // wait for all threads to finish
-        safeAwait(barrier);
+            }
+        });
     }
 
     // test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key
     // here be dragons: this test did catch one subtle bug during development; do not remove lightly
-    public void testTorture() {
-        int numberOfThreads = randomIntBetween(2, 32);
+    public void testTorture() throws InterruptedException {
         final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().setMaximumWeight(1000).weigher((k, v) -> 2).build();
-
-        CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
-        for (int i = 0; i < numberOfThreads; i++) {
-            Thread thread = new Thread(() -> {
-                safeAwait(barrier);
-                Random random = new Random(random().nextLong());
-                for (int j = 0; j < numberOfEntries; j++) {
-                    Integer key = random.nextInt(numberOfEntries);
-                    cache.put(key, Integer.toString(j));
-                }
-                safeAwait(barrier);
-            });
-            thread.start();
-        }
-
-        // wait for all threads to be ready
-        safeAwait(barrier);
-        // wait for all threads to finish
-        safeAwait(barrier);
-
+        startInParallel(randomIntBetween(2, 32), i -> {
+            Random random = new Random(random().nextLong());
+            for (int j = 0; j < numberOfEntries; j++) {
+                Integer key = random.nextInt(numberOfEntries);
+                cache.put(key, Integer.toString(j));
+            }
+        });
         cache.refresh();
         assertEquals(500, cache.count());
     }

+ 85 - 167
server/src/test/java/org/elasticsearch/common/compress/DeflateCompressTests.java

@@ -23,7 +23,6 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Random;
-import java.util.concurrent.CountDownLatch;
 import java.util.zip.ZipException;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -45,34 +44,17 @@ public class DeflateCompressTests extends ESTestCase {
     }
 
     public void testRandomThreads() throws Exception {
-        final Random r = random();
-        int threadCount = TestUtil.nextInt(r, 2, 6);
-        Thread[] threads = new Thread[threadCount];
-        final CountDownLatch startingGun = new CountDownLatch(1);
-        for (int tid = 0; tid < threadCount; tid++) {
-            final long seed = r.nextLong();
-            threads[tid] = new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        Random r = new Random(seed);
-                        startingGun.await();
-                        for (int i = 0; i < 10; i++) {
-                            byte bytes[] = new byte[TestUtil.nextInt(r, 1, 100000)];
-                            r.nextBytes(bytes);
-                            doTest(bytes);
-                        }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
+        startInParallel(randomIntBetween(2, 6), tid -> {
+            try {
+                for (int i = 0; i < 10; i++) {
+                    byte[] bytes = new byte[randomIntBetween(1, 100000)];
+                    randomBytesBetween(bytes, Byte.MIN_VALUE, Byte.MAX_VALUE);
+                    doTest(bytes);
                 }
-            };
-            threads[tid].start();
-        }
-        startingGun.countDown();
-        for (Thread t : threads) {
-            t.join();
-        }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
     }
 
     public void testLineDocs() throws IOException {
@@ -91,40 +73,24 @@ public class DeflateCompressTests extends ESTestCase {
     }
 
     public void testLineDocsThreads() throws Exception {
-        final Random r = random();
-        int threadCount = TestUtil.nextInt(r, 2, 6);
-        Thread[] threads = new Thread[threadCount];
-        final CountDownLatch startingGun = new CountDownLatch(1);
-        for (int tid = 0; tid < threadCount; tid++) {
-            final long seed = r.nextLong();
-            threads[tid] = new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        Random r = new Random(seed);
-                        startingGun.await();
-                        LineFileDocs lineFileDocs = new LineFileDocs(r);
-                        for (int i = 0; i < 10; i++) {
-                            int numDocs = TestUtil.nextInt(r, 1, 200);
-                            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                            for (int j = 0; j < numDocs; j++) {
-                                String s = lineFileDocs.nextDoc().get("body");
-                                bos.write(s.getBytes(StandardCharsets.UTF_8));
-                            }
-                            doTest(bos.toByteArray());
-                        }
-                        lineFileDocs.close();
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
+        int threadCount = randomIntBetween(2, 6);
+        startInParallel(threadCount, tid -> {
+            try {
+                LineFileDocs lineFileDocs = new LineFileDocs(random());
+                for (int i = 0; i < 10; i++) {
+                    int numDocs = randomIntBetween(1, 200);
+                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                    for (int j = 0; j < numDocs; j++) {
+                        String s = lineFileDocs.nextDoc().get("body");
+                        bos.write(s.getBytes(StandardCharsets.UTF_8));
                     }
+                    doTest(bos.toByteArray());
                 }
-            };
-            threads[tid].start();
-        }
-        startingGun.countDown();
-        for (Thread t : threads) {
-            t.join();
-        }
+                lineFileDocs.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
     }
 
     public void testRepetitionsL() throws IOException {
@@ -151,48 +117,32 @@ public class DeflateCompressTests extends ESTestCase {
     }
 
     public void testRepetitionsLThreads() throws Exception {
-        final Random r = random();
-        int threadCount = TestUtil.nextInt(r, 2, 6);
-        Thread[] threads = new Thread[threadCount];
-        final CountDownLatch startingGun = new CountDownLatch(1);
-        for (int tid = 0; tid < threadCount; tid++) {
-            final long seed = r.nextLong();
-            threads[tid] = new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        Random r = new Random(seed);
-                        startingGun.await();
-                        for (int i = 0; i < 10; i++) {
-                            int numLongs = TestUtil.nextInt(r, 1, 10000);
-                            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                            long theValue = r.nextLong();
-                            for (int j = 0; j < numLongs; j++) {
-                                if (r.nextInt(10) == 0) {
-                                    theValue = r.nextLong();
-                                }
-                                bos.write((byte) (theValue >>> 56));
-                                bos.write((byte) (theValue >>> 48));
-                                bos.write((byte) (theValue >>> 40));
-                                bos.write((byte) (theValue >>> 32));
-                                bos.write((byte) (theValue >>> 24));
-                                bos.write((byte) (theValue >>> 16));
-                                bos.write((byte) (theValue >>> 8));
-                                bos.write((byte) theValue);
-                            }
-                            doTest(bos.toByteArray());
+        int threadCount = randomIntBetween(2, 6);
+        startInParallel(threadCount, tid -> {
+            try {
+                for (int i = 0; i < 10; i++) {
+                    int numLongs = randomIntBetween(1, 10000);
+                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                    long theValue = randomLong();
+                    for (int j = 0; j < numLongs; j++) {
+                        if (randomInt(10) == 0) {
+                            theValue = randomLong();
                         }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
+                        bos.write((byte) (theValue >>> 56));
+                        bos.write((byte) (theValue >>> 48));
+                        bos.write((byte) (theValue >>> 40));
+                        bos.write((byte) (theValue >>> 32));
+                        bos.write((byte) (theValue >>> 24));
+                        bos.write((byte) (theValue >>> 16));
+                        bos.write((byte) (theValue >>> 8));
+                        bos.write((byte) theValue);
                     }
+                    doTest(bos.toByteArray());
                 }
-            };
-            threads[tid].start();
-        }
-        startingGun.countDown();
-        for (Thread t : threads) {
-            t.join();
-        }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
     }
 
     public void testRepetitionsI() throws IOException {
@@ -215,44 +165,28 @@ public class DeflateCompressTests extends ESTestCase {
     }
 
     public void testRepetitionsIThreads() throws Exception {
-        final Random r = random();
-        int threadCount = TestUtil.nextInt(r, 2, 6);
-        Thread[] threads = new Thread[threadCount];
-        final CountDownLatch startingGun = new CountDownLatch(1);
-        for (int tid = 0; tid < threadCount; tid++) {
-            final long seed = r.nextLong();
-            threads[tid] = new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        Random r = new Random(seed);
-                        startingGun.await();
-                        for (int i = 0; i < 10; i++) {
-                            int numInts = TestUtil.nextInt(r, 1, 20000);
-                            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                            int theValue = r.nextInt();
-                            for (int j = 0; j < numInts; j++) {
-                                if (r.nextInt(10) == 0) {
-                                    theValue = r.nextInt();
-                                }
-                                bos.write((byte) (theValue >>> 24));
-                                bos.write((byte) (theValue >>> 16));
-                                bos.write((byte) (theValue >>> 8));
-                                bos.write((byte) theValue);
-                            }
-                            doTest(bos.toByteArray());
+        int threadCount = randomIntBetween(2, 6);
+        startInParallel(threadCount, tid -> {
+            try {
+                for (int i = 0; i < 10; i++) {
+                    int numInts = randomIntBetween(1, 20000);
+                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                    int theValue = randomInt();
+                    for (int j = 0; j < numInts; j++) {
+                        if (randomInt(10) == 0) {
+                            theValue = randomInt();
                         }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
+                        bos.write((byte) (theValue >>> 24));
+                        bos.write((byte) (theValue >>> 16));
+                        bos.write((byte) (theValue >>> 8));
+                        bos.write((byte) theValue);
                     }
+                    doTest(bos.toByteArray());
                 }
-            };
-            threads[tid].start();
-        }
-        startingGun.countDown();
-        for (Thread t : threads) {
-            t.join();
-        }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
     }
 
     public void testRepetitionsS() throws IOException {
@@ -330,42 +264,26 @@ public class DeflateCompressTests extends ESTestCase {
     }
 
     public void testRepetitionsSThreads() throws Exception {
-        final Random r = random();
-        int threadCount = TestUtil.nextInt(r, 2, 6);
-        Thread[] threads = new Thread[threadCount];
-        final CountDownLatch startingGun = new CountDownLatch(1);
-        for (int tid = 0; tid < threadCount; tid++) {
-            final long seed = r.nextLong();
-            threads[tid] = new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        Random r = new Random(seed);
-                        startingGun.await();
-                        for (int i = 0; i < 10; i++) {
-                            int numShorts = TestUtil.nextInt(r, 1, 40000);
-                            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                            short theValue = (short) r.nextInt(65535);
-                            for (int j = 0; j < numShorts; j++) {
-                                if (r.nextInt(10) == 0) {
-                                    theValue = (short) r.nextInt(65535);
-                                }
-                                bos.write((byte) (theValue >>> 8));
-                                bos.write((byte) theValue);
-                            }
-                            doTest(bos.toByteArray());
+        int threadCount = randomIntBetween(2, 6);
+        startInParallel(threadCount, tid -> {
+            try {
+                for (int i = 0; i < 10; i++) {
+                    int numShorts = randomIntBetween(1, 40000);
+                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                    short theValue = (short) randomInt(65535);
+                    for (int j = 0; j < numShorts; j++) {
+                        if (randomInt(10) == 0) {
+                            theValue = (short) randomInt(65535);
                         }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
+                        bos.write((byte) (theValue >>> 8));
+                        bos.write((byte) theValue);
                     }
+                    doTest(bos.toByteArray());
                 }
-            };
-            threads[tid].start();
-        }
-        startingGun.countDown();
-        for (Thread t : threads) {
-            t.join();
-        }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
     }
 
     public void testCompressUncompressWithCorruptions() throws Exception {

+ 1 - 21
server/src/test/java/org/elasticsearch/common/util/concurrent/RunOnceTests.java

@@ -11,7 +11,6 @@ package org.elasticsearch.common.util.concurrent;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.ReachabilityChecker;
 
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class RunOnceTests extends ESTestCase {
@@ -33,26 +32,7 @@ public class RunOnceTests extends ESTestCase {
     public void testRunOnceConcurrently() throws InterruptedException {
         final AtomicInteger counter = new AtomicInteger(0);
         final RunOnce runOnce = new RunOnce(counter::incrementAndGet);
-
-        final Thread[] threads = new Thread[between(3, 10)];
-        final CountDownLatch latch = new CountDownLatch(1 + threads.length);
-        for (int i = 0; i < threads.length; i++) {
-            threads[i] = new Thread(() -> {
-                latch.countDown();
-                try {
-                    latch.await();
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
-                runOnce.run();
-            });
-            threads[i].start();
-        }
-
-        latch.countDown();
-        for (Thread thread : threads) {
-            thread.join();
-        }
+        startInParallel(between(3, 10), i -> runOnce.run());
         assertTrue(runOnce.hasRun());
         assertEquals(1, counter.get());
     }

+ 14 - 37
server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java

@@ -357,46 +357,23 @@ public class NodeEnvironmentTests extends ESTestCase {
             flipFlop[i] = new AtomicInteger();
         }
 
-        Thread[] threads = new Thread[randomIntBetween(2, 5)];
-        final CountDownLatch latch = new CountDownLatch(1);
+        final int threads = randomIntBetween(2, 5);
         final int iters = scaledRandomIntBetween(10000, 100000);
-        for (int i = 0; i < threads.length; i++) {
-            threads[i] = new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        latch.await();
-                    } catch (InterruptedException e) {
-                        fail(e.getMessage());
-                    }
-                    for (int i = 0; i < iters; i++) {
-                        int shard = randomIntBetween(0, counts.length - 1);
-                        try {
-                            try (
-                                ShardLock autoCloses = env.shardLock(
-                                    new ShardId("foo", "fooUUID", shard),
-                                    "1",
-                                    scaledRandomIntBetween(0, 10)
-                                )
-                            ) {
-                                counts[shard].value++;
-                                countsAtomic[shard].incrementAndGet();
-                                assertEquals(flipFlop[shard].incrementAndGet(), 1);
-                                assertEquals(flipFlop[shard].decrementAndGet(), 0);
-                            }
-                        } catch (ShardLockObtainFailedException ex) {
-                            // ok
-                        }
+        startInParallel(threads, tid -> {
+            for (int i = 0; i < iters; i++) {
+                int shard = randomIntBetween(0, counts.length - 1);
+                try {
+                    try (ShardLock autoCloses = env.shardLock(new ShardId("foo", "fooUUID", shard), "1", scaledRandomIntBetween(0, 10))) {
+                        counts[shard].value++;
+                        countsAtomic[shard].incrementAndGet();
+                        assertEquals(flipFlop[shard].incrementAndGet(), 1);
+                        assertEquals(flipFlop[shard].decrementAndGet(), 0);
                     }
+                } catch (ShardLockObtainFailedException ex) {
+                    // ok
                 }
-            };
-            threads[i].start();
-        }
-        latch.countDown(); // fire the threads up
-        for (int i = 0; i < threads.length; i++) {
-            threads[i].join();
-        }
-
+            }
+        });
         assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
         for (int i = 0; i < counts.length; i++) {
             assertTrue(counts[i].value > 0);

+ 48 - 84
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -1252,24 +1252,15 @@ public class InternalEngineTests extends EngineTestCase {
                 assertThat(commitInfo.localCheckpoint(), equalTo(engine.getProcessedLocalCheckpoint()));
             }
         };
-        final Thread[] threads = new Thread[randomIntBetween(2, 4)];
-        final Phaser phaser = new Phaser(threads.length);
         globalCheckpoint.set(engine.getProcessedLocalCheckpoint());
-        for (int i = 0; i < threads.length; i++) {
-            threads[i] = new Thread(() -> {
-                phaser.arriveAndAwaitAdvance();
-                try {
-                    engine.syncTranslog();
-                    checker.run();
-                } catch (IOException e) {
-                    throw new AssertionError(e);
-                }
-            });
-            threads[i].start();
-        }
-        for (Thread thread : threads) {
-            thread.join();
-        }
+        startInParallel(randomIntBetween(2, 4), i -> {
+            try {
+                engine.syncTranslog();
+                checker.run();
+            } catch (IOException e) {
+                throw new AssertionError(e);
+            }
+        });
         checker.run();
     }
 
@@ -2419,8 +2410,6 @@ public class InternalEngineTests extends EngineTestCase {
         MapperService mapperService = createMapperService();
         MappingLookup mappingLookup = mapperService.mappingLookup();
         DocumentParser documentParser = mapperService.documentParser();
-        Thread[] thread = new Thread[randomIntBetween(3, 5)];
-        CountDownLatch startGun = new CountDownLatch(thread.length);
         final int opsPerThread = randomIntBetween(10, 20);
         class OpAndVersion {
             final long version;
@@ -2438,54 +2427,39 @@ public class InternalEngineTests extends EngineTestCase {
         ParsedDocument doc = testParsedDocument("1", null, testDocument(), bytesArray(""), null);
         final BytesRef uidTerm = newUid(doc);
         engine.index(indexForDoc(doc));
-        for (int i = 0; i < thread.length; i++) {
-            thread[i] = new Thread(() -> {
-                startGun.countDown();
-                safeAwait(startGun);
-                for (int op = 0; op < opsPerThread; op++) {
-                    Engine.Get engineGet = new Engine.Get(true, false, doc.id());
-                    try (Engine.GetResult get = engine.get(engineGet, mappingLookup, documentParser, randomSearcherWrapper())) {
-                        FieldsVisitor visitor = new FieldsVisitor(true);
-                        get.docIdAndVersion().reader.document(get.docIdAndVersion().docId, visitor);
-                        List<String> values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString()));
-                        String removed = op % 3 == 0 && values.size() > 0 ? values.remove(0) : null;
-                        String added = "v_" + idGenerator.incrementAndGet();
-                        values.add(added);
-                        Engine.Index index = new Engine.Index(
-                            uidTerm,
-                            testParsedDocument(
-                                "1",
-                                null,
-                                testDocument(),
-                                bytesArray(Strings.collectionToCommaDelimitedString(values)),
-                                null
-                            ),
-                            UNASSIGNED_SEQ_NO,
-                            2,
-                            get.version(),
-                            VersionType.INTERNAL,
-                            PRIMARY,
-                            System.currentTimeMillis(),
-                            -1,
-                            false,
-                            UNASSIGNED_SEQ_NO,
-                            0
-                        );
-                        Engine.IndexResult indexResult = engine.index(index);
-                        if (indexResult.getResultType() == Engine.Result.Type.SUCCESS) {
-                            history.add(new OpAndVersion(indexResult.getVersion(), removed, added));
-                        }
-
-                    } catch (IOException e) {
-                        throw new AssertionError(e);
+        startInParallel(randomIntBetween(3, 5), i -> {
+            for (int op = 0; op < opsPerThread; op++) {
+                Engine.Get engineGet = new Engine.Get(true, false, doc.id());
+                try (Engine.GetResult get = engine.get(engineGet, mappingLookup, documentParser, randomSearcherWrapper())) {
+                    FieldsVisitor visitor = new FieldsVisitor(true);
+                    get.docIdAndVersion().reader.document(get.docIdAndVersion().docId, visitor);
+                    List<String> values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString()));
+                    String removed = op % 3 == 0 && values.size() > 0 ? values.remove(0) : null;
+                    String added = "v_" + idGenerator.incrementAndGet();
+                    values.add(added);
+                    Engine.Index index = new Engine.Index(
+                        uidTerm,
+                        testParsedDocument("1", null, testDocument(), bytesArray(Strings.collectionToCommaDelimitedString(values)), null),
+                        UNASSIGNED_SEQ_NO,
+                        2,
+                        get.version(),
+                        VersionType.INTERNAL,
+                        PRIMARY,
+                        System.currentTimeMillis(),
+                        -1,
+                        false,
+                        UNASSIGNED_SEQ_NO,
+                        0
+                    );
+                    Engine.IndexResult indexResult = engine.index(index);
+                    if (indexResult.getResultType() == Engine.Result.Type.SUCCESS) {
+                        history.add(new OpAndVersion(indexResult.getVersion(), removed, added));
                     }
+                } catch (IOException e) {
+                    throw new AssertionError(e);
                 }
-            });
-            thread[i].start();
-        }
-        for (int i = 0; i < thread.length; i++) {
-            thread[i].join();
-        }
+            }
+        });
         List<OpAndVersion> sortedHistory = new ArrayList<>(history);
         sortedHistory.sort(Comparator.comparing(o -> o.version));
         Set<String> currentValues = new HashSet<>();
@@ -4363,7 +4337,6 @@ public class InternalEngineTests extends EngineTestCase {
     }
 
     public void testRetryConcurrently() throws InterruptedException, IOException {
-        Thread[] thread = new Thread[randomIntBetween(3, 5)];
         int numDocs = randomIntBetween(1000, 10000);
         List<Engine.Index> docs = new ArrayList<>();
         final boolean primary = randomBoolean();
@@ -4389,26 +4362,17 @@ public class InternalEngineTests extends EngineTestCase {
             docs.add(retryIndex);
         }
         Collections.shuffle(docs, random());
-        CountDownLatch startGun = new CountDownLatch(thread.length);
         AtomicInteger offset = new AtomicInteger(-1);
-        for (int i = 0; i < thread.length; i++) {
-            thread[i] = new Thread(() -> {
-                startGun.countDown();
-                safeAwait(startGun);
-                int docOffset;
-                while ((docOffset = offset.incrementAndGet()) < docs.size()) {
-                    try {
-                        engine.index(docs.get(docOffset));
-                    } catch (IOException e) {
-                        throw new AssertionError(e);
-                    }
+        startInParallel(randomIntBetween(3, 5), i -> {
+            int docOffset;
+            while ((docOffset = offset.incrementAndGet()) < docs.size()) {
+                try {
+                    engine.index(docs.get(docOffset));
+                } catch (IOException e) {
+                    throw new AssertionError(e);
                 }
-            });
-            thread[i].start();
-        }
-        for (int i = 0; i < thread.length; i++) {
-            thread[i].join();
-        }
+            }
+        });
         engine.refresh("test");
         try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
             int count = searcher.count(new MatchAllDocsQuery());

+ 14 - 31
server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java

@@ -196,46 +196,29 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
     }
 
     public void testConcurrentReplica() throws InterruptedException {
-        Thread[] threads = new Thread[randomIntBetween(2, 5)];
+        final int threads = randomIntBetween(2, 5);
         final int opsPerThread = randomIntBetween(10, 20);
-        final int maxOps = opsPerThread * threads.length;
+        final int maxOps = opsPerThread * threads;
         final long unFinishedSeq = randomIntBetween(0, maxOps - 2); // make sure we always index the last seqNo to simplify maxSeq checks
         Set<Integer> seqNos = IntStream.range(0, maxOps).boxed().collect(Collectors.toSet());
 
-        final Integer[][] seqNoPerThread = new Integer[threads.length][];
-        for (int t = 0; t < threads.length - 1; t++) {
+        final Integer[][] seqNoPerThread = new Integer[threads][];
+        for (int t = 0; t < threads - 1; t++) {
             int size = Math.min(seqNos.size(), randomIntBetween(opsPerThread - 4, opsPerThread + 4));
             seqNoPerThread[t] = randomSubsetOf(size, seqNos).toArray(new Integer[size]);
             seqNos.removeAll(Arrays.asList(seqNoPerThread[t]));
         }
-        seqNoPerThread[threads.length - 1] = seqNos.toArray(new Integer[seqNos.size()]);
-        logger.info("--> will run [{}] threads, maxOps [{}], unfinished seq no [{}]", threads.length, maxOps, unFinishedSeq);
-        final CyclicBarrier barrier = new CyclicBarrier(threads.length);
-        for (int t = 0; t < threads.length; t++) {
-            final int threadId = t;
-            threads[t] = new Thread(new AbstractRunnable() {
-                @Override
-                public void onFailure(Exception e) {
-                    throw new ElasticsearchException("failure in background thread", e);
+        seqNoPerThread[threads - 1] = seqNos.toArray(new Integer[seqNos.size()]);
+        logger.info("--> will run [{}] threads, maxOps [{}], unfinished seq no [{}]", threads, maxOps, unFinishedSeq);
+        startInParallel(threads, threadId -> {
+            Integer[] ops = seqNoPerThread[threadId];
+            for (int seqNo : ops) {
+                if (seqNo != unFinishedSeq) {
+                    tracker.markSeqNoAsProcessed(seqNo);
+                    logger.info("[t{}] completed [{}]", threadId, seqNo);
                 }
-
-                @Override
-                protected void doRun() throws Exception {
-                    barrier.await();
-                    Integer[] ops = seqNoPerThread[threadId];
-                    for (int seqNo : ops) {
-                        if (seqNo != unFinishedSeq) {
-                            tracker.markSeqNoAsProcessed(seqNo);
-                            logger.info("[t{}] completed [{}]", threadId, seqNo);
-                        }
-                    }
-                }
-            }, "testConcurrentReplica_" + threadId);
-            threads[t].start();
-        }
-        for (Thread thread : threads) {
-            thread.join();
-        }
+            }
+        });
         assertThat(tracker.getMaxSeqNo(), equalTo(maxOps - 1L));
         assertThat(tracker.getProcessedCheckpoint(), equalTo(unFinishedSeq - 1L));
         assertThat(tracker.hasProcessed(unFinishedSeq), equalTo(false));

+ 9 - 29
server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java

@@ -29,7 +29,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -687,7 +686,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
      *
      * @throws IOException if an I/O exception occurs loading the retention lease state file
      */
-    public void testPersistRetentionLeasesUnderConcurrency() throws IOException {
+    public void testPersistRetentionLeasesUnderConcurrency() throws IOException, InterruptedException {
         final AllocationId allocationId = AllocationId.newInitializing();
         long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
         final ReplicationTracker replicationTracker = new ReplicationTracker(
@@ -719,35 +718,16 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
 
         final Path path = createTempDir();
         final int numberOfThreads = randomIntBetween(1, 2 * Runtime.getRuntime().availableProcessors());
-        final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
-        final Thread[] threads = new Thread[numberOfThreads];
-        for (int i = 0; i < numberOfThreads; i++) {
+        startInParallel(numberOfThreads, i -> {
             final String id = Integer.toString(length + i);
-            threads[i] = new Thread(() -> {
-                try {
-                    safeAwait(barrier);
-                    final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
-                    replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test-" + id, ActionListener.noop());
-                    replicationTracker.persistRetentionLeases(path);
-                    safeAwait(barrier);
-                } catch (final WriteStateException e) {
-                    throw new AssertionError(e);
-                }
-            });
-            threads[i].start();
-        }
-
-        try {
-            // synchronize the threads invoking ReplicationTracker#persistRetentionLeases(Path path)
-            safeAwait(barrier);
-            // wait for all the threads to finish
-            safeAwait(barrier);
-            for (int i = 0; i < numberOfThreads; i++) {
-                threads[i].join();
+            final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
+            replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test-" + id, ActionListener.noop());
+            try {
+                replicationTracker.persistRetentionLeases(path);
+            } catch (WriteStateException e) {
+                throw new AssertionError(e);
             }
-        } catch (final InterruptedException e) {
-            throw new AssertionError(e);
-        }
+        });
         assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
     }
 

+ 32 - 69
server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java

@@ -56,7 +56,6 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
     public void testThreadedUpdatesToChildBreaker() throws Exception {
         final int NUM_THREADS = scaledRandomIntBetween(3, 15);
         final int BYTES_PER_THREAD = scaledRandomIntBetween(500, 4500);
-        final Thread[] threads = new Thread[NUM_THREADS];
         final AtomicBoolean tripped = new AtomicBoolean(false);
         final AtomicReference<Throwable> lastException = new AtomicReference<>(null);
 
@@ -87,31 +86,21 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
             CircuitBreaker.REQUEST
         );
         breakerRef.set(breaker);
-
-        for (int i = 0; i < NUM_THREADS; i++) {
-            threads[i] = new Thread(() -> {
-                for (int j = 0; j < BYTES_PER_THREAD; j++) {
-                    try {
-                        breaker.addEstimateBytesAndMaybeBreak(1L, "test");
-                    } catch (CircuitBreakingException e) {
-                        if (tripped.get()) {
-                            assertThat("tripped too many times", true, equalTo(false));
-                        } else {
-                            assertThat(tripped.compareAndSet(false, true), equalTo(true));
-                        }
-                    } catch (Exception e) {
-                        lastException.set(e);
+        runInParallel(NUM_THREADS, i -> {
+            for (int j = 0; j < BYTES_PER_THREAD; j++) {
+                try {
+                    breaker.addEstimateBytesAndMaybeBreak(1L, "test");
+                } catch (CircuitBreakingException e) {
+                    if (tripped.get()) {
+                        assertThat("tripped too many times", true, equalTo(false));
+                    } else {
+                        assertThat(tripped.compareAndSet(false, true), equalTo(true));
                     }
+                } catch (Exception e) {
+                    lastException.set(e);
                 }
-            });
-
-            threads[i].start();
-        }
-
-        for (Thread t : threads) {
-            t.join();
-        }
-
+            }
+        });
         assertThat("no other exceptions were thrown", lastException.get(), equalTo(null));
         assertThat("breaker was tripped", tripped.get(), equalTo(true));
         assertThat("breaker was tripped at least once", breaker.getTrippedCount(), greaterThanOrEqualTo(1L));
@@ -122,7 +111,6 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
         final int BYTES_PER_THREAD = scaledRandomIntBetween(500, 4500);
         final int parentLimit = (BYTES_PER_THREAD * NUM_THREADS) - 2;
         final int childLimit = parentLimit + 10;
-        final Thread[] threads = new Thread[NUM_THREADS];
         final AtomicInteger tripped = new AtomicInteger(0);
         final AtomicReference<Throwable> lastException = new AtomicReference<>(null);
 
@@ -165,21 +153,6 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
             CircuitBreaker.REQUEST
         );
         breakerRef.set(breaker);
-
-        for (int i = 0; i < NUM_THREADS; i++) {
-            threads[i] = new Thread(() -> {
-                for (int j = 0; j < BYTES_PER_THREAD; j++) {
-                    try {
-                        breaker.addEstimateBytesAndMaybeBreak(1L, "test");
-                    } catch (CircuitBreakingException e) {
-                        tripped.incrementAndGet();
-                    } catch (Exception e) {
-                        lastException.set(e);
-                    }
-                }
-            });
-        }
-
         logger.info(
             "--> NUM_THREADS: [{}], BYTES_PER_THREAD: [{}], TOTAL_BYTES: [{}], PARENT_LIMIT: [{}], CHILD_LIMIT: [{}]",
             NUM_THREADS,
@@ -190,13 +163,17 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
         );
 
         logger.info("--> starting threads...");
-        for (Thread t : threads) {
-            t.start();
-        }
-
-        for (Thread t : threads) {
-            t.join();
-        }
+        runInParallel(NUM_THREADS, i -> {
+            for (int j = 0; j < BYTES_PER_THREAD; j++) {
+                try {
+                    breaker.addEstimateBytesAndMaybeBreak(1L, "test");
+                } catch (CircuitBreakingException e) {
+                    tripped.incrementAndGet();
+                } catch (Exception e) {
+                    lastException.set(e);
+                }
+            }
+        });
 
         logger.info("--> child breaker: used: {}, limit: {}", breaker.getUsed(), breaker.getLimit());
         logger.info("--> parent tripped: {}, total trip count: {} (expecting 1-2 for each)", parentTripped.get(), tripped.get());
@@ -401,29 +378,15 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
         });
 
         logger.trace("black hole [{}]", data.hashCode());
-
         int threadCount = randomIntBetween(1, 10);
-        CyclicBarrier barrier = new CyclicBarrier(threadCount + 1);
-        List<Thread> threads = new ArrayList<>(threadCount);
-        for (int i = 0; i < threadCount; ++i) {
-            threads.add(new Thread(() -> {
-                try {
-                    safeAwait(barrier);
-                    service.checkParentLimit(0, "test-thread");
-                } catch (CircuitBreakingException e) {
-                    // very rare
-                    logger.info("Thread got semi-unexpected circuit breaking exception", e);
-                }
-            }));
-        }
-
-        threads.forEach(Thread::start);
-        barrier.await(20, TimeUnit.SECONDS);
-
-        for (Thread thread : threads) {
-            thread.join(10000);
-        }
-        threads.forEach(thread -> assertFalse(thread.isAlive()));
+        startInParallel(threadCount, i -> {
+            try {
+                service.checkParentLimit(0, "test-thread");
+            } catch (CircuitBreakingException e) {
+                // very rare
+                logger.info("Thread got semi-unexpected circuit breaking exception", e);
+            }
+        });
 
         assertThat(leaderTriggerCount.get(), equalTo(2));
     }