Browse Source

Merge pull request #15801 from jasontedor/cyclic-barriers-for-boaz

Use CyclicBarriers for sychronizing driver and test threads
Jason Tedor 9 years ago
parent
commit
a583edb2df

+ 10 - 15
core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java

@@ -886,7 +886,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
         }
     }
 
-    public void testClusterStateBatchedUpdates() throws InterruptedException {
+    public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException {
         Settings settings = settingsBuilder()
                 .put("discovery.type", "local")
                 .build();
@@ -974,19 +974,12 @@ public class ClusterServiceIT extends ESIntegTestCase {
             counts.merge(executor, 1, (previous, one) -> previous + one);
         }
 
-        CountDownLatch startGate = new CountDownLatch(1);
-        CountDownLatch endGate = new CountDownLatch(numberOfThreads);
-        AtomicBoolean interrupted = new AtomicBoolean();
+        CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
         for (int i = 0; i < numberOfThreads; i++) {
             final int index = i;
             Thread thread = new Thread(() -> {
                 try {
-                    try {
-                        startGate.await();
-                    } catch (InterruptedException e) {
-                        interrupted.set(true);
-                        return;
-                    }
+                    barrier.await();
                     for (int j = 0; j < tasksSubmittedPerThread; j++) {
                         ClusterStateTaskExecutor<Task> executor = assignments.get(index * tasksSubmittedPerThread + j);
                         clusterService.submitStateUpdateTask(
@@ -996,16 +989,18 @@ public class ClusterServiceIT extends ESIntegTestCase {
                                 executor,
                                 listener);
                     }
-                } finally {
-                    endGate.countDown();
+                    barrier.await();
+                } catch (BrokenBarrierException | InterruptedException e) {
+                    throw new AssertionError(e);
                 }
             });
             thread.start();
         }
 
-        startGate.countDown();
-        endGate.await();
-        assertFalse(interrupted.get());
+        // wait for all threads to be ready
+        barrier.await();
+        // wait for all threads to finish
+        barrier.await();
 
         // wait until all the cluster state updates have been processed
         updateLatch.await();

+ 60 - 56
core/src/test/java/org/elasticsearch/common/cache/CacheTests.java

@@ -31,7 +31,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -42,6 +45,8 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
 
 public class CacheTests extends ESTestCase {
     private int numberOfEntries;
@@ -483,7 +488,7 @@ public class CacheTests extends ESTestCase {
                     return value;
                 });
             } catch (ExecutionException e) {
-                fail(e.getMessage());
+                throw new AssertionError(e);
             }
         }
         for (int i = 0; i < numberOfEntries; i++) {
@@ -491,25 +496,21 @@ public class CacheTests extends ESTestCase {
         }
     }
 
-    public void testComputeIfAbsentCallsOnce() throws InterruptedException {
+    public void testComputeIfAbsentCallsOnce() throws BrokenBarrierException, InterruptedException {
         int numberOfThreads = randomIntBetween(2, 32);
         final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
         AtomicReferenceArray flags = new AtomicReferenceArray(numberOfEntries);
         for (int j = 0; j < numberOfEntries; j++) {
             flags.set(j, false);
         }
-        CountDownLatch startGate = new CountDownLatch(1);
-        CountDownLatch endGate = new CountDownLatch(numberOfThreads);
-        AtomicBoolean interrupted = new AtomicBoolean();
+
+        CopyOnWriteArrayList<ExecutionException> failures = new CopyOnWriteArrayList<>();
+
+        CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
         for (int i = 0; i < numberOfThreads; i++) {
             Thread thread = new Thread(() -> {
                 try {
-                    try {
-                        startGate.await();
-                    } catch (InterruptedException e) {
-                        interrupted.set(true);
-                        return;
-                    }
+                    barrier.await();
                     for (int j = 0; j < numberOfEntries; j++) {
                         try {
                             cache.computeIfAbsent(j, key -> {
@@ -517,18 +518,24 @@ public class CacheTests extends ESTestCase {
                                 return Integer.toString(key);
                             });
                         } catch (ExecutionException e) {
-                            throw new RuntimeException(e);
+                            failures.add(e);
+                            break;
                         }
                     }
-                } finally {
-                    endGate.countDown();
+                    barrier.await();
+                } catch (BrokenBarrierException | InterruptedException e) {
+                    throw new AssertionError(e);
                 }
             });
             thread.start();
         }
-        startGate.countDown();
-        endGate.await();
-        assertFalse(interrupted.get());
+
+        // wait for all threads to be ready
+        barrier.await();
+        // wait for all threads to finish
+        barrier.await();
+
+        assertThat(failures, is(empty()));
     }
 
     public void testComputeIfAbsentThrowsExceptionIfLoaderReturnsANullValue() {
@@ -541,7 +548,7 @@ public class CacheTests extends ESTestCase {
         }
     }
 
-    public void testDependentKeyDeadlock() throws InterruptedException {
+    public void testDependentKeyDeadlock() throws BrokenBarrierException, InterruptedException {
         class Key {
             private final int key;
 
@@ -568,18 +575,19 @@ public class CacheTests extends ESTestCase {
 
         int numberOfThreads = randomIntBetween(2, 32);
         final Cache<Key, Integer> cache = CacheBuilder.<Key, Integer>builder().build();
-        CountDownLatch startGate = new CountDownLatch(1);
+
+        CopyOnWriteArrayList<ExecutionException> failures = new CopyOnWriteArrayList<>();
+
+        CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
         CountDownLatch deadlockLatch = new CountDownLatch(numberOfThreads);
-        AtomicBoolean interrupted = new AtomicBoolean();
         List<Thread> threads = new ArrayList<>();
         for (int i = 0; i < numberOfThreads; i++) {
             Thread thread = new Thread(() -> {
                 try {
                     try {
-                        startGate.await();
-                    } catch (InterruptedException e) {
-                        interrupted.set(true);
-                        return;
+                        barrier.await();
+                    } catch (BrokenBarrierException | InterruptedException e) {
+                        throw new AssertionError(e);
                     }
                     Random random = new Random(random().nextLong());
                     for (int j = 0; j < numberOfEntries; j++) {
@@ -594,7 +602,8 @@ public class CacheTests extends ESTestCase {
                                 }
                             });
                         } catch (ExecutionException e) {
-                            fail(e.getMessage());
+                            failures.add(e);
+                            break;
                         }
                     }
                 } finally {
@@ -631,7 +640,7 @@ public class CacheTests extends ESTestCase {
         }, 1, 1, TimeUnit.SECONDS);
 
         // everything is setup, release the hounds
-        startGate.countDown();
+        barrier.await();
 
         // wait for either deadlock to be detected or the threads to terminate
         deadlockLatch.await();
@@ -639,24 +648,21 @@ public class CacheTests extends ESTestCase {
         // shutdown the watchdog service
         scheduler.shutdown();
 
+        assertThat(failures, is(empty()));
+
         assertFalse("deadlock", deadlock.get());
     }
 
-    public void testCachePollution() throws InterruptedException {
+    public void testCachePollution() throws BrokenBarrierException, InterruptedException {
         int numberOfThreads = randomIntBetween(2, 32);
         final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
-        CountDownLatch startGate = new CountDownLatch(1);
-        CountDownLatch endGate = new CountDownLatch(numberOfThreads);
-        AtomicBoolean interrupted = new AtomicBoolean();
+
+        CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
+
         for (int i = 0; i < numberOfThreads; i++) {
             Thread thread = new Thread(() -> {
                 try {
-                    try {
-                        startGate.await();
-                    } catch (InterruptedException e) {
-                        interrupted.set(true);
-                        return;
-                    }
+                    barrier.await();
                     Random random = new Random(random().nextLong());
                     for (int j = 0; j < numberOfEntries; j++) {
                         Integer key = random.nextInt(numberOfEntries);
@@ -686,21 +692,23 @@ public class CacheTests extends ESTestCase {
                             cache.get(key);
                         }
                     }
-                } finally {
-                    endGate.countDown();
+                    barrier.await();
+                } catch (BrokenBarrierException | InterruptedException e) {
+                    throw new AssertionError(e);
                 }
             });
             thread.start();
         }
 
-        startGate.countDown();
-        endGate.await();
-        assertFalse(interrupted.get());
+        // wait for all threads to be ready
+        barrier.await();
+        // wait for all threads to finish
+        barrier.await();
     }
 
     // test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key
     // here be dragons: this test did catch one subtle bug during development; do not remove lightly
-    public void testTorture() throws InterruptedException {
+    public void testTorture() throws BrokenBarrierException, InterruptedException {
         int numberOfThreads = randomIntBetween(2, 32);
         final Cache<Integer, String> cache =
                 CacheBuilder.<Integer, String>builder()
@@ -708,32 +716,28 @@ public class CacheTests extends ESTestCase {
                         .weigher((k, v) -> 2)
                         .build();
 
-        CountDownLatch startGate = new CountDownLatch(1);
-        CountDownLatch endGate = new CountDownLatch(numberOfThreads);
-        AtomicBoolean interrupted = new AtomicBoolean();
+        CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
         for (int i = 0; i < numberOfThreads; i++) {
             Thread thread = new Thread(() -> {
                 try {
-                    try {
-                        startGate.await();
-                    } catch (InterruptedException e) {
-                        interrupted.set(true);
-                        return;
-                    }
+                    barrier.await();
                     Random random = new Random(random().nextLong());
                     for (int j = 0; j < numberOfEntries; j++) {
                         Integer key = random.nextInt(numberOfEntries);
                         cache.put(key, Integer.toString(j));
                     }
-                } finally {
-                    endGate.countDown();
+                    barrier.await();
+                } catch (BrokenBarrierException | InterruptedException e) {
+                    throw new AssertionError(e);
                 }
             });
             thread.start();
         }
-        startGate.countDown();
-        endGate.await();
-        assertFalse(interrupted.get());
+
+        // wait for all threads to be ready
+        barrier.await();
+        // wait for all threads to finish
+        barrier.await();
 
         cache.refresh();
         assertEquals(500, cache.count());