Bladeren bron

Preemptively compute RoutingNodes and the indices lookup during publication (#89005)

Computing routing nodes and the indices lookup takes considerable time
for large states. Both are needed during cluster state application and
prior to this change would be computed on the applier thread in all cases.
By running the creation of both objects concurrently with publication, the
many shards benchmark sees a 10%+ reduction in the bootstrap time to
50k indices.
Armin Braun 3 jaren geleden
bovenliggende
commit
9bed4b89fd

+ 5 - 0
docs/changelog/89005.yaml

@@ -0,0 +1,5 @@
+pr: 89005
+summary: Preemptively compute `RoutingNodes` and the indices lookup during publication
+area: Cluster Coordination
+type: enhancement
+issues: []

+ 15 - 4
server/src/main/java/org/elasticsearch/cluster/ClusterState.java

@@ -319,11 +319,22 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
      * Returns a built (on demand) routing nodes view of the routing table.
      */
     public RoutingNodes getRoutingNodes() {
-        if (routingNodes != null) {
-            return routingNodes;
+        RoutingNodes r = routingNodes;
+        if (r != null) {
+            return r;
         }
-        routingNodes = RoutingNodes.immutable(routingTable, nodes);
-        return routingNodes;
+        r = buildRoutingNodes();
+        return r;
+    }
+
+    private synchronized RoutingNodes buildRoutingNodes() {
+        RoutingNodes r = routingNodes;
+        if (r != null) {
+            return r;
+        }
+        r = RoutingNodes.immutable(routingTable, nodes);
+        routingNodes = r;
+        return r;
     }
 
     /**

+ 15 - 4
server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

@@ -222,7 +222,7 @@ public class Metadata extends AbstractCollection<IndexMetadata> implements Diffa
     private final String[] allClosedIndices;
     private final String[] visibleClosedIndices;
 
-    private SortedMap<String, IndexAbstraction> indicesLookup;
+    private volatile SortedMap<String, IndexAbstraction> indicesLookup;
     private final Map<String, MappingMetadata> mappingsByHash;
 
     private final Version oldestIndexVersion;
@@ -510,10 +510,21 @@ public class Metadata extends AbstractCollection<IndexMetadata> implements Diffa
     }
 
     public SortedMap<String, IndexAbstraction> getIndicesLookup() {
-        if (indicesLookup == null) {
-            indicesLookup = Builder.buildIndicesLookup(custom(DataStreamMetadata.TYPE, DataStreamMetadata.EMPTY), indices);
+        SortedMap<String, IndexAbstraction> lookup = indicesLookup;
+        if (lookup == null) {
+            lookup = buildIndicesLookup();
         }
-        return indicesLookup;
+        return lookup;
+    }
+
+    private synchronized SortedMap<String, IndexAbstraction> buildIndicesLookup() {
+        SortedMap<String, IndexAbstraction> i = indicesLookup;
+        if (i != null) {
+            return i;
+        }
+        i = Builder.buildIndicesLookup(custom(DataStreamMetadata.TYPE, DataStreamMetadata.EMPTY), indices);
+        indicesLookup = i;
+        return i;
     }
 
     public boolean sameIndicesLookup(Metadata other) {

+ 4 - 0
server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

@@ -330,6 +330,10 @@ public class MasterService extends AbstractLifecycleComponent {
                     }
 
                     logger.debug("publishing cluster state version [{}]", newClusterState.version());
+                    // initialize routing nodes and the indices lookup concurrently, we will need both of them for the cluster state
+                    // application and can compute them while we wait for the other nodes during publication
+                    threadPool.generic().execute(newClusterState::getRoutingNodes);
+                    threadPool.generic().execute(newClusterState.metadata()::getIndicesLookup);
                     publish(
                         clusterStatePublicationEvent,
                         new CompositeTaskAckListener(

+ 9 - 0
test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java

@@ -100,6 +100,10 @@ public class FakeThreadPoolMasterServiceTests extends ESTestCase {
         assertThat(scheduleTask, hasToString("master service scheduling next task"));
         scheduleTask.run();
 
+        // run tasks for computing routing nodes and indices lookup
+        runnableTasks.remove(0).run();
+        runnableTasks.remove(0).run();
+
         final Runnable publishTask = runnableTasks.remove(0);
         assertThat(publishTask, hasToString(containsString("publish change of cluster state")));
         publishTask.run();
@@ -137,6 +141,11 @@ public class FakeThreadPoolMasterServiceTests extends ESTestCase {
         assertThat(runnableTasks.size(), equalTo(1)); // check that new task gets queued
 
         runnableTasks.remove(0).run(); // schedule again
+
+        // run tasks for computing routing nodes and indices lookup
+        runnableTasks.remove(0).run();
+        runnableTasks.remove(0).run();
+
         runnableTasks.remove(0).run(); // publish again
         assertThat(lastClusterStateRef.get().metadata().indices().size(), equalTo(2));
         assertThat(lastClusterStateRef.get().version(), equalTo(firstClusterStateVersion + 2));