|
@@ -22,17 +22,15 @@ import org.junit.After;
|
|
|
import org.junit.Before;
|
|
|
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
-import java.util.concurrent.CyclicBarrier;
|
|
|
import java.util.concurrent.Executor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.concurrent.TimeoutException;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import static org.hamcrest.Matchers.containsString;
|
|
|
-import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.instanceOf;
|
|
|
+import static org.hamcrest.Matchers.oneOf;
|
|
|
import static org.hamcrest.Matchers.sameInstance;
|
|
|
import static org.mockito.ArgumentMatchers.any;
|
|
|
import static org.mockito.ArgumentMatchers.same;
|
|
@@ -275,49 +273,28 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
|
|
|
assertTrue(reschedulingRunnable.isCancelled());
|
|
|
}
|
|
|
|
|
|
- public void testRunnableDoesNotRunAfterCancellation() throws Exception {
|
|
|
- int iterations = scaledRandomIntBetween(2, 12);
|
|
|
-
|
|
|
- // we don't have the cancellable until we schedule the task, which needs the barrier object to reference in the closure
|
|
|
- // so break the circular dependency here
|
|
|
- AtomicReference<Runnable> checkCancel = new AtomicReference<>();
|
|
|
-
|
|
|
- AtomicInteger counter = new AtomicInteger();
|
|
|
- CyclicBarrier barrier = new CyclicBarrier(2, () -> checkCancel.get().run());
|
|
|
- Runnable countingRunnable = () -> {
|
|
|
+ public void testRunnableRunsAtMostOnceAfterCancellation() throws Exception {
|
|
|
+ final var intervalMillis = randomLongBetween(1, 50);
|
|
|
+ final AtomicInteger counter = new AtomicInteger();
|
|
|
+ final CountDownLatch doneLatch = new CountDownLatch(scaledRandomIntBetween(1, 12));
|
|
|
+ final Runnable countingRunnable = () -> {
|
|
|
counter.incrementAndGet();
|
|
|
- try {
|
|
|
- barrier.await();
|
|
|
- } catch (Exception e) {
|
|
|
- throw new AssertionError(e);
|
|
|
- }
|
|
|
+ doneLatch.countDown();
|
|
|
};
|
|
|
|
|
|
- TimeValue interval = TimeValue.timeValueMillis(50L);
|
|
|
- Cancellable cancellable = threadPool.scheduleWithFixedDelay(countingRunnable, interval, threadPool.generic());
|
|
|
- checkCancel.set(new Runnable() {
|
|
|
- private int remaining = iterations;
|
|
|
-
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- if (--remaining == 0) {
|
|
|
- cancellable.cancel();
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- for (int i = 0; i < iterations; i++) {
|
|
|
- barrier.await();
|
|
|
- }
|
|
|
- expectThrows(TimeoutException.class, () -> barrier.await(2 * interval.millis(), TimeUnit.MILLISECONDS));
|
|
|
-
|
|
|
- assertThat(counter.get(), equalTo(iterations));
|
|
|
-
|
|
|
+ final Cancellable cancellable = threadPool.scheduleWithFixedDelay(
|
|
|
+ countingRunnable,
|
|
|
+ TimeValue.timeValueMillis(intervalMillis),
|
|
|
+ threadPool.generic()
|
|
|
+ );
|
|
|
+ safeAwait(doneLatch);
|
|
|
+ assertTrue(cancellable.cancel());
|
|
|
+ final var iterations = counter.get();
|
|
|
if (rarely()) {
|
|
|
- assertBusy(() -> {
|
|
|
- expectThrows(TimeoutException.class, () -> barrier.await(interval.millis(), TimeUnit.MILLISECONDS));
|
|
|
- assertThat(counter.get(), equalTo(iterations));
|
|
|
- }, 5 * interval.millis(), TimeUnit.MILLISECONDS);
|
|
|
+ Thread.sleep(randomLongBetween(0, intervalMillis * 5));
|
|
|
+ } else if (randomBoolean()) {
|
|
|
+ Thread.yield();
|
|
|
}
|
|
|
+ assertThat(counter.get(), oneOf(iterations, iterations + 1));
|
|
|
}
|
|
|
}
|