浏览代码

Fall back to join ping if not master (#97527)

The `JoinValidationService` doesn't need to send the cluster state if
we're not currently the master - instead it can just send a join ping.
This means it doesn't need to do any work adjusting the blocks.

Closes #97313
David Turner 2 年之前
父节点
当前提交
2f5d8da13d

+ 19 - 14
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

@@ -234,7 +234,8 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
         this.joinValidationService = new JoinValidationService(
             settings,
             transportService,
-            this::getStateForMasterService, // TODO see https://github.com/elastic/elasticsearch/issues/97313
+            this::getStateForJoinValidationService,
+            () -> getLastAcceptedState().metadata(),
             this.onJoinValidators
         );
         this.persistedStateSupplier = persistedStateSupplier;
@@ -679,20 +680,10 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
         // - if we're already master that it can make sense of the current cluster state.
         // - we have a healthy PING channel to the node
 
-        final ClusterState stateForJoinValidation;
-        synchronized (mutex) {
-            // similar to getStateForMasterService(), but don't rebuild the state if we're not the master since we don't use it in that case
-            final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
-            assert lastAcceptedState.nodes().getLocalNode() != null;
-            if (mode != Mode.LEADER || lastAcceptedState.term() != getCurrentTerm()) {
-                stateForJoinValidation = null;
-            } else {
-                stateForJoinValidation = lastAcceptedState;
-            }
-        }
-
+        final ClusterState stateForJoinValidation = getStateForJoinValidationService();
         final ListenableActionFuture<Empty> validateStateListener = new ListenableActionFuture<>();
-        if (stateForJoinValidation != null && stateForJoinValidation.nodes().isLocalNodeElectedMaster()) {
+        if (stateForJoinValidation != null) {
+            assert stateForJoinValidation.nodes().isLocalNodeElectedMaster();
             onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));
             if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
                 // We do this in a couple of places including the cluster update thread. This one here is really just best effort to ensure
@@ -1478,6 +1469,20 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
         }
     }
 
+    private ClusterState getStateForJoinValidationService() {
+        synchronized (mutex) {
+            // similar to getStateForMasterService, but do not rebuild the state if not currently the master
+            final ClusterState clusterState = coordinationState.get().getLastAcceptedState();
+            assert clusterState.nodes().getLocalNode() != null;
+            if (mode != Mode.LEADER
+                || clusterState.term() != getCurrentTerm()
+                || clusterState.nodes().isLocalNodeElectedMaster() == false) {
+                return null;
+            }
+            return clusterState;
+        }
+    }
+
     /**
      * Add a no-master block and remove the master node ID from the given cluster state. Note that it's quite expensive to add blocks in a
      * large cluster state, so avoid using this where possible.

+ 97 - 41
server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java

@@ -17,6 +17,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.compress.CompressorFactory;
@@ -27,6 +28,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.AbstractRefCounted;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.env.Environment;
@@ -35,6 +37,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.BytesTransportRequest;
 import org.elasticsearch.transport.NodeNotConnectedException;
 import org.elasticsearch.transport.Transport;
+import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportService;
@@ -44,7 +47,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -105,6 +107,7 @@ public class JoinValidationService {
         Settings settings,
         TransportService transportService,
         Supplier<ClusterState> clusterStateSupplier,
+        Supplier<Metadata> metadataSupplier,
         Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators
     ) {
         this.cacheTimeout = JOIN_VALIDATION_CACHE_TIMEOUT_SETTING.get(settings);
@@ -119,14 +122,14 @@ public class JoinValidationService {
             ValidateJoinRequest::new,
             (request, channel, task) -> {
                 final var remoteState = request.getOrReadState();
-                final var localState = clusterStateSupplier.get();
-                if (localState.metadata().clusterUUIDCommitted()
-                    && localState.metadata().clusterUUID().equals(remoteState.metadata().clusterUUID()) == false) {
+                final var remoteMetadata = remoteState.metadata();
+                final var localMetadata = metadataSupplier.get();
+                if (localMetadata.clusterUUIDCommitted() && localMetadata.clusterUUID().equals(remoteMetadata.clusterUUID()) == false) {
                     throw new CoordinationStateRejectedException(
                         "This node previously joined a cluster with UUID ["
-                            + localState.metadata().clusterUUID()
+                            + localMetadata.clusterUUID()
                             + "] and is now trying to join a different cluster with UUID ["
-                            + remoteState.metadata().clusterUUID()
+                            + remoteMetadata.clusterUUID()
                             + "]. This is forbidden and usually indicates an incorrect "
                             + "discovery or cluster bootstrapping configuration. Note that the cluster UUID persists across restarts and "
                             + "can only be changed by deleting the contents of the node's data "
@@ -153,26 +156,39 @@ public class JoinValidationService {
                 listener.onFailure(new NodeClosedException(transportService.getLocalNode()));
             }
         } else {
-            transportService.sendRequest(
-                discoveryNode,
-                JOIN_VALIDATE_ACTION_NAME,
-                new ValidateJoinRequest(clusterStateSupplier.get()),
-                REQUEST_OPTIONS,
-                new ActionListenerResponseHandler<>(listener.delegateResponse((l, e) -> {
-                    logger.warn(() -> "failed to validate incoming join request from node [" + discoveryNode + "]", e);
-                    listener.onFailure(
-                        new IllegalStateException(
-                            String.format(
-                                Locale.ROOT,
-                                "failure when sending a join validation request from [%s] to [%s]",
-                                transportService.getLocalNode().descriptionWithoutAttributes(),
-                                discoveryNode.descriptionWithoutAttributes()
-                            ),
-                            e
-                        )
-                    );
-                }), i -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.CLUSTER_COORDINATION)
-            );
+            final var responseHandler = new ActionListenerResponseHandler<>(listener.delegateResponse((l, e) -> {
+                logger.warn(() -> "failed to validate incoming join request from node [" + discoveryNode + "]", e);
+                listener.onFailure(
+                    new IllegalStateException(
+                        String.format(
+                            Locale.ROOT,
+                            "failure when sending a join validation request from [%s] to [%s]",
+                            transportService.getLocalNode().descriptionWithoutAttributes(),
+                            discoveryNode.descriptionWithoutAttributes()
+                        ),
+                        e
+                    )
+                );
+            }), i -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.CLUSTER_COORDINATION);
+            final var clusterState = clusterStateSupplier.get();
+            if (clusterState != null) {
+                assert clusterState.nodes().isLocalNodeElectedMaster();
+                transportService.sendRequest(
+                    discoveryNode,
+                    JOIN_VALIDATE_ACTION_NAME,
+                    new ValidateJoinRequest(clusterState),
+                    REQUEST_OPTIONS,
+                    responseHandler
+                );
+            } else {
+                transportService.sendRequest(
+                    discoveryNode,
+                    JoinHelper.JOIN_PING_ACTION_NAME,
+                    TransportRequest.Empty.INSTANCE,
+                    REQUEST_OPTIONS,
+                    responseHandler
+                );
+            }
         }
     }
 
@@ -313,7 +329,27 @@ public class JoinValidationService {
             }
             var version = connection.getTransportVersion();
             var cachedBytes = statesByVersion.get(version);
-            var bytes = Objects.requireNonNullElseGet(cachedBytes, () -> serializeClusterState(discoveryNode, version));
+            var bytes = maybeSerializeClusterState(cachedBytes, discoveryNode, version);
+            if (bytes == null) {
+                // Normally if we're not the master then the Coordinator sends a ping message just to validate connectivity instead of
+                // getting here. But if we were the master when the Coordinator checked then we might not be the master any more, so we
+                // get a null and fall back to a ping here too.
+
+                // noinspection ConstantConditions
+                assert cachedBytes == null;
+                transportService.sendRequest(
+                    connection,
+                    JoinHelper.JOIN_PING_ACTION_NAME,
+                    TransportRequest.Empty.INSTANCE,
+                    REQUEST_OPTIONS,
+                    new ActionListenerResponseHandler<>(
+                        listener,
+                        in -> TransportResponse.Empty.INSTANCE,
+                        ThreadPool.Names.CLUSTER_COORDINATION
+                    )
+                );
+                return;
+            }
             assert bytes.hasReferences() : "already closed";
             bytes.incRef();
             transportService.sendRequest(
@@ -328,18 +364,24 @@ public class JoinValidationService {
                     bytes::decRef
                 )
             );
-            if (cachedBytes == null) {
-                transportService.getThreadPool().schedule(new Runnable() {
-                    @Override
-                    public void run() {
-                        execute(cacheClearer);
-                    }
-
-                    @Override
-                    public String toString() {
-                        return cacheClearer + " after timeout";
-                    }
-                }, cacheTimeout, ThreadPool.Names.CLUSTER_COORDINATION);
+            try {
+                if (cachedBytes == null) {
+                    transportService.getThreadPool().schedule(new Runnable() {
+                        @Override
+                        public void run() {
+                            execute(cacheClearer);
+                        }
+
+                        @Override
+                        public String toString() {
+                            return cacheClearer + " after timeout";
+                        }
+                    }, cacheTimeout, ThreadPool.Names.CLUSTER_COORDINATION);
+                }
+            } catch (Exception e) {
+                assert e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown() : e;
+                // we're shutting down, so clear the cache (and handle the shutdown) right away
+                execute(cacheClearer);
             }
         }
 
@@ -349,11 +391,25 @@ public class JoinValidationService {
         }
     }
 
-    private ReleasableBytesReference serializeClusterState(DiscoveryNode discoveryNode, TransportVersion version) {
+    @Nullable // if we are not the master according to the current cluster state
+    private ReleasableBytesReference maybeSerializeClusterState(
+        ReleasableBytesReference cachedBytes,
+        DiscoveryNode discoveryNode,
+        TransportVersion version
+    ) {
+        if (cachedBytes != null) {
+            return cachedBytes;
+        }
+
+        final var clusterState = clusterStateSupplier.get();
+        if (clusterState == null) {
+            return null;
+        }
+        assert clusterState.nodes().isLocalNodeElectedMaster();
+
         final var bytesStream = transportService.newNetworkBytesStream();
         var success = false;
         try {
-            final var clusterState = clusterStateSupplier.get();
             try (
                 var stream = new OutputStreamStreamOutput(
                     CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))

+ 89 - 14
server/src/test/java/org/elasticsearch/cluster/coordination/JoinValidationServiceTests.java

@@ -20,6 +20,7 @@ import org.elasticsearch.cluster.SimpleDiffable;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
@@ -114,7 +115,8 @@ public class JoinValidationServiceTests extends ESTestCase {
                                 @Override
                                 public void doRun() {
                                     handleResponse(requestId, switch (action) {
-                                        case JoinValidationService.JOIN_VALIDATE_ACTION_NAME -> TransportResponse.Empty.INSTANCE;
+                                        case JoinValidationService.JOIN_VALIDATE_ACTION_NAME, JoinHelper.JOIN_PING_ACTION_NAME ->
+                                            TransportResponse.Empty.INSTANCE;
                                         case TransportService.HANDSHAKE_ACTION_NAME -> new TransportService.HandshakeResponse(
                                             Version.CURRENT,
                                             Build.current().hash(),
@@ -144,9 +146,17 @@ public class JoinValidationServiceTests extends ESTestCase {
             );
             releasables.add(transportService);
 
-            final var clusterState = ClusterState.EMPTY_STATE;
+            final var clusterState = ClusterState.builder(ClusterName.DEFAULT)
+                .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
+                .build();
 
-            final var joinValidationService = new JoinValidationService(settings, transportService, () -> clusterState, List.of());
+            final var joinValidationService = new JoinValidationService(
+                settings,
+                transportService,
+                () -> usually() ? clusterState : null,
+                clusterState::metadata,
+                List.of()
+            );
 
             transportService.start();
             releasables.add(() -> {
@@ -186,7 +196,7 @@ public class JoinValidationServiceTests extends ESTestCase {
                         if (validationPermits.tryAcquire()) {
                             joinValidationService.validateJoin(
                                 randomFrom(random, otherNodes),
-                                ActionListener.notifyOnce(new ActionListener<>() {
+                                ActionListener.assertOnce(new ActionListener<>() {
                                     @Override
                                     public void onResponse(TransportResponse.Empty empty) {
                                         validationPermits.release();
@@ -259,7 +269,12 @@ public class JoinValidationServiceTests extends ESTestCase {
         }
 
         final var deterministicTaskQueue = new DeterministicTaskQueue();
-        final var clusterState = ClusterState.builder(ClusterName.DEFAULT).putCustom("test", new BadCustom()).build();
+
+        final var masterNode = DiscoveryNodeUtils.create("node0");
+        final var clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(DiscoveryNodes.builder().add(masterNode).localNodeId(masterNode.getId()).masterNodeId(masterNode.getId()))
+            .putCustom("test", new BadCustom())
+            .build();
 
         final var joiningNode = DiscoveryNodeUtils.create("joining");
         final var joiningNodeTransport = new MockTransport();
@@ -271,11 +286,13 @@ public class JoinValidationServiceTests extends ESTestCase {
             null,
             Collections.emptySet()
         );
-        new JoinValidationService(Settings.EMPTY, joiningNodeTransportService, () -> clusterState, List.of()); // registers request handler
+
+        // registers request handler
+        new JoinValidationService(Settings.EMPTY, joiningNodeTransportService, () -> clusterState, clusterState::metadata, List.of());
+
         joiningNodeTransportService.start();
         joiningNodeTransportService.acceptIncomingRequests();
 
-        final var masterNode = DiscoveryNodeUtils.create("node0");
         final var masterTransport = new MockTransport() {
             @Override
             protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
@@ -315,7 +332,13 @@ public class JoinValidationServiceTests extends ESTestCase {
             null,
             Collections.emptySet()
         );
-        final var joinValidationService = new JoinValidationService(Settings.EMPTY, masterTransportService, () -> clusterState, List.of());
+        final var joinValidationService = new JoinValidationService(
+            Settings.EMPTY,
+            masterTransportService,
+            () -> clusterState,
+            clusterState::metadata,
+            List.of()
+        );
         masterTransportService.start();
         masterTransportService.acceptIncomingRequests();
 
@@ -356,7 +379,10 @@ public class JoinValidationServiceTests extends ESTestCase {
 
         final var dataPath = "/my/data/path";
         final var settings = Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), dataPath).build();
-        new JoinValidationService(settings, transportService, () -> localClusterState, List.of()); // registers request handler
+
+        // registers request handler
+        new JoinValidationService(settings, transportService, () -> localClusterState, localClusterState::metadata, List.of());
+
         transportService.start();
         transportService.acceptIncomingRequests();
 
@@ -401,11 +427,17 @@ public class JoinValidationServiceTests extends ESTestCase {
         );
 
         final var stateForValidation = ClusterState.builder(ClusterName.DEFAULT).build();
-        new JoinValidationService(Settings.EMPTY, transportService, () -> localClusterState, List.of((node, state) -> {
-            assertSame(node, localNode);
-            assertSame(state, stateForValidation);
-            throw new IllegalStateException("simulated validation failure");
-        })); // registers request handler
+        new JoinValidationService(
+            Settings.EMPTY,
+            transportService,
+            () -> localClusterState,
+            localClusterState::metadata,
+            List.of((node, state) -> {
+                assertSame(node, localNode);
+                assertSame(state, stateForValidation);
+                throw new IllegalStateException("simulated validation failure");
+            })
+        ); // registers request handler
         transportService.start();
         transportService.acceptIncomingRequests();
 
@@ -423,4 +455,47 @@ public class JoinValidationServiceTests extends ESTestCase {
             allOf(containsString("simulated validation failure"))
         );
     }
+
+    public void testJoinValidationFallsBackToPingIfNotMaster() {
+
+        final var deterministicTaskQueue = new DeterministicTaskQueue();
+        final var masterNode = DiscoveryNodeUtils.create("node0");
+        final var joiningNode = DiscoveryNodeUtils.create("joining");
+
+        final var pingSeen = new AtomicBoolean();
+        final var masterTransport = new MockTransport() {
+            @Override
+            protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
+                assertSame(node, joiningNode);
+                assertEquals(JoinHelper.JOIN_PING_ACTION_NAME, action);
+                assertTrue(pingSeen.compareAndSet(false, true));
+                handleResponse(requestId, TransportResponse.Empty.INSTANCE);
+            }
+        };
+        final var masterTransportService = masterTransport.createTransportService(
+            Settings.EMPTY,
+            deterministicTaskQueue.getThreadPool(),
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR,
+            x -> masterNode,
+            null,
+            Collections.emptySet()
+        );
+        final var joinValidationService = new JoinValidationService(Settings.EMPTY, masterTransportService, () -> null, () -> {
+            throw new AssertionError("should not be called");
+        }, List.of());
+        masterTransportService.start();
+        masterTransportService.acceptIncomingRequests();
+
+        try {
+            final var future = new PlainActionFuture<TransportResponse.Empty>();
+            joinValidationService.validateJoin(joiningNode, future);
+            assertFalse(future.isDone());
+            deterministicTaskQueue.runAllTasks();
+            assertTrue(future.isDone());
+            assertTrue(pingSeen.get());
+        } finally {
+            joinValidationService.stop();
+            masterTransportService.close();
+        }
+    }
 }