Browse Source

Preemptively initialize routing nodes and indices lookup on all node types (#89032)

Follow up to #89005 running the initialization as soon as possible on non-master
nodes as well.
Armin Braun 3 years ago
parent
commit
d9dc3a9629

+ 5 - 0
docs/changelog/89032.yaml

@@ -0,0 +1,5 @@
+pr: 89032
+summary: Preemptively initialize routing nodes and indices lookup on all node types
+area: Cluster Coordination
+type: enhancement
+issues: []

+ 14 - 0
server/src/main/java/org/elasticsearch/cluster/ClusterState.java

@@ -54,6 +54,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Objects;
 import java.util.Set;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.Consumer;
 
 
 import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
 import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
@@ -351,6 +352,19 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
         return RoutingNodes.mutable(routingTable, this.nodes);
         return RoutingNodes.mutable(routingTable, this.nodes);
     }
     }
 
 
+    /**
+     * Initialize data structures that lazy computed for this instance in the background by using the giving executor.
+     * @param executor executor to run initialization tasks on
+     */
+    public void initializeAsync(Executor executor) {
+        if (routingNodes == null) {
+            executor.execute(this::getRoutingNodes);
+        }
+        if (metadata.indicesLookupInitialized() == false) {
+            executor.execute(metadata::getIndicesLookup);
+        }
+    }
+
     @Override
     @Override
     public String toString() {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         StringBuilder sb = new StringBuilder();

+ 14 - 11
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

@@ -414,8 +414,14 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
         assert publishRequest.getAcceptedState().nodes().getLocalNode().equals(getLocalNode())
         assert publishRequest.getAcceptedState().nodes().getLocalNode().equals(getLocalNode())
             : publishRequest.getAcceptedState().nodes().getLocalNode() + " != " + getLocalNode();
             : publishRequest.getAcceptedState().nodes().getLocalNode() + " != " + getLocalNode();
 
 
+        final ClusterState newClusterState = publishRequest.getAcceptedState();
+        if (newClusterState.nodes().isLocalNodeElectedMaster() == false) {
+            // background initialization on the current master has been started by the master service already
+            newClusterState.initializeAsync(transportService.getThreadPool().generic());
+        }
+
         synchronized (mutex) {
         synchronized (mutex) {
-            final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getMasterNode();
+            final DiscoveryNode sourceNode = newClusterState.nodes().getMasterNode();
             logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode);
             logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode);
 
 
             if (sourceNode.equals(getLocalNode()) && mode != Mode.LEADER) {
             if (sourceNode.equals(getLocalNode()) && mode != Mode.LEADER) {
@@ -427,30 +433,30 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
             final ClusterState localState = coordinationState.get().getLastAcceptedState();
             final ClusterState localState = coordinationState.get().getLastAcceptedState();
 
 
             if (localState.metadata().clusterUUIDCommitted()
             if (localState.metadata().clusterUUIDCommitted()
-                && localState.metadata().clusterUUID().equals(publishRequest.getAcceptedState().metadata().clusterUUID()) == false) {
+                && localState.metadata().clusterUUID().equals(newClusterState.metadata().clusterUUID()) == false) {
                 logger.warn(
                 logger.warn(
                     "received cluster state from {} with a different cluster uuid {} than local cluster uuid {}, rejecting",
                     "received cluster state from {} with a different cluster uuid {} than local cluster uuid {}, rejecting",
                     sourceNode,
                     sourceNode,
-                    publishRequest.getAcceptedState().metadata().clusterUUID(),
+                    newClusterState.metadata().clusterUUID(),
                     localState.metadata().clusterUUID()
                     localState.metadata().clusterUUID()
                 );
                 );
                 throw new CoordinationStateRejectedException(
                 throw new CoordinationStateRejectedException(
                     "received cluster state from "
                     "received cluster state from "
                         + sourceNode
                         + sourceNode
                         + " with a different cluster uuid "
                         + " with a different cluster uuid "
-                        + publishRequest.getAcceptedState().metadata().clusterUUID()
+                        + newClusterState.metadata().clusterUUID()
                         + " than local cluster uuid "
                         + " than local cluster uuid "
                         + localState.metadata().clusterUUID()
                         + localState.metadata().clusterUUID()
                         + ", rejecting"
                         + ", rejecting"
                 );
                 );
             }
             }
 
 
-            if (publishRequest.getAcceptedState().term() > localState.term()) {
+            if (newClusterState.term() > localState.term()) {
                 // only do join validation if we have not accepted state from this master yet
                 // only do join validation if we have not accepted state from this master yet
-                onJoinValidators.forEach(a -> a.accept(getLocalNode(), publishRequest.getAcceptedState()));
+                onJoinValidators.forEach(a -> a.accept(getLocalNode(), newClusterState));
             }
             }
 
 
-            ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
+            ensureTermAtLeast(sourceNode, newClusterState.term());
             final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);
             final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);
 
 
             if (sourceNode.equals(getLocalNode())) {
             if (sourceNode.equals(getLocalNode())) {
@@ -459,10 +465,7 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
                 becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector
                 becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector
             }
             }
 
 
-            return new PublishWithJoinResponse(
-                publishResponse,
-                joinWithDestination(lastJoin, sourceNode, publishRequest.getAcceptedState().term())
-            );
+            return new PublishWithJoinResponse(publishResponse, joinWithDestination(lastJoin, sourceNode, newClusterState.term()));
         }
         }
     }
     }
 
 

+ 4 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

@@ -509,6 +509,10 @@ public class Metadata extends AbstractCollection<IndexMetadata> implements Diffa
         return true;
         return true;
     }
     }
 
 
+    public boolean indicesLookupInitialized() {
+        return indicesLookup != null;
+    }
+
     public SortedMap<String, IndexAbstraction> getIndicesLookup() {
     public SortedMap<String, IndexAbstraction> getIndicesLookup() {
         SortedMap<String, IndexAbstraction> lookup = indicesLookup;
         SortedMap<String, IndexAbstraction> lookup = indicesLookup;
         if (lookup == null) {
         if (lookup == null) {

+ 1 - 2
server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

@@ -332,8 +332,7 @@ public class MasterService extends AbstractLifecycleComponent {
                     logger.debug("publishing cluster state version [{}]", newClusterState.version());
                     logger.debug("publishing cluster state version [{}]", newClusterState.version());
                     // initialize routing nodes and the indices lookup concurrently, we will need both of them for the cluster state
                     // initialize routing nodes and the indices lookup concurrently, we will need both of them for the cluster state
                     // application and can compute them while we wait for the other nodes during publication
                     // application and can compute them while we wait for the other nodes during publication
-                    threadPool.generic().execute(newClusterState::getRoutingNodes);
-                    threadPool.generic().execute(newClusterState.metadata()::getIndicesLookup);
+                    newClusterState.initializeAsync(threadPool.generic());
                     publish(
                     publish(
                         clusterStatePublicationEvent,
                         clusterStatePublicationEvent,
                         new CompositeTaskAckListener(
                         new CompositeTaskAckListener(

+ 1 - 2
test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java

@@ -142,8 +142,7 @@ public class FakeThreadPoolMasterServiceTests extends ESTestCase {
 
 
         runnableTasks.remove(0).run(); // schedule again
         runnableTasks.remove(0).run(); // schedule again
 
 
-        // run tasks for computing routing nodes and indices lookup
-        runnableTasks.remove(0).run();
+        // run task for computing missing indices lookup
         runnableTasks.remove(0).run();
         runnableTasks.remove(0).run();
 
 
         runnableTasks.remove(0).run(); // publish again
         runnableTasks.remove(0).run(); // publish again