|
@@ -12,7 +12,7 @@ import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.ActionResponse;
|
|
|
-import org.elasticsearch.action.support.GroupedActionListener;
|
|
|
+import org.elasticsearch.action.support.RefCountingListener;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
|
|
import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata;
|
|
@@ -29,6 +29,7 @@ import org.elasticsearch.xcontent.XContentParser;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.LinkedHashSet;
|
|
|
import java.util.List;
|
|
@@ -345,32 +346,14 @@ public class ReservedClusterStateService {
|
|
|
List<Consumer<ActionListener<NonStateTransformResult>>> nonStateTransforms,
|
|
|
ActionListener<Collection<NonStateTransformResult>> listener
|
|
|
) {
|
|
|
- // Don't create grouped listener with 0 actions, just return
|
|
|
- if (nonStateTransforms.isEmpty()) {
|
|
|
- listener.onResponse(List.of());
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- GroupedActionListener<NonStateTransformResult> postTasksListener = new GroupedActionListener<>(
|
|
|
- nonStateTransforms.size(),
|
|
|
- new ActionListener<>() {
|
|
|
- @Override
|
|
|
- public void onResponse(Collection<NonStateTransformResult> updateKeyTaskResult) {
|
|
|
- listener.onResponse(updateKeyTaskResult);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- listener.onFailure(e);
|
|
|
- }
|
|
|
+ final List<NonStateTransformResult> result = Collections.synchronizedList(new ArrayList<>(nonStateTransforms.size()));
|
|
|
+ try (var listeners = new RefCountingListener(listener.map(ignored -> result))) {
|
|
|
+ for (var transform : nonStateTransforms) {
|
|
|
+ // non cluster state transforms don't modify the cluster state, they however are given a chance to return a more
|
|
|
+ // up-to-date version of the modified keys we should save in the reserved state. These calls are
|
|
|
+ // async and report back when they are done through the postTasksListener.
|
|
|
+ transform.accept(listeners.acquire(result::add));
|
|
|
}
|
|
|
- );
|
|
|
-
|
|
|
- for (var transform : nonStateTransforms) {
|
|
|
- // non cluster state transforms don't modify the cluster state, they however are given a chance to return a more
|
|
|
- // up-to-date version of the modified keys we should save in the reserved state. These calls are
|
|
|
- // async and report back when they are done through the postTasksListener.
|
|
|
- transform.accept(postTasksListener);
|
|
|
}
|
|
|
}
|
|
|
|