فهرست منبع

Make TransportAddVotingConfigExclusionsAction retryable (#98386)

The docs for this API say the following:

> If the API fails, you can safely retry it. Only a successful response
> guarantees that the node has been removed from the voting
> configuration and will not be reinstated.

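Unfortunately this isn't true today: if the request adds no new
exclusions (for instance, a retry whose exclusions were already recorded)
then we respond immediately, without waiting for the excluded nodes to
leave the voting configuration. This commit makes the API wait until all
exclusions in the cluster state are really applied.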
David Turner 2 سال پیش
والد
کامیت
1fef466f50

+ 5 - 0
docs/changelog/98386.yaml

@@ -0,0 +1,5 @@
+pr: 98386
+summary: Make `TransportAddVotingConfigExclusionsAction` retryable
+area: Cluster Coordination
+type: bug
+issues: []

+ 14 - 19
server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java

@@ -39,9 +39,9 @@ import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
+import java.util.HashSet;
 import java.util.Set;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 public class TransportAddVotingConfigExclusionsAction extends TransportMasterNodeAction<
     AddVotingConfigExclusionsRequest,
@@ -107,13 +107,14 @@ public class TransportAddVotingConfigExclusionsAction extends TransportMasterNod
 
         submitUnbatchedTask("add-voting-config-exclusions", new ClusterStateUpdateTask(Priority.URGENT) {
 
-            private Set<VotingConfigExclusion> resolvedExclusions;
-
             @Override
             public ClusterState execute(ClusterState currentState) {
-                assert resolvedExclusions == null : resolvedExclusions;
                 final int finalMaxVotingConfigExclusions = TransportAddVotingConfigExclusionsAction.this.maxVotingConfigExclusions;
-                resolvedExclusions = resolveVotingConfigExclusionsAndCheckMaximum(request, currentState, finalMaxVotingConfigExclusions);
+                final var resolvedExclusions = resolveVotingConfigExclusionsAndCheckMaximum(
+                    request,
+                    currentState,
+                    finalMaxVotingConfigExclusions
+                );
 
                 final CoordinationMetadata.Builder builder = CoordinationMetadata.builder(currentState.coordinationMetadata());
                 resolvedExclusions.forEach(builder::addVotingConfigExclusion);
@@ -138,13 +139,13 @@ public class TransportAddVotingConfigExclusionsAction extends TransportMasterNod
                     threadPool.getThreadContext()
                 );
 
-                final Set<String> excludedNodeIds = resolvedExclusions.stream()
-                    .map(VotingConfigExclusion::getNodeId)
-                    .collect(Collectors.toSet());
-
                 final Predicate<ClusterState> allNodesRemoved = clusterState -> {
-                    final Set<String> votingConfigNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds();
-                    return excludedNodeIds.stream().noneMatch(votingConfigNodeIds::contains);
+                    final Set<String> votingConfigNodeIds = new HashSet<>();
+                    votingConfigNodeIds.addAll(clusterState.getLastCommittedConfiguration().getNodeIds());
+                    votingConfigNodeIds.addAll(clusterState.getLastAcceptedConfiguration().getNodeIds());
+                    return clusterState.getVotingConfigExclusions()
+                        .stream()
+                        .noneMatch(votingConfigExclusion -> votingConfigNodeIds.contains(votingConfigExclusion.getNodeId()));
                 };
 
                 final Listener clusterStateListener = new Listener() {
@@ -156,20 +157,14 @@ public class TransportAddVotingConfigExclusionsAction extends TransportMasterNod
                     @Override
                     public void onClusterServiceClose() {
                         listener.onFailure(
-                            new ElasticsearchException(
-                                "cluster service closed while waiting for voting config exclusions "
-                                    + resolvedExclusions
-                                    + " to take effect"
-                            )
+                            new ElasticsearchException("cluster service closed while waiting for voting config exclusions to take effect")
                         );
                     }
 
                     @Override
                     public void onTimeout(TimeValue timeout) {
                         listener.onFailure(
-                            new ElasticsearchTimeoutException(
-                                "timed out waiting for voting config exclusions " + resolvedExclusions + " to take effect"
-                            )
+                            new ElasticsearchTimeoutException("timed out waiting for voting config exclusions to take effect")
                         );
                     }
                 };

+ 36 - 16
server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java

@@ -25,6 +25,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.cluster.node.DiscoveryNodes.Builder;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -245,7 +246,7 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
             localNode,
             AddVotingConfigExclusionsAction.NAME,
             new AddVotingConfigExclusionsRequest(new String[] { "absent_id" }, Strings.EMPTY_ARRAY, TimeValue.timeValueSeconds(30)),
-            expectSuccess(e -> {
+            expectSuccess(r -> {
                 final var state = clusterService.getClusterApplierService().state();
                 assertEquals(
                     Set.of(new VotingConfigExclusion("absent_id", VotingConfigExclusion.MISSING_VALUE_MARKER)),
@@ -282,7 +283,7 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
             localNode,
             AddVotingConfigExclusionsAction.NAME,
             new AddVotingConfigExclusionsRequest("absent_node"),
-            expectSuccess(e -> {
+            expectSuccess(r -> {
                 final var state = clusterService.getClusterApplierService().state();
                 assertEquals(
                     Set.of(new VotingConfigExclusion(VotingConfigExclusion.MISSING_VALUE_MARKER, "absent_node")),
@@ -297,8 +298,7 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
 
     public void testExcludeExistingNodesByNodeNames() {
         final var countDownLatch = new CountDownLatch(1);
-        final var configurationAdjuster = new AdjustConfigurationForExclusions();
-        clusterStateObserver.waitForNextChange(configurationAdjuster);
+        clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions());
         transportService.sendRequest(
             localNode,
             AddVotingConfigExclusionsAction.NAME,
@@ -314,7 +314,7 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
         safeAwait(countDownLatch);
     }
 
-    public void testSucceedsEvenIfAllExclusionsAlreadyAdded() {
+    public void testTriggersReconfigurationEvenIfAllExclusionsAlreadyAddedButStillInConfiguration() {
         final ClusterState state = clusterService.state();
         final ClusterState.Builder builder = builder(state);
         builder.metadata(
@@ -326,14 +326,19 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
         setState(clusterService, builder);
 
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-
+        clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions());
         transportService.sendRequest(
             localNode,
             AddVotingConfigExclusionsAction.NAME,
-            new AddVotingConfigExclusionsRequest("other1"),
+            randomFrom(
+                new AddVotingConfigExclusionsRequest("other1"),
+                new AddVotingConfigExclusionsRequest(new String[] { "other1" }, Strings.EMPTY_ARRAY, TimeValue.timeValueSeconds(30))
+            ),
             expectSuccess(r -> {
                 assertNotNull(r);
-                assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), contains(otherNode1Exclusion));
+                final var finalState = clusterService.getClusterApplierService().state();
+                assertThat(finalState.getVotingConfigExclusions(), contains(otherNode1Exclusion));
+                assertAllExclusionsApplied(finalState);
                 countDownLatch.countDown();
             })
         );
@@ -346,20 +351,26 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
         builder.metadata(
             Metadata.builder(state.metadata())
                 .coordinationMetadata(
-                    CoordinationMetadata.builder(state.coordinationMetadata()).addVotingConfigExclusion(otherNode1Exclusion).build()
+                    CoordinationMetadata.builder(state.coordinationMetadata())
+                        .lastCommittedConfiguration(VotingConfiguration.of(localNode, otherNode2))
+                        .lastAcceptedConfiguration(VotingConfiguration.of(localNode, otherNode2))
+                        .addVotingConfigExclusion(otherNode1Exclusion)
+                        .build()
                 )
         );
         setState(clusterService, builder);
 
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-
+        clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions());
         transportService.sendRequest(
             localNode,
             AddVotingConfigExclusionsAction.NAME,
             new AddVotingConfigExclusionsRequest(new String[] { "other1" }, Strings.EMPTY_ARRAY, TimeValue.timeValueSeconds(30)),
             expectSuccess(r -> {
                 assertNotNull(r);
-                assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), contains(otherNode1Exclusion));
+                final var finalState = clusterService.getClusterApplierService().state();
+                assertThat(finalState.getVotingConfigExclusions(), contains(otherNode1Exclusion));
+                assertAllExclusionsApplied(finalState);
                 countDownLatch.countDown();
             })
         );
@@ -372,7 +383,11 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
         builder.metadata(
             Metadata.builder(state.metadata())
                 .coordinationMetadata(
-                    CoordinationMetadata.builder(state.coordinationMetadata()).addVotingConfigExclusion(otherNode1Exclusion).build()
+                    CoordinationMetadata.builder(state.coordinationMetadata())
+                        .lastCommittedConfiguration(VotingConfiguration.of(localNode, otherNode2))
+                        .lastAcceptedConfiguration(VotingConfiguration.of(localNode, otherNode2))
+                        .addVotingConfigExclusion(otherNode1Exclusion)
+                        .build()
                 )
         );
         setState(clusterService, builder);
@@ -385,7 +400,9 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
             new AddVotingConfigExclusionsRequest("other1"),
             expectSuccess(r -> {
                 assertNotNull(r);
-                assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), contains(otherNode1Exclusion));
+                final var finalState = clusterService.getClusterApplierService().state();
+                assertThat(finalState.getVotingConfigExclusions(), contains(otherNode1Exclusion));
+                assertAllExclusionsApplied(finalState);
                 countDownLatch.countDown();
             })
         );
@@ -469,7 +486,7 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
             expectError(e -> {
                 final Throwable rootCause = e.getRootCause();
                 assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class));
-                assertThat(rootCause.getMessage(), startsWith("timed out waiting for voting config exclusions [{other1}"));
+                assertThat(rootCause.getMessage(), equalTo("timed out waiting for voting config exclusions to take effect"));
                 countDownLatch.countDown();
             })
         );
@@ -533,10 +550,13 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
     private static class AdjustConfigurationForExclusions implements Listener {
         @Override
         public void onNewClusterState(ClusterState state) {
-            clusterService.getMasterService().submitUnbatchedStateUpdateTask("reconfiguration", new ClusterStateUpdateTask() {
+            final var prio = randomFrom(Priority.values());
+            clusterService.getMasterService().submitUnbatchedStateUpdateTask("reconfiguration", new ClusterStateUpdateTask(prio) {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
-                    assertThat(currentState, sameInstance(state));
+                    if (prio.compareTo(Priority.URGENT) <= 0) {
+                        assertThat(currentState, sameInstance(state));
+                    }
                     final Set<String> votingNodeIds = new HashSet<>();
                     currentState.nodes().forEach(n -> votingNodeIds.add(n.getId()));
                     currentState.getVotingConfigExclusions().forEach(t -> votingNodeIds.remove(t.getNodeId()));