瀏覽代碼

Fix race condition in queue size test

The queue size test has a race condition. Namely the offering thread can
run so quickly completing all of its offering iterations before the
queue size thread ever has a chance to run a single size poll
iteration. This means that the size will never actually be polled and
the test can spuriously fail. What we really want to do here, since this
test is checking for a race condition between polling the size of the
queue and offers to the queue, we want to execute each iteration in
lockstep giving the threads multiple changes for the race between
polling the size and offers to occur. This commit addresses this by
running the two threads in lockstep for multiple iterations so that they
have multiple chances to race.

Relates #28584
Jason Tedor 7 年之前
父節點
當前提交
5badacf391
共有 1 個文件被更改,包括 26 次插入30 次删除
  1. 26 30
      server/src/test/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueueTests.java

+ 26 - 30
server/src/test/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueueTests.java

@@ -23,8 +23,8 @@ import org.elasticsearch.test.ESTestCase;
 
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -46,45 +46,41 @@ public class SizeBlockingQueueTests extends ESTestCase {
             sizeBlockingQueue.offer(i);
         }
 
-        final CountDownLatch latch = new CountDownLatch(1);
 
-        final AtomicBoolean spin = new AtomicBoolean(true);
-        final AtomicInteger maxSize = new AtomicInteger();
-
-        // this thread will repeatedly poll the size of the queue keeping track of the maximum size that it sees
-        final Thread queueSizeThread = new Thread(() -> {
-            try {
-                latch.await();
-            } catch (final InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-            while (spin.get()) {
-                maxSize.set(Math.max(maxSize.get(), sizeBlockingQueue.size()));
-            }
-        });
-        queueSizeThread.start();
+        final int iterations = 1 << 16;
+        final CyclicBarrier barrier = new CyclicBarrier(2);
 
         // this thread will try to offer items to the queue while the queue size thread is polling the size
         final Thread queueOfferThread = new Thread(() -> {
-            try {
-                latch.await();
-            } catch (final InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-            for (int i = 0; i < 4096; i++) {
+            for (int i = 0; i < iterations; i++) {
+                try {
+                    // synchronize each iteration of checking the size with each iteration of offering, each iteration is a race
+                    barrier.await();
+                } catch (final BrokenBarrierException | InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
                 sizeBlockingQueue.offer(capacity + i);
             }
         });
         queueOfferThread.start();
 
-        // synchronize the start of the two threads
-        latch.countDown();
+        // this thread will repeatedly poll the size of the queue keeping track of the maximum size that it sees
+        final AtomicInteger maxSize = new AtomicInteger();
+        final Thread queueSizeThread = new Thread(() -> {
+            for (int i = 0; i < iterations; i++) {
+                try {
+                    // synchronize each iteration of checking the size with each iteration of offering, each iteration is a race
+                    barrier.await();
+                } catch (final BrokenBarrierException | InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                maxSize.set(Math.max(maxSize.get(), sizeBlockingQueue.size()));
+            }
+        });
+        queueSizeThread.start();
 
-        // wait for the offering thread to finish
+        // wait for the threads to finish
         queueOfferThread.join();
-
-        // stop the queue size thread
-        spin.set(false);
         queueSizeThread.join();
 
         // the maximum size of the queue should be equal to the capacity