|
@@ -273,6 +273,23 @@ public class ClusterServiceUtils {
|
|
|
* completed exceptionally on the scheduler thread that belongs to {@code clusterService}.
|
|
|
*/
|
|
|
public static SubscribableListener<Void> addTemporaryStateListener(ClusterService clusterService, Predicate<ClusterState> predicate) {
|
|
|
+ return addTemporaryStateListener(clusterService, predicate, ESTestCase.SAFE_AWAIT_TIMEOUT);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Creates a {@link ClusterStateListener} which subscribes to the given {@link ClusterService} and waits for it to apply a cluster state
|
|
|
+ * that satisfies {@code predicate}, at which point it unsubscribes itself.
|
|
|
+ *
|
|
|
+ * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
|
|
|
+ * given {@code clusterService}. If the current cluster state already matches {@code predicate} then the returned listener is
|
|
|
+ * already complete. If no matching cluster state is seen within the provided {@code timeout} then the listener is
|
|
|
+ * completed exceptionally on the scheduler thread that belongs to {@code clusterService}.
|
|
|
+ */
|
|
|
+ public static SubscribableListener<Void> addTemporaryStateListener(
|
|
|
+ ClusterService clusterService,
|
|
|
+ Predicate<ClusterState> predicate,
|
|
|
+ TimeValue timeout
|
|
|
+ ) {
|
|
|
final var listener = new SubscribableListener<Void>();
|
|
|
final ClusterStateListener clusterStateListener = new ClusterStateListener() {
|
|
|
@Override
|
|
@@ -296,7 +313,7 @@ public class ClusterServiceUtils {
|
|
|
if (predicate.test(clusterService.state())) {
|
|
|
listener.onResponse(null);
|
|
|
} else {
|
|
|
- listener.addTimeout(ESTestCase.SAFE_AWAIT_TIMEOUT, clusterService.threadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
|
|
|
+ listener.addTimeout(timeout, clusterService.threadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
|
|
|
}
|
|
|
return listener;
|
|
|
}
|