Selaa lähdekoodia

= fix BooleanMutex's inner Sync to `static` class and add test case

zavakid 4 vuotta sitten
vanhempi
commit
5065fea963

+ 3 - 3
common/src/main/java/com/alibaba/otter/canal/common/utils/BooleanMutex.java

@@ -32,7 +32,7 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer;
  */
 public class BooleanMutex {
 
-    private Sync sync;
+    private final Sync sync;
 
     public BooleanMutex(){
         sync = new Sync();
@@ -47,7 +47,7 @@ public class BooleanMutex {
     /**
      * 阻塞等待Boolean为true
      * 
-     * @throws InterruptedException
+     * @throws InterruptedException if the current thread is interrupted
      */
     public void get() throws InterruptedException {
         sync.innerGet();
@@ -86,7 +86,7 @@ public class BooleanMutex {
      * Synchronization control for BooleanMutex. Uses AQS sync state to
      * represent run status
      */
-    private final class Sync extends AbstractQueuedSynchronizer {
+    private static final class Sync extends AbstractQueuedSynchronizer {
 
         private static final long serialVersionUID = 2559471934544126329L;
         /** State value representing that TRUE */

+ 117 - 0
common/src/test/java/com/alibaba/otter/canal/common/utils/BooleanMutexTest.java

@@ -0,0 +1,117 @@
+package com.alibaba.otter.canal.common.utils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.*;
+
+public class BooleanMutexTest {
+
+    public static final int CONCURRENCY = 10;
+    private ExecutorService executorService;
+
+    @Before
+    public void setUp() {
+        executorService = Executors.newFixedThreadPool(CONCURRENCY);
+    }
+
+    @After
+    public void tearDown() {
+        if (executorService != null) {
+            executorService.shutdownNow();
+        }
+    }
+
+    @Test(timeout = 3000L)
+    public void testBooleanMutexGet() throws Exception {
+
+        BooleanMutex mutex = new BooleanMutex();
+
+        AtomicLong atomicLong = new AtomicLong(0);
+
+        Phaser phaser = new Phaser(CONCURRENCY + 1);
+
+        for (int i = 0; i < CONCURRENCY; i++) {
+            executorService.submit(() -> {
+                try {
+                    // arrive phase1 and wait until phase1 finish
+                    int phase1 = phaser.arrive();
+                    phaser.awaitAdvanceInterruptibly(phase1);
+
+                    mutex.get();
+                    atomicLong.addAndGet(1);
+
+                    // arrive phase2
+                    phaser.arrive();
+                } catch (InterruptedException e) {
+                    throw new IllegalStateException(e);
+                }
+            });
+        }
+
+        assertEquals(0, atomicLong.get());
+
+        // arrive phase1 and wait until phase1 finish
+        int phase1 = phaser.arrive();
+        phaser.awaitAdvanceInterruptibly(phase1);
+        assertEquals(0, atomicLong.get());
+
+        mutex.set(true);
+
+        // arrive phase2 and wait until phase2 finish
+        int phase2 = phaser.arrive();
+        phaser.awaitAdvanceInterruptibly(phase2);
+        assertEquals(CONCURRENCY, atomicLong.get());
+    }
+
+
+    @Test(timeout = 30000L, expected = TimeoutException.class)
+    public void testBooleanMutexBlock() throws Exception {
+
+        BooleanMutex mutex = new BooleanMutex();
+
+        AtomicLong atomicLong = new AtomicLong(0);
+
+        Phaser phaser = new Phaser(CONCURRENCY + 1);
+
+        for (int i = 0; i < CONCURRENCY; i++) {
+            executorService.submit(() -> {
+                try {
+                    // arrive phase1 and wait until phase1 finish
+                    int phase1 = phaser.arrive();
+                    phaser.awaitAdvanceInterruptibly(phase1);
+
+                    mutex.get();
+                    atomicLong.addAndGet(1);
+
+                    // arrive phase2
+                    phaser.arrive();
+                } catch (InterruptedException e) {
+                    throw new IllegalStateException(e);
+                }
+            });
+        }
+
+        assertEquals(0, atomicLong.get());
+
+        // arrive phase1 and wait until phase1 finish
+        int phase1 = phaser.arrive();
+        phaser.awaitAdvanceInterruptibly(phase1);
+        assertEquals(0, atomicLong.get());
+
+        // mutex is still false
+        mutex.set(false);
+
+
+        // arrive phase2 and wait until phase2 finish
+        int phase2 = phaser.arrive();
+        // will throw interrupted exception when timeout because mutex is still false
+        phaser.awaitAdvanceInterruptibly(phase2, 2, TimeUnit.SECONDS);
+
+        fail("unreachable code");
+    }
+}