|
|
@@ -36,6 +36,7 @@ import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
import java.util.Locale;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
@@ -488,6 +489,9 @@ public class AutoFollowIT extends CcrIntegTestCase {
|
|
|
|
|
|
final AtomicBoolean running = new AtomicBoolean(true);
|
|
|
final AtomicInteger leaderIndices = new AtomicInteger(0);
|
|
|
+ final CountDownLatch latchThree = new CountDownLatch(3);
|
|
|
+ final CountDownLatch latchSix = new CountDownLatch(6);
|
|
|
+ final CountDownLatch latchNine = new CountDownLatch(9);
|
|
|
|
|
|
// start creating new indices on the remote cluster
|
|
|
final Thread createNewLeaderIndicesThread = new Thread(() -> {
|
|
|
@@ -502,6 +506,9 @@ public class AutoFollowIT extends CcrIntegTestCase {
|
|
|
} else {
|
|
|
Thread.sleep(200L);
|
|
|
}
|
|
|
+ latchThree.countDown();
|
|
|
+ latchSix.countDown();
|
|
|
+ latchNine.countDown();
|
|
|
} catch (Exception e) {
|
|
|
throw new AssertionError(e);
|
|
|
}
|
|
|
@@ -510,23 +517,29 @@ public class AutoFollowIT extends CcrIntegTestCase {
|
|
|
createNewLeaderIndicesThread.start();
|
|
|
|
|
|
// wait for 3 leader indices to be created on the remote cluster
|
|
|
- assertBusy(() -> assertThat(leaderIndices.get(), greaterThanOrEqualTo(3)));
|
|
|
- assertBusy(() -> assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo(3L)));
|
|
|
+ latchThree.await(30L, TimeUnit.SECONDS);
|
|
|
+ assertThat(leaderIndices.get(), greaterThanOrEqualTo(3));
|
|
|
+ assertBusy(() -> assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo(3L)),
|
|
|
+ 30L, TimeUnit.SECONDS);
|
|
|
|
|
|
// now pause some random patterns
|
|
|
pausedAutoFollowerPatterns.forEach(this::pauseAutoFollowPattern);
|
|
|
assertBusy(() -> autoFollowPatterns.forEach(pattern ->
|
|
|
- assertThat(getAutoFollowPattern(pattern).isActive(), equalTo(pausedAutoFollowerPatterns.contains(pattern) == false))));
|
|
|
+ assertThat(getAutoFollowPattern(pattern).isActive(), equalTo(pausedAutoFollowerPatterns.contains(pattern) == false))),
|
|
|
+ 30L, TimeUnit.SECONDS);
|
|
|
|
|
|
// wait for more leader indices to be created on the remote cluster
|
|
|
- assertBusy(() -> assertThat(leaderIndices.get(), greaterThanOrEqualTo(6)));
|
|
|
+ latchSix.await(30L, TimeUnit.SECONDS);
|
|
|
+ assertThat(leaderIndices.get(), greaterThanOrEqualTo(6));
|
|
|
|
|
|
// resume auto follow patterns
|
|
|
pausedAutoFollowerPatterns.forEach(this::resumeAutoFollowPattern);
|
|
|
- assertBusy(() -> autoFollowPatterns.forEach(pattern -> assertTrue(getAutoFollowPattern(pattern).isActive())));
|
|
|
+ assertBusy(() -> autoFollowPatterns.forEach(pattern -> assertTrue(getAutoFollowPattern(pattern).isActive())),
|
|
|
+ 30L, TimeUnit.SECONDS);
|
|
|
|
|
|
// wait for more leader indices to be created on the remote cluster
|
|
|
- assertBusy(() -> assertThat(leaderIndices.get(), greaterThanOrEqualTo(9)));
|
|
|
+ latchNine.await(30L, TimeUnit.SECONDS);
|
|
|
+ assertThat(leaderIndices.get(), greaterThanOrEqualTo(9));
|
|
|
assertBusy(() -> assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo(9L)),
|
|
|
30L, TimeUnit.SECONDS);
|
|
|
|