Bläddra i källkod

Submit batches of joins as single tasks (#83803)

Today the `MasterService` permits clients to submit a batch of tasks
which it guarantees to execute together, but the only place that this
functionality is used in production code is for completing an election.
It was done this way so that each join could succeed or fail
independently, but since #83562 we can track the status of joins through
to completion without needing them all to be separate tasks.

This commit introduces a `JoinTask` which represents the whole batch of
joins as a single task. It also gives us a place to hang the strange
`_FINISH_ELECTION_` task that was used to flag whether a batch was an
election-completing batch or not.

Relates #83784
David Turner 3 år sedan
förälder
incheckning
586378b97d

+ 17 - 49
server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

@@ -15,7 +15,6 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ChannelActionListener;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
-import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.NotMasterException;
 import org.elasticsearch.cluster.coordination.Coordinator.Mode;
 import org.elasticsearch.cluster.metadata.Metadata;
@@ -104,8 +103,7 @@ public class JoinHelper {
             private final long term = currentTermSupplier.getAsLong();
 
             @Override
-            public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks)
-                throws Exception {
+            public ClusterTasksResult<JoinTask> execute(ClusterState currentState, List<JoinTask> joinTasks) {
                 // The current state that MasterService uses might have been updated by a (different) master in a higher term already
                 // Stop processing the current cluster state update, as there's no point in continuing to compute it as
                 // it will later be rejected by Coordinator.publish(...) anyhow
@@ -114,7 +112,7 @@ public class JoinHelper {
                     throw new NotMasterException(
                         "Higher term encountered (current: " + currentState.term() + " > used: " + term + "), there is a newer master"
                     );
-                } else if (currentState.nodes().getMasterNodeId() == null && joiningTasks.stream().anyMatch(Task::isBecomeMasterTask)) {
+                } else if (currentState.nodes().getMasterNodeId() == null && joinTasks.stream().anyMatch(JoinTask::isBecomingMaster)) {
                     assert currentState.term() < term : "there should be at most one become master task per election (= by term)";
                     final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(currentState.coordinationMetadata())
                         .term(term)
@@ -124,7 +122,7 @@ public class JoinHelper {
                 } else if (currentState.nodes().isLocalNodeElectedMaster()) {
                     assert currentState.term() == term : "term should be stable for the same master";
                 }
-                return super.execute(currentState, joiningTasks);
+                return super.execute(currentState, joinTasks);
             }
 
         };
@@ -293,7 +291,7 @@ public class JoinHelper {
 
             // Typically we're already connected to the destination at this point, the PeerFinder holds a reference to this connection to
             // keep it open, but we need to acquire our own reference to keep the connection alive through the joining process.
-            transportService.connectToNode(destination, new ActionListener<Releasable>() {
+            transportService.connectToNode(destination, new ActionListener<>() {
                 @Override
                 public void onResponse(Releasable connectionReference) {
                     logger.trace("acquired connection for joining join {} with {}", destination, joinRequest);
@@ -361,31 +359,6 @@ public class JoinHelper {
         });
     }
 
-    static class JoinTaskListener implements ClusterStateTaskListener {
-        private final JoinTaskExecutor.Task task;
-        private final ActionListener<Void> joinListener;
-
-        JoinTaskListener(JoinTaskExecutor.Task task, ActionListener<Void> joinListener) {
-            this.task = task;
-            this.joinListener = joinListener;
-        }
-
-        @Override
-        public void onFailure(Exception e) {
-            joinListener.onFailure(e);
-        }
-
-        @Override
-        public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
-            joinListener.onResponse(null);
-        }
-
-        @Override
-        public String toString() {
-            return "JoinTaskListener{task=" + task + "}";
-        }
-    }
-
     interface JoinAccumulator {
         void handleJoinRequest(DiscoveryNode sender, ActionListener<Void> joinListener);
 
@@ -395,11 +368,7 @@ public class JoinHelper {
     class LeaderJoinAccumulator implements JoinAccumulator {
         @Override
         public void handleJoinRequest(DiscoveryNode sender, ActionListener<Void> joinListener) {
-            final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(
-                sender,
-                joinReasonService.getJoinReason(sender, Mode.LEADER),
-                joinListener
-            );
+            final JoinTask task = JoinTask.singleNode(sender, joinReasonService.getJoinReason(sender, Mode.LEADER), joinListener);
             assert joinTaskExecutor != null;
             masterService.submitStateUpdateTask("node-join", task, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
         }
@@ -454,21 +423,20 @@ public class JoinHelper {
             assert closed == false : "CandidateJoinAccumulator closed";
             closed = true;
             if (newMode == Mode.LEADER) {
-                final List<JoinTaskExecutor.Task> pendingAsTasks = new ArrayList<>();
-                joinRequestAccumulator.forEach(
-                    (node, listener) -> pendingAsTasks.add(
-                        new JoinTaskExecutor.Task(node, joinReasonService.getJoinReason(node, Mode.CANDIDATE), listener)
-                    )
-                );
-
-                final String stateUpdateSource = "elected-as-master ([" + pendingAsTasks.size() + "] nodes joined)";
+                final JoinTask joinTask = JoinTask.completingElection(joinRequestAccumulator.entrySet().stream().map(entry -> {
+                    final DiscoveryNode discoveryNode = entry.getKey();
+                    final ActionListener<Void> listener = entry.getValue();
+                    return new JoinTask.NodeJoinTask(
+                        discoveryNode,
+                        joinReasonService.getJoinReason(discoveryNode, Mode.CANDIDATE),
+                        listener
+                    );
+                }));
 
-                pendingAsTasks.add(JoinTaskExecutor.newBecomeMasterTask());
-                pendingAsTasks.add(JoinTaskExecutor.newFinishElectionTask());
                 joinTaskExecutor = joinTaskExecutorGenerator.get();
-                masterService.submitStateUpdateTasks(
-                    stateUpdateSource,
-                    pendingAsTasks,
+                masterService.submitStateUpdateTask(
+                    "elected-as-master ([" + joinTask.nodeCount() + "] nodes joined)",
+                    joinTask,
                     ClusterStateTaskConfig.build(Priority.URGENT),
                     joinTaskExecutor
                 );

+ 94 - 0
server/src/main/java/org/elasticsearch/cluster/coordination/JoinTask.java

@@ -0,0 +1,94 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.coordination;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateTaskListener;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+public record JoinTask(List<NodeJoinTask> nodeJoinTasks, boolean isBecomingMaster) implements ClusterStateTaskListener {
+
+    public static JoinTask singleNode(DiscoveryNode node, String reason, ActionListener<Void> listener) {
+        return new JoinTask(List.of(new NodeJoinTask(node, reason, listener)), false);
+    }
+
+    public static JoinTask completingElection(Stream<NodeJoinTask> nodeJoinTaskStream) {
+        return new JoinTask(nodeJoinTaskStream.toList(), true);
+    }
+
+    public JoinTask(List<NodeJoinTask> nodeJoinTasks, boolean isBecomingMaster) {
+        this.nodeJoinTasks = Collections.unmodifiableList(nodeJoinTasks);
+        this.isBecomingMaster = isBecomingMaster;
+    }
+
+    public int nodeCount() {
+        return nodeJoinTasks.size();
+    }
+
+    @Override
+    public void onFailure(Exception e) {
+        for (NodeJoinTask nodeJoinTask : nodeJoinTasks) {
+            nodeJoinTask.listener.onFailure(e);
+        }
+    }
+
+    @Override
+    public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
+        assert false : "not called";
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder stringBuilder = new StringBuilder();
+
+        if (isBecomingMaster) {
+            stringBuilder.append("_FINISH_ELECTION_");
+        }
+
+        for (NodeJoinTask nodeJoinTask : nodeJoinTasks) {
+            if (stringBuilder.isEmpty() == false) {
+                stringBuilder.append(", ");
+            }
+            nodeJoinTask.appendDescription(stringBuilder);
+        }
+
+        return stringBuilder.toString();
+    }
+
+    public Iterable<DiscoveryNode> nodes() {
+        return () -> nodeJoinTasks.stream().map(j -> j.node).iterator();
+    }
+
+    public record NodeJoinTask(DiscoveryNode node, String reason, ActionListener<Void> listener) {
+
+        public NodeJoinTask(DiscoveryNode node, String reason, ActionListener<Void> listener) {
+            this.node = Objects.requireNonNull(node);
+            this.reason = reason;
+            this.listener = listener;
+        }
+
+        @Override
+        public String toString() {
+            final StringBuilder stringBuilder = new StringBuilder();
+            appendDescription(stringBuilder);
+            return stringBuilder.toString();
+        }
+
+        public void appendDescription(StringBuilder stringBuilder) {
+            node.appendDescriptionWithoutAttributes(stringBuilder);
+            stringBuilder.append(' ').append(reason);
+        }
+    }
+}

+ 49 - 91
server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java

@@ -13,7 +13,6 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
-import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.NotMasterException;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -37,73 +36,33 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
 
-public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecutor.Task> {
+public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTask> {
 
     private static final Logger logger = LogManager.getLogger(JoinTaskExecutor.class);
 
     private final AllocationService allocationService;
     private final RerouteService rerouteService;
 
-    public record Task(DiscoveryNode node, String reason, ActionListener<Void> listener) implements ClusterStateTaskListener {
-
-        @Override
-        public String toString() {
-            if (node == null) {
-                return reason;
-            }
-
-            final StringBuilder stringBuilder = new StringBuilder();
-            node.appendDescriptionWithoutAttributes(stringBuilder);
-            stringBuilder.append(' ').append(reason);
-            return stringBuilder.toString();
-        }
-
-        public boolean isBecomeMasterTask() {
-            return reason.equals(BECOME_MASTER_TASK_REASON);
-        }
-
-        public boolean isFinishElectionTask() {
-            return reason.equals(FINISH_ELECTION_TASK_REASON);
-        }
-
-        private static final String BECOME_MASTER_TASK_REASON = "_BECOME_MASTER_TASK_";
-        private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
-
-        @Override
-        public void onFailure(Exception e) {
-            listener.onFailure(e);
-        }
-
-        @Override
-        public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
-            listener.onResponse(null);
-        }
-
-    }
-
     public JoinTaskExecutor(AllocationService allocationService, RerouteService rerouteService) {
         this.allocationService = allocationService;
         this.rerouteService = rerouteService;
     }
 
     @Override
-    public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> joiningNodes) throws Exception {
-        final ClusterTasksResult.Builder<Task> results = ClusterTasksResult.builder();
+    public ClusterTasksResult<JoinTask> execute(ClusterState currentState, List<JoinTask> joinTasks) {
+        final ClusterTasksResult.Builder<JoinTask> results = ClusterTasksResult.builder();
+
+        final boolean isBecomingMaster = joinTasks.stream().anyMatch(JoinTask::isBecomingMaster);
 
         final DiscoveryNodes currentNodes = currentState.nodes();
         boolean nodesChanged = false;
         ClusterState.Builder newState;
 
-        if (joiningNodes.size() == 1 && joiningNodes.get(0).isFinishElectionTask()) {
-            final Task task = joiningNodes.get(0);
-            return results.success(task, new LegacyClusterTaskResultActionListener(task, currentState)).build(currentState);
-        } else if (currentNodes.getMasterNode() == null && joiningNodes.stream().anyMatch(Task::isBecomeMasterTask)) {
-            assert joiningNodes.stream().anyMatch(Task::isFinishElectionTask)
-                : "becoming a master but election is not finished " + joiningNodes;
+        if (currentNodes.getMasterNode() == null && isBecomingMaster) {
             // use these joins to try and become the master.
             // Note that we don't have to do any validation of the amount of joining nodes - the commit
             // during the cluster state publishing guarantees that we have enough
-            newState = becomeMasterAndTrimConflictingNodes(currentState, joiningNodes);
+            newState = becomeMasterAndTrimConflictingNodes(currentState, joinTasks);
             nodesChanged = true;
         } else if (currentNodes.isLocalNodeElectedMaster() == false) {
             logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode());
@@ -122,34 +81,48 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
         final boolean enforceVersionBarrier = currentState.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false;
         // processing any joins
         Map<String, String> joiniedNodeNameIds = new HashMap<>();
-        for (final Task joinTask : joiningNodes) {
-            if (joinTask.isBecomeMasterTask() || joinTask.isFinishElectionTask()) {
-                // noop
-            } else if (currentNodes.nodeExistsWithSameRoles(joinTask.node())) {
-                logger.debug("received a join request for an existing node [{}]", joinTask.node());
-            } else {
-                final DiscoveryNode node = joinTask.node();
-                try {
-                    if (enforceVersionBarrier) {
-                        ensureVersionBarrier(node.getVersion(), minClusterNodeVersion);
-                    }
-                    ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
-                    // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
-                    // we have to reject nodes that don't support all indices we have in this cluster
-                    ensureIndexCompatibility(node.getVersion(), currentState.getMetadata());
-                    nodesBuilder.add(node);
-                    nodesChanged = true;
-                    minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
-                    maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
-                    if (node.isMasterNode()) {
-                        joiniedNodeNameIds.put(node.getName(), node.getId());
+        for (final JoinTask joinTask : joinTasks) {
+            final List<Runnable> onTaskSuccess = new ArrayList<>(joinTask.nodeCount());
+            for (final JoinTask.NodeJoinTask nodeJoinTask : joinTask.nodeJoinTasks()) {
+                final DiscoveryNode node = nodeJoinTask.node();
+                if (currentNodes.nodeExistsWithSameRoles(node)) {
+                    logger.debug("received a join request for an existing node [{}]", node);
+                } else {
+                    try {
+                        if (enforceVersionBarrier) {
+                            ensureVersionBarrier(node.getVersion(), minClusterNodeVersion);
+                        }
+                        ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
+                        // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
+                        // we have to reject nodes that don't support all indices we have in this cluster
+                        ensureIndexCompatibility(node.getVersion(), currentState.getMetadata());
+                        nodesBuilder.add(node);
+                        nodesChanged = true;
+                        minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
+                        maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
+                        if (node.isMasterNode()) {
+                            joiniedNodeNameIds.put(node.getName(), node.getId());
+                        }
+                    } catch (IllegalArgumentException | IllegalStateException e) {
+                        onTaskSuccess.add(() -> nodeJoinTask.listener().onFailure(e));
+                        continue;
                     }
-                } catch (IllegalArgumentException | IllegalStateException e) {
-                    results.failure(joinTask, e);
-                    continue;
                 }
+                onTaskSuccess.add(() -> nodeJoinTask.listener().onResponse(null));
             }
-            results.success(joinTask, new LegacyClusterTaskResultActionListener(joinTask, currentState));
+            results.success(joinTask, new ActionListener<>() {
+                @Override
+                public void onResponse(ClusterState clusterState) {
+                    for (Runnable joinCompleter : onTaskSuccess) {
+                        joinCompleter.run();
+                    }
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    joinTask.onFailure(e);
+                }
+            });
         }
 
         if (nodesChanged) {
@@ -203,17 +176,14 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
         }
     }
 
-    protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState currentState, List<Task> joiningNodes) {
+    protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState currentState, List<JoinTask> joinTasks) {
         assert currentState.nodes().getMasterNodeId() == null : currentState;
         DiscoveryNodes currentNodes = currentState.nodes();
         DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes);
         nodesBuilder.masterNodeId(currentState.nodes().getLocalNodeId());
 
-        for (final Task joinTask : joiningNodes) {
-            if (joinTask.isBecomeMasterTask() || joinTask.isFinishElectionTask()) {
-                // noop
-            } else {
-                final DiscoveryNode joiningNode = joinTask.node();
+        for (final JoinTask joinTask : joinTasks) {
+            for (final DiscoveryNode joiningNode : joinTask.nodes()) {
                 final DiscoveryNode nodeWithSameId = nodesBuilder.get(joiningNode.getId());
                 if (nodeWithSameId != null && nodeWithSameId.equals(joiningNode) == false) {
                     logger.debug("removing existing node [{}], which conflicts with incoming join from [{}]", nodeWithSameId, joiningNode);
@@ -249,18 +219,6 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
         return false;
     }
 
-    public static Task newBecomeMasterTask() {
-        return new Task(null, Task.BECOME_MASTER_TASK_REASON, ActionListener.wrap(() -> {}));
-    }
-
-    /**
-     * a task that is used to signal the election is stopped and we should process pending joins.
-     * it may be used in combination with {@link JoinTaskExecutor#newBecomeMasterTask()}
-     */
-    public static Task newFinishElectionTask() {
-        return new Task(null, Task.FINISH_ELECTION_TASK_REASON, ActionListener.wrap(() -> {}));
-    }
-
     /**
      * Ensures that all indices are compatible with the given node version. This will ensure that all indices in the given metadata
      * will not be created with a newer version of elasticsearch as well as that all indices are newer or equal to the minimum index

+ 2 - 2
server/src/test/java/org/elasticsearch/cluster/coordination/JoinTaskExecutorTests.java

@@ -172,10 +172,10 @@ public class JoinTaskExecutorTests extends ESTestCase {
             .nodes(DiscoveryNodes.builder().add(masterNode).localNodeId(masterNode.getId()).masterNodeId(masterNode.getId()).add(bwcNode))
             .build();
 
-        final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> result = joinTaskExecutor.execute(
+        final ClusterStateTaskExecutor.ClusterTasksResult<JoinTask> result = joinTaskExecutor.execute(
             clusterState,
             List.of(
-                new JoinTaskExecutor.Task(
+                JoinTask.singleNode(
                     actualNode,
                     "test",
                     ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); })

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java

@@ -248,7 +248,7 @@ public class AutoExpandReplicasTests extends ESTestCase {
                                                                                                                              // is the
                                                                                                                              // master
 
-            state = cluster.addNodes(state, Collections.singletonList(newNode));
+            state = cluster.addNode(state, newNode);
 
             // use allocation filtering
             state = cluster.updateSettings(

+ 22 - 24
server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java

@@ -48,6 +48,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardUpdate
 import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry;
 import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardUpdateTask;
 import org.elasticsearch.cluster.block.ClusterBlock;
+import org.elasticsearch.cluster.coordination.JoinTask;
 import org.elasticsearch.cluster.coordination.JoinTaskExecutor;
 import org.elasticsearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -94,7 +95,6 @@ import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -349,39 +349,37 @@ public class ClusterStateChanges {
         return execute(transportClusterRerouteAction, request, state);
     }
 
-    public ClusterState addNodes(ClusterState clusterState, List<DiscoveryNode> nodes) {
+    public ClusterState addNode(ClusterState clusterState, DiscoveryNode discoveryNode) {
         return runTasks(
             joinTaskExecutor,
             clusterState,
-            nodes.stream()
-                .map(
-                    node -> new JoinTaskExecutor.Task(
-                        node,
-                        "dummy reason",
-                        ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); })
-                    )
+            List.of(
+                JoinTask.singleNode(
+                    discoveryNode,
+                    "dummy reason",
+                    ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); })
                 )
-                .toList()
+            )
         );
     }
 
     public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List<DiscoveryNode> nodes) {
-        List<JoinTaskExecutor.Task> joinNodes = new ArrayList<>();
-        joinNodes.add(JoinTaskExecutor.newBecomeMasterTask());
-        joinNodes.add(JoinTaskExecutor.newFinishElectionTask());
-        joinNodes.addAll(
-            nodes.stream()
-                .map(
-                    node -> new JoinTaskExecutor.Task(
-                        node,
-                        "dummy reason",
-                        ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); })
-                    )
+        return runTasks(
+            joinTaskExecutor,
+            clusterState,
+            List.of(
+                JoinTask.completingElection(
+                    nodes.stream()
+                        .map(
+                            node -> new JoinTask.NodeJoinTask(
+                                node,
+                                "dummy reason",
+                                ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); })
+                            )
+                        )
                 )
-                .toList()
+            )
         );
-
-        return runTasks(joinTaskExecutor, clusterState, joinNodes);
     }
 
     public ClusterState removeNodes(ClusterState clusterState, List<DiscoveryNode> nodes) {

+ 2 - 2
server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java

@@ -463,7 +463,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
             if (randomBoolean()) {
                 // add node
                 if (state.nodes().getSize() < 10) {
-                    state = cluster.addNodes(state, Collections.singletonList(createNode()));
+                    state = cluster.addNode(state, createNode());
                     updateNodes(state, clusterStateServiceMap, indicesServiceSupplier);
                 }
             } else {
@@ -476,7 +476,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
                     }
                     if (randomBoolean()) {
                         // and add it back
-                        state = cluster.addNodes(state, Collections.singletonList(discoveryNode));
+                        state = cluster.addNode(state, discoveryNode);
                         updateNodes(state, clusterStateServiceMap, indicesServiceSupplier);
                     }
                 }