Browse Source

Defer reroute when nodes join (#42855)

Today the master eagerly reroutes the cluster as part of processing node joins.
However, it is not necessary to do this reroute straight away, and it is
sometimes preferable to defer it until later. For instance, when the master
wins its election it processes joins and performs a reroute, but it would be
better to defer the reroute until after the master has become properly
established.

This change defers this reroute into a separate task, and batches multiple such
tasks together.
David Turner 6 years ago
parent
commit
ddedf80c06

+ 0 - 2
server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

@@ -33,7 +33,6 @@ import org.elasticsearch.cluster.metadata.MetaDataMappingService;
 import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
 import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
 import org.elasticsearch.cluster.routing.DelayedAllocationService;
-import org.elasticsearch.cluster.routing.RoutingService;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
@@ -242,7 +241,6 @@ public class ClusterModule extends AbstractModule {
         bind(MetaDataUpdateSettingsService.class).asEagerSingleton();
         bind(MetaDataIndexTemplateService.class).asEagerSingleton();
         bind(IndexNameExpressionResolver.class).toInstance(indexNameExpressionResolver);
-        bind(RoutingService.class).asEagerSingleton();
         bind(DelayedAllocationService.class).asEagerSingleton();
         bind(ShardStateAction.class).asEagerSingleton();
         bind(NodeMappingRefreshAction.class).asEagerSingleton();

+ 11 - 2
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

@@ -79,6 +79,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -138,10 +139,17 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
     private JoinHelper.JoinAccumulator joinAccumulator;
     private Optional<CoordinatorPublication> currentPublication = Optional.empty();
 
+    /**
+     * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
+     * @param onJoinValidators A collection of join validators to restrict which nodes may join the cluster.
+     * @param reroute A callback to call when the membership of the cluster has changed, to recalculate the assignment of shards. In
+     *                production code this calls {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String)}.
+     */
     public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
                        NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
                        Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
-                       ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random) {
+                       ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
+                       Consumer<String> reroute) {
         this.settings = settings;
         this.transportService = transportService;
         this.masterService = masterService;
@@ -149,7 +157,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
         this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
         this.singleNodeDiscovery = DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings));
         this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
-            this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
+            this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators,
+            reroute);
         this.persistedStateSupplier = persistedStateSupplier;
         this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings);
         this.lastKnownLeader = Optional.empty();

+ 3 - 2
server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

@@ -62,6 +62,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
@@ -91,11 +92,11 @@ public class JoinHelper {
     JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
                TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
                BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
-               Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
+               Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, Consumer<String> reroute) {
         this.masterService = masterService;
         this.transportService = transportService;
         this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
-        this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) {
+        this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, reroute) {
 
             @Override
             public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks)

+ 6 - 3
server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java

@@ -36,6 +36,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
 
@@ -44,6 +45,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
     private final AllocationService allocationService;
 
     private final Logger logger;
+    private final Consumer<String> reroute;
 
     public static class Task {
 
@@ -80,9 +82,10 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
         private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
     }
 
-    public JoinTaskExecutor(AllocationService allocationService, Logger logger) {
+    public JoinTaskExecutor(AllocationService allocationService, Logger logger, Consumer<String> reroute) {
         this.allocationService = allocationService;
         this.logger = logger;
+        this.reroute = reroute;
     }
 
     @Override
@@ -146,8 +149,8 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
             results.success(joinTask);
         }
         if (nodesChanged) {
-            newState.nodes(nodesBuilder);
-            return results.build(allocationService.reroute(newState.build(), "node_join"));
+            reroute.accept("post-join reroute");
+            return results.build(allocationService.adaptAutoExpandReplicas(newState.nodes(nodesBuilder).build()));
         } else {
             // we must return a new cluster state instance to force publishing. This is important
             // for the joining node to finalize its join and set us as a master

+ 1 - 1
server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

@@ -240,7 +240,7 @@ public class AllocationService {
      * Checks if there are replicas with the auto-expand feature that need to be adapted.
      * Returns an updated cluster state if changes were necessary, or the identical cluster if no changes were required.
      */
-    private ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
+    public ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
         final Map<Integer, List<String>> autoExpandReplicaChanges =
             AutoExpandReplicas.getAutoExpandReplicaChanges(clusterState.metaData(), clusterState.nodes());
         if (autoExpandReplicaChanges.isEmpty()) {

+ 4 - 2
server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

@@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.coordination.Coordinator;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RoutingService;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterApplier;
 import org.elasticsearch.cluster.service.ClusterApplierService;
@@ -79,7 +80,8 @@ public class DiscoveryModule {
     public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportService transportService,
                            NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
                            ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
-                           AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState) {
+                           AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState,
+                           RoutingService routingService) {
         final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
         final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
         hostProviders.put("settings", () -> new SettingsBasedSeedHostsProvider(settings, transportService));
@@ -129,7 +131,7 @@ public class DiscoveryModule {
                 settings, clusterSettings,
                 transportService, namedWriteableRegistry, allocationService, masterService,
                 () -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider,
-                clusterApplier, joinValidators, new Random(Randomness.get().nextLong()));
+                clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), routingService::reroute);
         } else {
             throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
         }

+ 3 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -495,10 +495,11 @@ public class Node implements Closeable {
             RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
                 metaDataCreateIndexService, metaDataIndexUpgradeService, clusterService.getClusterSettings());
 
+            final RoutingService routingService = new RoutingService(clusterService, clusterModule.getAllocationService());
             final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry,
                 networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
                 clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
-                clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState);
+                clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, routingService);
             this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
                 transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
                 httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
@@ -573,6 +574,7 @@ public class Node implements Closeable {
                     b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
                     b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus);
                     b.bind(RestoreService.class).toInstance(restoreService);
+                    b.bind(RoutingService.class).toInstance(routingService);
                 }
             );
             injector = modules.createInjector();

+ 45 - 9
server/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java

@@ -20,9 +20,12 @@
 package org.elasticsearch.cluster;
 
 import org.elasticsearch.action.UnavailableShardsException;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentType;
@@ -35,13 +38,16 @@ import static org.elasticsearch.client.Requests.createIndexRequest;
 import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
 import static org.hamcrest.Matchers.equalTo;
 
-@ClusterScope(scope= Scope.TEST, numDataNodes =0)
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
 public class SimpleDataNodesIT extends ESIntegTestCase {
-    public void testDataNodes() throws Exception {
+
+    private static final String SOURCE = "{\"type1\":{\"id\":\"1\",\"name\":\"test\"}}";
+
+    public void testIndexingBeforeAndAfterDataNodesStart() {
         internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build());
         client().admin().indices().create(createIndexRequest("test").waitForActiveShards(ActiveShardCount.NONE)).actionGet();
         try {
-            client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"), XContentType.JSON)
+            client().index(Requests.indexRequest("test").id("1").source(SOURCE, XContentType.JSON)
                 .timeout(timeValueSeconds(1))).actionGet();
             fail("no allocation should happen");
         } catch (UnavailableShardsException e) {
@@ -54,7 +60,7 @@ public class SimpleDataNodesIT extends ESIntegTestCase {
 
         // still no shard should be allocated
         try {
-            client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"), XContentType.JSON)
+            client().index(Requests.indexRequest("test").id("1").source(SOURCE, XContentType.JSON)
                 .timeout(timeValueSeconds(1))).actionGet();
             fail("no allocation should happen");
         } catch (UnavailableShardsException e) {
@@ -66,13 +72,43 @@ public class SimpleDataNodesIT extends ESIntegTestCase {
         assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3")
             .setLocal(true).execute().actionGet().isTimedOut(), equalTo(false));
 
-        IndexResponse indexResponse = client().index(Requests.indexRequest("test").type("type1").id("1")
-            .source(source("1", "test"), XContentType.JSON)).actionGet();
+        IndexResponse indexResponse = client().index(Requests.indexRequest("test").id("1")
+            .source(SOURCE, XContentType.JSON)).actionGet();
         assertThat(indexResponse.getId(), equalTo("1"));
-        assertThat(indexResponse.getType(), equalTo("type1"));
     }
 
-    private String source(String id, String nameValue) {
-        return "{ \"type1\" : { \"id\" : \"" + id + "\", \"name\" : \"" + nameValue + "\" } }";
+    public void testShardsAllocatedAfterDataNodesStart() {
+        internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build());
+        client().admin().indices().create(createIndexRequest("test")
+            .settings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).waitForActiveShards(ActiveShardCount.NONE))
+            .actionGet();
+        final ClusterHealthResponse healthResponse1 = client().admin().cluster().prepareHealth()
+            .setWaitForEvents(Priority.LANGUID).execute().actionGet();
+        assertThat(healthResponse1.isTimedOut(), equalTo(false));
+        assertThat(healthResponse1.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); // TODO should be RED, see #41073
+        assertThat(healthResponse1.getActiveShards(), equalTo(0));
+
+        internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).build());
+
+        assertThat(client().admin().cluster().prepareHealth()
+            .setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setWaitForGreenStatus().execute().actionGet().isTimedOut(),
+            equalTo(false));
+    }
+
+    public void testAutoExpandReplicasAdjustedWhenDataNodeJoins() {
+        internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build());
+        client().admin().indices().create(createIndexRequest("test")
+            .settings(Settings.builder().put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all"))
+            .waitForActiveShards(ActiveShardCount.NONE))
+            .actionGet();
+        final ClusterHealthResponse healthResponse1 = client().admin().cluster().prepareHealth()
+            .setWaitForEvents(Priority.LANGUID).execute().actionGet();
+        assertThat(healthResponse1.isTimedOut(), equalTo(false));
+        assertThat(healthResponse1.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); // TODO should be RED, see #41073
+        assertThat(healthResponse1.getActiveShards(), equalTo(0));
+
+        internalCluster().startNode();
+        internalCluster().startNode();
+        client().admin().cluster().prepareReroute().setRetryFailed(true).get();
     }
 }

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

@@ -1923,7 +1923,7 @@ public class CoordinatorTests extends ESTestCase {
                 final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
                 coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
                     allocationService, masterService, this::getPersistedState,
-                    Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get());
+                    Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), s -> {});
                 masterService.setClusterStatePublisher(coordinator);
                 final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
                     deterministicTaskQueue.getThreadPool(this::onNode), null, coordinator);

+ 2 - 2
server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java

@@ -57,7 +57,7 @@ public class JoinHelperTests extends ESTestCase {
             x -> localNode, null, Collections.emptySet());
         JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null,
             (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
-            Collections.emptyList());
+            Collections.emptyList(), s -> {});
         transportService.start();
 
         DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
@@ -153,7 +153,7 @@ public class JoinHelperTests extends ESTestCase {
             x -> localNode, null, Collections.emptySet());
         new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState,
             (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
-            Collections.emptyList()); // registers request handler
+            Collections.emptyList(), s -> {}); // registers request handler
         transportService.start();
         transportService.acceptIncomingRequests();
 

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java

@@ -176,7 +176,7 @@ public class NodeJoinTests extends ESTestCase {
             () -> new InMemoryPersistedState(term, initialState), r -> emptyList(),
             new NoOpClusterApplier(),
             Collections.emptyList(),
-            random);
+            random, s -> {});
         transportService.start();
         transportService.acceptIncomingRequests();
         transport = capturingTransport;

+ 3 - 1
server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.coordination.Coordinator;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RoutingService;
 import org.elasticsearch.cluster.service.ClusterApplier;
 import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -90,7 +91,8 @@ public class DiscoveryModuleTests extends ESTestCase {
 
     private DiscoveryModule newModule(Settings settings, List<DiscoveryPlugin> plugins) {
         return new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry, null, masterService,
-            clusterApplier, clusterSettings, plugins, null, createTempDir().toAbsolutePath(), gatewayMetaState);
+            clusterApplier, clusterSettings, plugins, null, createTempDir().toAbsolutePath(), gatewayMetaState,
+            mock(RoutingService.class));
     }
 
     public void testDefaults() {

+ 1 - 1
server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java

@@ -213,7 +213,7 @@ public class ClusterStateChanges {
             transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver);
 
         nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
-        joinTaskExecutor = new JoinTaskExecutor(allocationService, logger);
+        joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, s -> {});
     }
 
     public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {

+ 2 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -1244,7 +1244,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 allocationService, masterService, () -> persistedState,
                 hostsResolver -> testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode())
                     .map(n -> n.node.getAddress()).collect(Collectors.toList()),
-                clusterService.getClusterApplierService(), Collections.emptyList(), random());
+                clusterService.getClusterApplierService(), Collections.emptyList(), random(),
+                new RoutingService(clusterService, allocationService)::reroute);
             masterService.setClusterStatePublisher(coordinator);
             coordinator.start();
             masterService.start();