|
@@ -567,7 +567,8 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
|
|
|
|
|
|
protected void stopWatcher() throws Exception {
|
|
protected void stopWatcher() throws Exception {
|
|
assertBusy(() -> {
|
|
assertBusy(() -> {
|
|
- WatcherStatsResponse watcherStatsResponse = new WatcherStatsRequestBuilder(client()).get();
|
|
|
|
|
|
+
|
|
|
|
+ WatcherStatsResponse watcherStatsResponse = new WatcherStatsRequestBuilder(client()).setIncludeCurrentWatches(true).get();
|
|
assertThat(watcherStatsResponse.hasFailures(), is(false));
|
|
assertThat(watcherStatsResponse.hasFailures(), is(false));
|
|
List<Tuple<String, WatcherState>> currentStatesFromStatsRequest = watcherStatsResponse.getNodes()
|
|
List<Tuple<String, WatcherState>> currentStatesFromStatsRequest = watcherStatsResponse.getNodes()
|
|
.stream()
|
|
.stream()
|
|
@@ -580,7 +581,8 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
|
|
.collect(Collectors.toList());
|
|
.collect(Collectors.toList());
|
|
List<WatcherState> states = currentStatesFromStatsRequest.stream().map(Tuple::v2).collect(Collectors.toList());
|
|
List<WatcherState> states = currentStatesFromStatsRequest.stream().map(Tuple::v2).collect(Collectors.toList());
|
|
|
|
|
|
- logger.info("waiting to stop watcher, current states {}", currentStatesFromStatsRequest);
|
|
|
|
|
|
+ long currentWatches = watcherStatsResponse.getNodes().stream().mapToLong(n -> n.getSnapshots().size()).sum();
|
|
|
|
+ logger.info("waiting to stop watcher, current states {}, current watches [{}]", currentStatesFromStatsRequest, currentWatches);
|
|
|
|
|
|
boolean isAllStateStarted = states.stream().allMatch(w -> w == WatcherState.STARTED);
|
|
boolean isAllStateStarted = states.stream().allMatch(w -> w == WatcherState.STARTED);
|
|
if (isAllStateStarted) {
|
|
if (isAllStateStarted) {
|
|
@@ -594,7 +596,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
|
|
}
|
|
}
|
|
|
|
|
|
boolean isAllStateStopped = states.stream().allMatch(w -> w == WatcherState.STOPPED);
|
|
boolean isAllStateStopped = states.stream().allMatch(w -> w == WatcherState.STOPPED);
|
|
- if (isAllStateStopped) {
|
|
|
|
|
|
+ if (isAllStateStopped && currentWatches == 0) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|