|
@@ -30,6 +30,7 @@ import java.util.Collections;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
@@ -49,6 +50,7 @@ public abstract class ESSelector implements Closeable {
|
|
|
|
|
|
private final EventHandler eventHandler;
|
|
|
private final ReentrantLock runLock = new ReentrantLock();
|
|
|
+ private final CountDownLatch exitedLoop = new CountDownLatch(1);
|
|
|
private final AtomicBoolean isClosed = new AtomicBoolean(false);
|
|
|
private final PlainActionFuture<Boolean> isRunningFuture = PlainActionFuture.newFuture();
|
|
|
private final Set<NioChannel> registeredChannels = Collections.newSetFromMap(new ConcurrentHashMap<NioChannel, Boolean>());
|
|
@@ -64,8 +66,7 @@ public abstract class ESSelector implements Closeable {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Starts this selector. The selector will run until {@link #close()} or {@link #close(boolean)} is
|
|
|
- * called.
|
|
|
+ * Starts this selector. The selector will run until {@link #close()} is called.
|
|
|
*/
|
|
|
public void runLoop() {
|
|
|
if (runLock.tryLock()) {
|
|
@@ -85,6 +86,7 @@ public abstract class ESSelector implements Closeable {
|
|
|
eventHandler.closeSelectorException(e);
|
|
|
} finally {
|
|
|
runLock.unlock();
|
|
|
+ exitedLoop.countDown();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -156,17 +158,15 @@ public abstract class ESSelector implements Closeable {
|
|
|
|
|
|
@Override
|
|
|
public void close() throws IOException {
|
|
|
- close(false);
|
|
|
- }
|
|
|
-
|
|
|
- public void close(boolean shouldInterrupt) throws IOException {
|
|
|
if (isClosed.compareAndSet(false, true)) {
|
|
|
- if (shouldInterrupt && thread != null) {
|
|
|
- thread.interrupt();
|
|
|
- } else {
|
|
|
- wakeup();
|
|
|
+ wakeup();
|
|
|
+ if (isRunning()) {
|
|
|
+ try {
|
|
|
+ exitedLoop.await();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ eventHandler.uncaughtException(e);
|
|
|
+ }
|
|
|
}
|
|
|
- runLock.lock(); // wait for the shutdown to complete
|
|
|
}
|
|
|
}
|
|
|
|