|
@@ -67,6 +67,15 @@ import static org.mockito.Mockito.verify;
|
|
|
|
|
|
public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
|
|
|
+ @FunctionalInterface
|
|
|
+ interface TestGlobalCheckpointListener extends GlobalCheckpointListeners.GlobalCheckpointListener {
|
|
|
+
|
|
|
+ default Executor executor() {
|
|
|
+ return Runnable::run;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
private final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
|
|
|
private final ScheduledThreadPoolExecutor scheduler =
|
|
|
new Scheduler.SafeScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(Settings.EMPTY, "scheduler"));
|
|
@@ -78,17 +87,19 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
|
|
|
public void testGlobalCheckpointUpdated() throws IOException {
|
|
|
final GlobalCheckpointListeners globalCheckpointListeners =
|
|
|
- new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
|
|
|
+ new GlobalCheckpointListeners(shardId, scheduler, logger);
|
|
|
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
|
|
|
final int numberOfListeners = randomIntBetween(0, 64);
|
|
|
final Map<GlobalCheckpointListeners.GlobalCheckpointListener, Long> listeners = new HashMap<>();
|
|
|
final Map<GlobalCheckpointListeners.GlobalCheckpointListener, Long> notifiedListeners = new HashMap<>();
|
|
|
for (int i = 0; i < numberOfListeners; i++) {
|
|
|
- final GlobalCheckpointListeners.GlobalCheckpointListener listener = new GlobalCheckpointListeners.GlobalCheckpointListener() {
|
|
|
+ final TestGlobalCheckpointListener listener = new TestGlobalCheckpointListener() {
|
|
|
+
|
|
|
@Override
|
|
|
public void accept(final long g, final Exception e) {
|
|
|
notifiedListeners.put(this, g);
|
|
|
}
|
|
|
+
|
|
|
};
|
|
|
final long waitingGlobalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE);
|
|
|
listeners.put(listener, waitingGlobalCheckpoint);
|
|
@@ -133,7 +144,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
|
|
|
public void testListenersReadyToBeNotified() throws IOException {
|
|
|
final GlobalCheckpointListeners globalCheckpointListeners =
|
|
|
- new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
|
|
|
+ new GlobalCheckpointListeners(shardId, scheduler, logger);
|
|
|
final long globalCheckpoint = randomLongBetween(0, Long.MAX_VALUE);
|
|
|
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
|
|
|
final int numberOfListeners = randomIntBetween(0, 16);
|
|
@@ -165,7 +176,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
public void testFailingListenerReadyToBeNotified() {
|
|
|
final Logger mockLogger = mock(Logger.class);
|
|
|
final GlobalCheckpointListeners globalCheckpointListeners =
|
|
|
- new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger);
|
|
|
+ new GlobalCheckpointListeners(shardId, scheduler, mockLogger);
|
|
|
final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE);
|
|
|
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
|
|
|
final int numberOfListeners = randomIntBetween(0, 16);
|
|
@@ -207,7 +218,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
|
|
|
public void testClose() throws IOException {
|
|
|
final GlobalCheckpointListeners globalCheckpointListeners =
|
|
|
- new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
|
|
|
+ new GlobalCheckpointListeners(shardId, scheduler, logger);
|
|
|
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
|
|
|
final int numberOfListeners = randomIntBetween(0, 16);
|
|
|
final Exception[] exceptions = new Exception[numberOfListeners];
|
|
@@ -235,7 +246,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
|
|
|
public void testAddAfterClose() throws InterruptedException, IOException {
|
|
|
final GlobalCheckpointListeners globalCheckpointListeners =
|
|
|
- new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
|
|
|
+ new GlobalCheckpointListeners(shardId, scheduler, logger);
|
|
|
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
|
|
|
globalCheckpointListeners.close();
|
|
|
final AtomicBoolean invoked = new AtomicBoolean();
|
|
@@ -254,7 +265,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
public void testFailingListenerOnUpdate() {
|
|
|
final Logger mockLogger = mock(Logger.class);
|
|
|
final GlobalCheckpointListeners globalCheckpointListeners =
|
|
|
- new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger);
|
|
|
+ new GlobalCheckpointListeners(shardId, scheduler, mockLogger);
|
|
|
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
|
|
|
final int numberOfListeners = randomIntBetween(0, 16);
|
|
|
final boolean[] failures = new boolean[numberOfListeners];
|
|
@@ -308,7 +319,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
public void testFailingListenerOnClose() throws IOException {
|
|
|
final Logger mockLogger = mock(Logger.class);
|
|
|
final GlobalCheckpointListeners globalCheckpointListeners =
|
|
|
- new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger);
|
|
|
+ new GlobalCheckpointListeners(shardId, scheduler, mockLogger);
|
|
|
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
|
|
|
final int numberOfListeners = randomIntBetween(0, 16);
|
|
|
final boolean[] failures = new boolean[numberOfListeners];
|
|
@@ -360,24 +371,35 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
count.incrementAndGet();
|
|
|
command.run();
|
|
|
};
|
|
|
- final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
|
|
|
+ final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, scheduler, logger);
|
|
|
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
|
|
|
final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE);
|
|
|
final AtomicInteger notified = new AtomicInteger();
|
|
|
final int numberOfListeners = randomIntBetween(0, 16);
|
|
|
for (int i = 0; i < numberOfListeners; i++) {
|
|
|
globalCheckpointListeners.add(
|
|
|
- 0,
|
|
|
- maybeMultipleInvocationProtectingListener((g, e) -> {
|
|
|
- notified.incrementAndGet();
|
|
|
- assertThat(g, equalTo(globalCheckpoint));
|
|
|
- assertNull(e);
|
|
|
+ 0,
|
|
|
+ maybeMultipleInvocationProtectingListener(
|
|
|
+ new TestGlobalCheckpointListener() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Executor executor() {
|
|
|
+ return executor;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void accept(final long g, final Exception e) {
|
|
|
+ notified.incrementAndGet();
|
|
|
+ assertThat(g, equalTo(globalCheckpoint));
|
|
|
+ assertNull(e);
|
|
|
+ }
|
|
|
+
|
|
|
}),
|
|
|
- null);
|
|
|
+ null);
|
|
|
}
|
|
|
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
|
|
|
assertThat(notified.get(), equalTo(numberOfListeners));
|
|
|
- assertThat(count.get(), equalTo(numberOfListeners == 0 ? 0 : 1));
|
|
|
+ assertThat(count.get(), equalTo(numberOfListeners));
|
|
|
}
|
|
|
|
|
|
public void testNotificationOnClosedUsesExecutor() throws IOException {
|
|
@@ -386,21 +408,32 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
count.incrementAndGet();
|
|
|
command.run();
|
|
|
};
|
|
|
- final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
|
|
|
+ final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, scheduler, logger);
|
|
|
globalCheckpointListeners.close();
|
|
|
final AtomicInteger notified = new AtomicInteger();
|
|
|
final int numberOfListeners = randomIntBetween(0, 16);
|
|
|
for (int i = 0; i < numberOfListeners; i++) {
|
|
|
globalCheckpointListeners.add(
|
|
|
- NO_OPS_PERFORMED,
|
|
|
- maybeMultipleInvocationProtectingListener((g, e) -> {
|
|
|
- notified.incrementAndGet();
|
|
|
- assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
|
|
|
- assertNotNull(e);
|
|
|
- assertThat(e, instanceOf(IndexShardClosedException.class));
|
|
|
- assertThat(((IndexShardClosedException) e).getShardId(), equalTo(shardId));
|
|
|
+ NO_OPS_PERFORMED,
|
|
|
+ maybeMultipleInvocationProtectingListener(
|
|
|
+ new TestGlobalCheckpointListener() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Executor executor() {
|
|
|
+ return executor;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void accept(final long g, final Exception e) {
|
|
|
+ notified.incrementAndGet();
|
|
|
+ assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
|
|
|
+ assertNotNull(e);
|
|
|
+ assertThat(e, instanceOf(IndexShardClosedException.class));
|
|
|
+ assertThat(((IndexShardClosedException) e).getShardId(), equalTo(shardId));
|
|
|
+ }
|
|
|
+
|
|
|
}),
|
|
|
- null);
|
|
|
+ null);
|
|
|
}
|
|
|
assertThat(notified.get(), equalTo(numberOfListeners));
|
|
|
assertThat(count.get(), equalTo(numberOfListeners));
|
|
@@ -412,20 +445,30 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
count.incrementAndGet();
|
|
|
command.run();
|
|
|
};
|
|
|
- final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
|
|
|
+ final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, scheduler, logger);
|
|
|
final long globalCheckpoint = randomNonNegativeLong();
|
|
|
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint);
|
|
|
final AtomicInteger notified = new AtomicInteger();
|
|
|
final int numberOfListeners = randomIntBetween(0, 16);
|
|
|
for (int i = 0; i < numberOfListeners; i++) {
|
|
|
globalCheckpointListeners.add(
|
|
|
- randomLongBetween(0, globalCheckpoint),
|
|
|
- maybeMultipleInvocationProtectingListener((g, e) -> {
|
|
|
- notified.incrementAndGet();
|
|
|
- assertThat(g, equalTo(globalCheckpoint));
|
|
|
- assertNull(e);
|
|
|
+ randomLongBetween(0, globalCheckpoint),
|
|
|
+ maybeMultipleInvocationProtectingListener(
|
|
|
+ new TestGlobalCheckpointListener() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Executor executor() {
|
|
|
+ return executor;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void accept(final long g, final Exception e) {
|
|
|
+ notified.incrementAndGet();
|
|
|
+ assertThat(g, equalTo(globalCheckpoint));
|
|
|
+ assertNull(e);
|
|
|
+ }
|
|
|
}),
|
|
|
- null);
|
|
|
+ null);
|
|
|
}
|
|
|
assertThat(notified.get(), equalTo(numberOfListeners));
|
|
|
assertThat(count.get(), equalTo(numberOfListeners));
|
|
@@ -433,7 +476,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
|
|
|
public void testConcurrency() throws Exception {
|
|
|
final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, 8));
|
|
|
- final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
|
|
|
+ final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, scheduler, logger);
|
|
|
final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
|
|
|
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.get());
|
|
|
// we are going to synchronize the actions of three threads: the updating thread, the listener thread, and the main test thread
|
|
@@ -469,14 +512,24 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
invocations.add(invocation);
|
|
|
// sometimes this will notify the listener immediately
|
|
|
globalCheckpointListeners.add(
|
|
|
- globalCheckpoint.get(),
|
|
|
- maybeMultipleInvocationProtectingListener(
|
|
|
- (g, e) -> {
|
|
|
- if (invocation.compareAndSet(false, true) == false) {
|
|
|
- throw new IllegalStateException("listener invoked twice");
|
|
|
- }
|
|
|
- }),
|
|
|
- randomBoolean() ? null : TimeValue.timeValueNanos(randomLongBetween(1, TimeUnit.MICROSECONDS.toNanos(1))));
|
|
|
+ globalCheckpoint.get(),
|
|
|
+ maybeMultipleInvocationProtectingListener(
|
|
|
+ new TestGlobalCheckpointListener() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Executor executor() {
|
|
|
+ return executor;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void accept(final long g, final Exception e) {
|
|
|
+ if (invocation.compareAndSet(false, true) == false) {
|
|
|
+ throw new IllegalStateException("listener invoked twice");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }),
|
|
|
+ randomBoolean() ? null : TimeValue.timeValueNanos(randomLongBetween(1, TimeUnit.MICROSECONDS.toNanos(1))));
|
|
|
}
|
|
|
// synchronize ending with the updating thread and the main test thread
|
|
|
awaitQuietly(barrier);
|
|
@@ -506,7 +559,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
public void testTimeout() throws InterruptedException {
|
|
|
final Logger mockLogger = mock(Logger.class);
|
|
|
final GlobalCheckpointListeners globalCheckpointListeners =
|
|
|
- new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger);
|
|
|
+ new GlobalCheckpointListeners(shardId, scheduler, mockLogger);
|
|
|
final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50));
|
|
|
final AtomicBoolean notified = new AtomicBoolean();
|
|
|
final CountDownLatch latch = new CountDownLatch(1);
|
|
@@ -541,22 +594,33 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
count.incrementAndGet();
|
|
|
command.run();
|
|
|
};
|
|
|
- final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger);
|
|
|
+ final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, scheduler, logger);
|
|
|
final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50));
|
|
|
final AtomicBoolean notified = new AtomicBoolean();
|
|
|
final CountDownLatch latch = new CountDownLatch(1);
|
|
|
globalCheckpointListeners.add(
|
|
|
- NO_OPS_PERFORMED,
|
|
|
- maybeMultipleInvocationProtectingListener((g, e) -> {
|
|
|
- try {
|
|
|
- notified.set(true);
|
|
|
- assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
|
|
|
- assertThat(e, instanceOf(TimeoutException.class));
|
|
|
- } finally {
|
|
|
- latch.countDown();
|
|
|
+ NO_OPS_PERFORMED,
|
|
|
+ maybeMultipleInvocationProtectingListener(
|
|
|
+ new TestGlobalCheckpointListener() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Executor executor() {
|
|
|
+ return executor;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void accept(final long g, final Exception e) {
|
|
|
+ try {
|
|
|
+ notified.set(true);
|
|
|
+ assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
|
|
|
+ assertThat(e, instanceOf(TimeoutException.class));
|
|
|
+ } finally {
|
|
|
+ latch.countDown();
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
}),
|
|
|
- timeout);
|
|
|
+ timeout);
|
|
|
latch.await();
|
|
|
// ensure the listener notification occurred on the executor
|
|
|
assertTrue(notified.get());
|
|
@@ -571,7 +635,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
return null;
|
|
|
}).when(mockLogger).warn(argThat(any(String.class)), argThat(any(RuntimeException.class)));
|
|
|
final GlobalCheckpointListeners globalCheckpointListeners =
|
|
|
- new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger);
|
|
|
+ new GlobalCheckpointListeners(shardId, scheduler, mockLogger);
|
|
|
final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50));
|
|
|
globalCheckpointListeners.add(
|
|
|
NO_OPS_PERFORMED,
|
|
@@ -591,7 +655,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
|
|
|
public void testTimeoutCancelledAfterListenerNotified() {
|
|
|
final GlobalCheckpointListeners globalCheckpointListeners =
|
|
|
- new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger);
|
|
|
+ new GlobalCheckpointListeners(shardId, scheduler, logger);
|
|
|
final TimeValue timeout = TimeValue.timeValueNanos(Long.MAX_VALUE);
|
|
|
final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener =
|
|
|
maybeMultipleInvocationProtectingListener((g, e) -> {
|
|
@@ -606,14 +670,24 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
private GlobalCheckpointListeners.GlobalCheckpointListener maybeMultipleInvocationProtectingListener(
|
|
|
- final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener) {
|
|
|
+ final TestGlobalCheckpointListener globalCheckpointListener) {
|
|
|
if (Assertions.ENABLED) {
|
|
|
final AtomicBoolean invoked = new AtomicBoolean();
|
|
|
- return (g, e) -> {
|
|
|
- if (invoked.compareAndSet(false, true) == false) {
|
|
|
- throw new AssertionError("listener invoked twice");
|
|
|
+ return new GlobalCheckpointListeners.GlobalCheckpointListener() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Executor executor() {
|
|
|
+ return globalCheckpointListener.executor();
|
|
|
}
|
|
|
- globalCheckpointListener.accept(g, e);
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void accept(final long g, final Exception e) {
|
|
|
+ if (invoked.compareAndSet(false, true) == false) {
|
|
|
+ throw new AssertionError("listener invoked twice");
|
|
|
+ }
|
|
|
+ globalCheckpointListener.accept(g, e);
|
|
|
+ }
|
|
|
+
|
|
|
};
|
|
|
} else {
|
|
|
return globalCheckpointListener;
|