浏览代码

Picking master eligible node at random in the master stability health indicator (#89841)

Keith Massey 3 年之前
父节点
当前提交
f596a43173

+ 21 - 2
server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java

@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterApplierService;
 import org.elasticsearch.cluster.service.ClusterApplierService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -50,6 +51,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Objects;
+import java.util.Random;
 import java.util.Set;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentMap;
@@ -91,6 +93,9 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
      */
      */
     private final int unacceptableIdentityChanges;
     private final int unacceptableIdentityChanges;
 
 
+    // ThreadLocal because our unit testing framework does not like sharing Randoms across threads
+    private final ThreadLocal<Random> random = ThreadLocal.withInitial(Randomness::get);
+
     /*
     /*
      * This is a Map of tasks that are periodically reaching out to other master eligible nodes to get their ClusterFormationStates for
      * This is a Map of tasks that are periodically reaching out to other master eligible nodes to get their ClusterFormationStates for
      * diagnosis. The key is the DiscoveryNode for the master eligible node being polled, and the value is a Cancellable.
      * diagnosis. The key is the DiscoveryNode for the master eligible node being polled, and the value is a Cancellable.
@@ -722,6 +727,20 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
         return masterEligibleNodes;
         return masterEligibleNodes;
     }
     }
 
 
+    /**
+     * Selects one of the master eligible nodes known to this node at random. Returns null if this node knows of none.
+     * @return A randomly selected master eligible node, or null
+     */
+    // Package-visible so that unit tests can call it
+    @Nullable
+    DiscoveryNode getRandomMasterEligibleNode() {
+        DiscoveryNode[] candidates = getMasterEligibleNodes().toArray(new DiscoveryNode[0]);
+        if (candidates.length == 0) {
+            return null;
+        }
+        return candidates[random.get().nextInt(candidates.length)];
+    }
+
     /**
     /**
      * This returns true if this node has seen a master node within the last few seconds
      * This returns true if this node has seen a master node within the last few seconds
      * @return true if this node has seen a master node within the last few seconds, false otherwise
      * @return true if this node has seen a master node within the last few seconds, false otherwise
@@ -958,7 +977,7 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
         Consumer<RemoteMasterHealthResult> responseConsumer,
         Consumer<RemoteMasterHealthResult> responseConsumer,
         AtomicReference<Scheduler.Cancellable> cancellableReference
         AtomicReference<Scheduler.Cancellable> cancellableReference
     ) {
     ) {
-        DiscoveryNode masterEligibleNode = getMasterEligibleNodes().stream().findAny().orElse(null);
+        DiscoveryNode masterEligibleNode = getRandomMasterEligibleNode();
         try {
         try {
             cancellableReference.set(
             cancellableReference.set(
                 fetchCoordinationDiagnostics(
                 fetchCoordinationDiagnostics(
@@ -1002,7 +1021,7 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
                  * cancellableReference, so it will not be run again.
                  * cancellableReference, so it will not be run again.
                  */
                  */
                 try {
                 try {
-                    DiscoveryNode masterEligibleNode = getMasterEligibleNodes().stream().findAny().orElse(null);
+                    DiscoveryNode masterEligibleNode = getRandomMasterEligibleNode();
                     cancellableReference.set(
                     cancellableReference.set(
                         fetchCoordinationDiagnostics(
                         fetchCoordinationDiagnostics(
                             masterEligibleNode,
                             masterEligibleNode,

+ 32 - 0
server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsServiceTests.java

@@ -36,6 +36,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
@@ -1155,6 +1156,37 @@ public class CoordinationDiagnosticsServiceTests extends AbstractCoordinatorTest
         );
         );
     }
     }
 
 
+    public void testRandomMasterEligibleNode() throws Exception {
+        MasterHistoryService historyService = createMasterHistoryService();
+        Set<DiscoveryNode> expectedNodes = Set.of(node1, node2, node3);
+        Coordinator coordinator = mock(Coordinator.class);
+        when(coordinator.getFoundPeers()).thenReturn(expectedNodes);
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.localNode()).thenReturn(node3);
+        when(clusterService.state()).thenReturn(nullMasterClusterState);
+        when(clusterService.getSettings()).thenReturn(Settings.EMPTY);
+        DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
+        ThreadPool threadPool = taskQueue.getThreadPool();
+        TransportService transportService = mock(TransportService.class);
+        when(transportService.getThreadPool()).thenReturn(threadPool);
+        CoordinationDiagnosticsService service = new CoordinationDiagnosticsService(
+            clusterService,
+            transportService,
+            coordinator,
+            historyService
+        );
+
+        /*
+         * Sample the random picker many times (1000 draws over 3 candidates) and collect the distinct results. Seeing every
+         * candidate at least once shows that the picker is not stuck returning the same node on every call.
+         */
+        Set<DiscoveryNode> observedNodes = new HashSet<>();
+        for (int draw = 0; draw < 1000; draw++) {
+            observedNodes.add(service.getRandomMasterEligibleNode());
+        }
+        assertThat(observedNodes, equalTo(expectedNodes));
+    }
+
     public void testResultSerialization() {
     public void testResultSerialization() {
         CoordinationDiagnosticsService.CoordinationDiagnosticsStatus status = getRandomStatus();
         CoordinationDiagnosticsService.CoordinationDiagnosticsStatus status = getRandomStatus();
         CoordinationDiagnosticsService.CoordinationDiagnosticsDetails details = getRandomDetails();
         CoordinationDiagnosticsService.CoordinationDiagnosticsDetails details = getRandomDetails();