瀏覽代碼

Rename and refactor RoutingService (#43827)

The `RoutingService` has a confusing name, since it doesn't really have
anything to do with routing. Its responsibility is submitting reroute commands
to the master.

This commit renames this class to `BatchedRerouteService`, and extracts the
`RerouteService` interface to avoid passing `BiConsumer`s everywhere. It also
makes `BatchedRerouteService` no longer extend `AbstractLifecycleComponent`, since
this service has no meaningful lifecycle. Finally, it introduces a small
wrapper class to allow for lazy initialization to deal with the dependency loop
when constructing a `Node`.
David Turner 6 年之前
父節點
當前提交
de693a2902
共有 16 個文件被更改,包括 157 次插入 和 146 次删除
  1. 7 7
      server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
  2. 3 5
      server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
  3. 3 2
      server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java
  4. 5 4
      server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java
  5. 10 34
      server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java
  6. 42 0
      server/src/main/java/org/elasticsearch/cluster/routing/LazilyInitializedRerouteService.java
  7. 29 0
      server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java
  8. 12 19
      server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
  9. 3 3
      server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java
  10. 6 6
      server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
  11. 10 10
      server/src/main/java/org/elasticsearch/node/Node.java
  12. 3 3
      server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java
  13. 7 31
      server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java
  14. 12 17
      server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
  15. 2 2
      server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java
  16. 3 3
      server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

+ 7 - 7
server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

@@ -36,7 +36,7 @@ import org.elasticsearch.cluster.NotMasterException;
 import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.RoutingService;
+import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.routing.allocation.FailedShard;
@@ -89,7 +89,7 @@ public class ShardStateAction {
 
     @Inject
     public ShardStateAction(ClusterService clusterService, TransportService transportService,
-                            AllocationService allocationService, RoutingService routingService, ThreadPool threadPool) {
+                            AllocationService allocationService, RerouteService rerouteService, ThreadPool threadPool) {
         this.transportService = transportService;
         this.clusterService = clusterService;
         this.threadPool = threadPool;
@@ -98,7 +98,7 @@ public class ShardStateAction {
             new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
         transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ThreadPool.Names.SAME, FailedShardEntry::new,
             new ShardFailedTransportHandler(clusterService,
-                new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
+                new ShardFailedClusterStateTaskExecutor(allocationService, rerouteService, logger), logger));
     }
 
     private void sendShardAction(final String actionName, final ClusterState currentState,
@@ -280,12 +280,12 @@ public class ShardStateAction {
 
     public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<FailedShardEntry> {
         private final AllocationService allocationService;
-        private final RoutingService routingService;
+        private final RerouteService rerouteService;
         private final Logger logger;
 
-        public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, Logger logger) {
+        public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService, Logger logger) {
             this.allocationService = allocationService;
-            this.routingService = routingService;
+            this.rerouteService = rerouteService;
             this.logger = logger;
         }
 
@@ -379,7 +379,7 @@ public class ShardStateAction {
                 if (logger.isTraceEnabled()) {
                     logger.trace("{}, scheduling a reroute", reason);
                 }
-                routingService.reroute(reason, ActionListener.wrap(
+                rerouteService.reroute(reason, ActionListener.wrap(
                     r -> logger.trace("{}, reroute completed", reason),
                     e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e)));
             }

+ 3 - 5
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

@@ -39,6 +39,7 @@ import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterApplier;
 import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
@@ -144,15 +145,12 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
     /**
      * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
      * @param onJoinValidators A collection of join validators to restrict which nodes may join the cluster.
-     * @param reroute A callback to call when the membership of the cluster has changed, to recalculate the assignment of shards. In
-     *                production code this calls
-     *                {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String, ActionListener)}.
      */
     public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
                        NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
                        Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
                        ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
-                       BiConsumer<String, ActionListener<Void>> reroute, ElectionStrategy electionStrategy) {
+                       RerouteService rerouteService, ElectionStrategy electionStrategy) {
         this.settings = settings;
         this.transportService = transportService;
         this.masterService = masterService;
@@ -162,7 +160,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
         this.electionStrategy = electionStrategy;
         this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
             this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators,
-            reroute);
+            rerouteService);
         this.persistedStateSupplier = persistedStateSupplier;
         this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings);
         this.lastKnownLeader = Optional.empty();

+ 3 - 2
server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

@@ -30,6 +30,7 @@ import org.elasticsearch.cluster.NotMasterException;
 import org.elasticsearch.cluster.coordination.Coordinator.Mode;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.common.Priority;
@@ -91,11 +92,11 @@ public class JoinHelper {
     JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
                TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
                BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
-               Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, BiConsumer<String, ActionListener<Void>> reroute) {
+               Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, RerouteService rerouteService) {
         this.masterService = masterService;
         this.transportService = transportService;
         this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
-        this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, reroute) {
+        this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, rerouteService) {
 
             @Override
             public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks)

+ 5 - 4
server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java

@@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 
@@ -45,7 +46,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
     private final AllocationService allocationService;
 
     private final Logger logger;
-    private final BiConsumer<String, ActionListener<Void>> reroute;
+    private final RerouteService rerouteService;
 
     public static class Task {
 
@@ -82,10 +83,10 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
         private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
     }
 
-    public JoinTaskExecutor(AllocationService allocationService, Logger logger, BiConsumer<String, ActionListener<Void>> reroute) {
+    public JoinTaskExecutor(AllocationService allocationService, Logger logger, RerouteService rerouteService) {
         this.allocationService = allocationService;
         this.logger = logger;
-        this.reroute = reroute;
+        this.rerouteService = rerouteService;
     }
 
     @Override
@@ -149,7 +150,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
             results.success(joinTask);
         }
         if (nodesChanged) {
-            reroute.accept("post-join reroute", ActionListener.wrap(
+            rerouteService.reroute("post-join reroute", ActionListener.wrap(
                 r -> logger.trace("post-join reroute completed"),
                 e -> logger.debug("post-join reroute failed", e)));
 

+ 10 - 34
server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java → server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java

@@ -25,32 +25,22 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.PlainListenableActionFuture;
-import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.NotMasterException;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.common.inject.Inject;
 
 import java.util.function.BiFunction;
 
 /**
- * A {@link RoutingService} listens to clusters state. When this service
- * receives a {@link ClusterChangedEvent} the cluster state will be verified and
- * the routing tables might be updated.
- * <p>
- * Note: The {@link RoutingService} is responsible for cluster wide operations
- * that include modifications to the cluster state. Such an operation can only
- * be performed on the clusters master node. Unless the local node this service
- * is running on is the clusters master node this service will not perform any
- * actions.
- * </p>
+ * A {@link BatchedRerouteService} is a {@link RerouteService} that batches together reroute requests to avoid unnecessary extra reroutes.
+ * This component only does meaningful work on the elected master node. Reroute requests will fail with a {@link NotMasterException} on
+ * other nodes.
  */
-public class RoutingService extends AbstractLifecycleComponent {
-    private static final Logger logger = LogManager.getLogger(RoutingService.class);
+public class BatchedRerouteService implements RerouteService {
+    private static final Logger logger = LogManager.getLogger(BatchedRerouteService.class);
 
     private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";
 
@@ -61,33 +51,19 @@ public class RoutingService extends AbstractLifecycleComponent {
     @Nullable // null if no reroute is currently pending
     private PlainListenableActionFuture<Void> pendingRerouteListeners;
 
-    @Inject
-    public RoutingService(ClusterService clusterService, BiFunction<ClusterState, String, ClusterState> reroute) {
+    /**
+     * @param reroute Function that computes the updated cluster state after it has been rerouted.
+     */
+    public BatchedRerouteService(ClusterService clusterService, BiFunction<ClusterState, String, ClusterState> reroute) {
         this.clusterService = clusterService;
         this.reroute = reroute;
     }
 
-    @Override
-    protected void doStart() {
-    }
-
-    @Override
-    protected void doStop() {
-    }
-
-    @Override
-    protected void doClose() {
-    }
-
     /**
      * Initiates a reroute.
      */
+    @Override
     public final void reroute(String reason, ActionListener<Void> listener) {
-        if (lifecycle.started() == false) {
-            listener.onFailure(new IllegalStateException(
-                "rejecting delayed reroute [" + reason + "] in state [" + lifecycleState() + "]"));
-            return;
-        }
         final PlainListenableActionFuture<Void> currentListeners;
         synchronized (mutex) {
             if (pendingRerouteListeners != null) {

+ 42 - 0
server/src/main/java/org/elasticsearch/cluster/routing/LazilyInitializedRerouteService.java

@@ -0,0 +1,42 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.routing;
+
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.action.ActionListener;
+
+/**
+ * A {@link RerouteService} that can be initialized lazily. The real reroute service, {@link BatchedRerouteService}, depends on components
+ * constructed quite late in the construction of the node, but other components constructed earlier eventually need access to the reroute
+ * service too.
+ */
+public class LazilyInitializedRerouteService implements RerouteService {
+
+    private final SetOnce<RerouteService> delegate = new SetOnce<>();
+
+    @Override
+    public void reroute(String reason, ActionListener<Void> listener) {
+        assert delegate.get() != null;
+        delegate.get().reroute(reason, listener);
+    }
+
+    public void setRerouteService(RerouteService rerouteService) {
+        delegate.set(rerouteService);
+    }
+}

+ 29 - 0
server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java

@@ -0,0 +1,29 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.routing;
+
+import org.elasticsearch.action.ActionListener;
+
+/**
+ * Asynchronously performs a cluster reroute, updating any shard states and rebalancing the cluster if appropriate.
+ */
+@FunctionalInterface
+public interface RerouteService {
+    void reroute(String reason, ActionListener<Void> listener);
+}

+ 12 - 19
server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

@@ -19,20 +19,10 @@
 
 package org.elasticsearch.cluster.routing.allocation;
 
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.LongSupplier;
-import java.util.function.Supplier;
-
 import com.carrotsearch.hppc.ObjectLookupContainer;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.client.Client;
@@ -41,6 +31,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.DiskUsage;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Strings;
@@ -49,6 +40,13 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.set.Sets;
 
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
 /**
  * Listens for a node to go over the high watermark and kicks off an empty
  * reroute if it does. Also responsible for logging about nodes that have
@@ -63,14 +61,15 @@ public class DiskThresholdMonitor {
     private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();
     private final Supplier<ClusterState> clusterStateSupplier;
     private final LongSupplier currentTimeMillisSupplier;
+    private final RerouteService rerouteService;
     private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE);
     private final AtomicBoolean checkInProgress = new AtomicBoolean();
-    private final SetOnce<Consumer<ActionListener<Void>>> rerouteAction = new SetOnce<>();
 
     public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
-                                Client client, LongSupplier currentTimeMillisSupplier) {
+                                Client client, LongSupplier currentTimeMillisSupplier, RerouteService rerouteService) {
         this.clusterStateSupplier = clusterStateSupplier;
         this.currentTimeMillisSupplier = currentTimeMillisSupplier;
+        this.rerouteService = rerouteService;
         this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
         this.client = client;
     }
@@ -111,8 +110,6 @@ public class DiskThresholdMonitor {
 
     public void onNewInfo(ClusterInfo info) {
 
-        assert rerouteAction.get() != null;
-
         if (checkInProgress.compareAndSet(false, true) == false) {
             logger.info("skipping monitor as a check is already in progress");
             return;
@@ -188,7 +185,7 @@ public class DiskThresholdMonitor {
 
         if (reroute) {
             logger.info("rerouting shards: [{}]", explanation);
-            rerouteAction.get().accept(ActionListener.wrap(r -> {
+            rerouteService.reroute("disk threshold monitor", ActionListener.wrap(r -> {
                 setLastRunTimeMillis();
                 listener.onResponse(r);
             }, e -> {
@@ -225,8 +222,4 @@ public class DiskThresholdMonitor {
             .setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build())
             .execute(ActionListener.map(listener, r -> null));
     }
-
-    public void setRerouteAction(BiConsumer<String, ActionListener<Void>> rerouteAction) {
-        this.rerouteAction.set(listener -> rerouteAction.accept("disk threshold monitor", listener));
-    }
 }

+ 3 - 3
server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

@@ -25,7 +25,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.coordination.Coordinator;
 import org.elasticsearch.cluster.coordination.ElectionStrategy;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.RoutingService;
+import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterApplier;
 import org.elasticsearch.cluster.service.ClusterApplierService;
@@ -87,7 +87,7 @@ public class DiscoveryModule {
                            NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
                            ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
                            AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState,
-                           RoutingService routingService) {
+                           RerouteService rerouteService) {
         final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
         final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
         hostProviders.put("settings", () -> new SettingsBasedSeedHostsProvider(settings, transportService));
@@ -149,7 +149,7 @@ public class DiscoveryModule {
                 settings, clusterSettings,
                 transportService, namedWriteableRegistry, allocationService, masterService,
                 () -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider,
-                clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), routingService::reroute, electionStrategy);
+                clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService, electionStrategy);
         } else {
             throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
         }

+ 6 - 6
server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java

@@ -25,8 +25,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.nodes.BaseNodeResponse;
 import org.elasticsearch.action.support.nodes.BaseNodesResponse;
+import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.RoutingNodes;
-import org.elasticsearch.cluster.routing.RoutingService;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
 import org.elasticsearch.cluster.routing.allocation.FailedShard;
@@ -44,7 +44,7 @@ public class GatewayAllocator {
 
     private static final Logger logger = LogManager.getLogger(GatewayAllocator.class);
 
-    private final RoutingService routingService;
+    private final RerouteService rerouteService;
 
     private final PrimaryShardAllocator primaryShardAllocator;
     private final ReplicaShardAllocator replicaShardAllocator;
@@ -55,10 +55,10 @@ public class GatewayAllocator {
         asyncFetchStore = ConcurrentCollections.newConcurrentMap();
 
     @Inject
-    public GatewayAllocator(RoutingService routingService,
+    public GatewayAllocator(RerouteService rerouteService,
                             TransportNodesListGatewayStartedShards startedAction,
                             TransportNodesListShardStoreMetaData storeAction) {
-        this.routingService = routingService;
+        this.rerouteService = rerouteService;
         this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction);
         this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction);
     }
@@ -72,7 +72,7 @@ public class GatewayAllocator {
 
     // for tests
     protected GatewayAllocator() {
-        this.routingService = null;
+        this.rerouteService = null;
         this.primaryShardAllocator = null;
         this.replicaShardAllocator = null;
     }
@@ -139,7 +139,7 @@ public class GatewayAllocator {
         @Override
         protected void reroute(ShardId shardId, String reason) {
             logger.trace("{} scheduling reroute for {}", shardId, reason);
-            routingService.reroute("async_shard_fetch", ActionListener.wrap(
+            rerouteService.reroute("async_shard_fetch", ActionListener.wrap(
                 r -> logger.trace("{} scheduled reroute completed for {}", shardId, reason),
                 e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", shardId, reason), e)));
         }

+ 10 - 10
server/src/main/java/org/elasticsearch/node/Node.java

@@ -56,7 +56,9 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
 import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
-import org.elasticsearch.cluster.routing.RoutingService;
+import org.elasticsearch.cluster.routing.BatchedRerouteService;
+import org.elasticsearch.cluster.routing.LazilyInitializedRerouteService;
+import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.StopWatch;
@@ -369,8 +371,9 @@ public class Node implements Closeable {
                             .newHashPublisher());
             final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
                 scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
+            final LazilyInitializedRerouteService lazilyInitializedRerouteService = new LazilyInitializedRerouteService();
             final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
-                clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis);
+                clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, lazilyInitializedRerouteService);
             final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
                 diskThresholdMonitor::onNewInfo);
             final UsageService usageService = new UsageService();
@@ -503,16 +506,17 @@ public class Node implements Closeable {
             RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
                 metaDataCreateIndexService, metaDataIndexUpgradeService, clusterService.getClusterSettings());
 
-            final RoutingService routingService = new RoutingService(clusterService, clusterModule.getAllocationService()::reroute);
+            final RerouteService rerouteService
+                = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
+            lazilyInitializedRerouteService.setRerouteService(rerouteService);
             final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry,
                 networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
                 clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
-                clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, routingService);
+                clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService);
             this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
                 transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
                 httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
                 searchTransportService);
-            diskThresholdMonitor.setRerouteAction(routingService::reroute);
 
             final SearchService searchService = newSearchService(clusterService, indicesService,
                 threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
@@ -583,7 +587,7 @@ public class Node implements Closeable {
                     b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
                     b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus);
                     b.bind(RestoreService.class).toInstance(restoreService);
-                    b.bind(RoutingService.class).toInstance(routingService);
+                    b.bind(RerouteService.class).toInstance(rerouteService);
                 }
             );
             injector = modules.createInjector();
@@ -670,7 +674,6 @@ public class Node implements Closeable {
         injector.getInstance(IndicesClusterStateService.class).start();
         injector.getInstance(SnapshotsService.class).start();
         injector.getInstance(SnapshotShardsService.class).start();
-        injector.getInstance(RoutingService.class).start();
         injector.getInstance(SearchService.class).start();
         nodeService.getMonitorService().start();
 
@@ -788,7 +791,6 @@ public class Node implements Closeable {
         // This can confuse other nodes and delay things - mostly if we're the master and we're running tests.
         injector.getInstance(Discovery.class).stop();
         // we close indices first, so operations won't be allowed on it
-        injector.getInstance(RoutingService.class).stop();
         injector.getInstance(ClusterService.class).stop();
         injector.getInstance(NodeConnectionsService.class).stop();
         nodeService.getMonitorService().stop();
@@ -838,8 +840,6 @@ public class Node implements Closeable {
         toClose.add(injector.getInstance(IndicesService.class));
         // close filter/fielddata caches after indices
         toClose.add(injector.getInstance(IndicesStore.class));
-        toClose.add(() -> stopWatch.stop().start("routing"));
-        toClose.add(injector.getInstance(RoutingService.class));
         toClose.add(() -> stopWatch.stop().start("cluster"));
         toClose.add(injector.getInstance(ClusterService.class));
         toClose.add(() -> stopWatch.stop().start("node_connections_service"));

+ 3 - 3
server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java

@@ -32,7 +32,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry
 import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
-import org.elasticsearch.cluster.routing.RoutingService;
+import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardsIterator;
@@ -91,8 +91,8 @@ public class ShardStateActionTests extends ESTestCase {
 
     private static class TestShardStateAction extends ShardStateAction {
         TestShardStateAction(ClusterService clusterService, TransportService transportService,
-                             AllocationService allocationService, RoutingService routingService) {
-            super(clusterService, transportService, allocationService, routingService, THREAD_POOL);
+                             AllocationService allocationService, RerouteService rerouteService) {
+            super(clusterService, transportService, allocationService, rerouteService, THREAD_POOL);
         }
 
         private Runnable onBeforeWaitForNewMasterAndRetry;

+ 7 - 31
server/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java → server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java

@@ -20,7 +20,6 @@ package org.elasticsearch.cluster.routing;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
@@ -41,9 +40,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.hamcrest.Matchers.lessThan;
-import static org.hamcrest.Matchers.startsWith;
 
-public class RoutingServiceTests extends ESTestCase {
+public class BatchedRerouteServiceTests extends ESTestCase {
 
     private ThreadPool threadPool;
     private ClusterService clusterService;
@@ -60,38 +58,19 @@ public class RoutingServiceTests extends ESTestCase {
         threadPool.shutdown();
     }
 
-    public void testRejectionUnlessStarted() {
-        final RoutingService routingService = new RoutingService(clusterService, (s, r) -> s);
-        final PlainActionFuture<Void> future = new PlainActionFuture<>();
-
-        if (randomBoolean()) {
-            routingService.start();
-            routingService.stop();
-        } else if (randomBoolean()) {
-            routingService.close();
-        }
-
-        routingService.reroute("test", future);
-        assertTrue(future.isDone());
-        assertThat(expectThrows(IllegalStateException.class, future::actionGet).getMessage(),
-            startsWith("rejecting delayed reroute [test] in state ["));
-    }
-
     public void testReroutesWhenRequested() throws InterruptedException {
         final AtomicLong rerouteCount = new AtomicLong();
-        final RoutingService routingService = new RoutingService(clusterService, (s, r) -> {
+        final BatchedRerouteService batchedRerouteService = new BatchedRerouteService(clusterService, (s, r) -> {
             rerouteCount.incrementAndGet();
             return s;
         });
 
-        routingService.start();
-
         long rerouteCountBeforeReroute = 0L;
         final int iterations = between(1, 100);
         final CountDownLatch countDownLatch = new CountDownLatch(iterations);
         for (int i = 0; i < iterations; i++) {
             rerouteCountBeforeReroute = Math.max(rerouteCountBeforeReroute, rerouteCount.get());
-            routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
+            batchedRerouteService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
         }
         countDownLatch.await(10, TimeUnit.SECONDS);
         assertThat(rerouteCountBeforeReroute, lessThan(rerouteCount.get()));
@@ -116,17 +95,15 @@ public class RoutingServiceTests extends ESTestCase {
         cyclicBarrier.await(); // wait for master thread to be blocked
 
         final AtomicBoolean rerouteExecuted = new AtomicBoolean();
-        final RoutingService routingService = new RoutingService(clusterService, (s, r) -> {
+        final BatchedRerouteService batchedRerouteService = new BatchedRerouteService(clusterService, (s, r) -> {
             assertTrue(rerouteExecuted.compareAndSet(false, true)); // only called once
             return s;
         });
 
-        routingService.start();
-
         final int iterations = between(1, 100);
         final CountDownLatch countDownLatch = new CountDownLatch(iterations);
         for (int i = 0; i < iterations; i++) {
-            routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
+            batchedRerouteService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
         }
 
         cyclicBarrier.await(); // allow master thread to continue;
@@ -136,18 +113,17 @@ public class RoutingServiceTests extends ESTestCase {
 
     public void testNotifiesOnFailure() throws InterruptedException {
 
-        final RoutingService routingService = new RoutingService(clusterService, (s, r) -> {
+        final BatchedRerouteService batchedRerouteService = new BatchedRerouteService(clusterService, (s, r) -> {
             if (rarely()) {
                 throw new ElasticsearchException("simulated");
             }
             return randomBoolean() ? s : ClusterState.builder(s).build();
         });
-        routingService.start();
 
         final int iterations = between(1, 100);
         final CountDownLatch countDownLatch = new CountDownLatch(iterations);
         for (int i = 0; i < iterations; i++) {
-            routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
+            batchedRerouteService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
             if (rarely()) {
                 clusterService.getMasterService().setClusterStatePublisher(
                     randomBoolean()

+ 12 - 17
server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java

@@ -77,7 +77,10 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
         AtomicReference<Set<String>> indices = new AtomicReference<>();
         AtomicLong currentTime = new AtomicLong();
         DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> finalState,
-            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) {
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, listener) -> {
+            assertTrue(reroute.compareAndSet(false, true));
+            listener.onResponse(null);
+        }) {
             @Override
             protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
                 assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
@@ -85,11 +88,6 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
             }
         };
 
-        monitor.setRerouteAction((reason, listener) -> {
-            assertTrue(reroute.compareAndSet(false, true));
-            listener.onResponse(null);
-        });
-
         ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
         builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4));
         builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 30));
@@ -119,17 +117,16 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
         assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
 
         monitor = new DiskThresholdMonitor(settings, () -> anotherFinalClusterState,
-            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) {
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, listener) -> {
+            assertTrue(reroute.compareAndSet(false, true));
+            listener.onResponse(null);
+        }) {
             @Override
             protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
                 assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
                 listener.onResponse(null);
             }
         };
-        monitor.setRerouteAction((reason, listener) -> {
-            assertTrue(reroute.compareAndSet(false, true));
-            listener.onResponse(null);
-        });
 
         indices.set(null);
         reroute.set(false);
@@ -147,18 +144,16 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
         AtomicLong currentTime = new AtomicLong();
         AtomicReference<ActionListener<Void>> listenerReference = new AtomicReference<>();
         DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState,
-            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) {
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, listener) -> {
+            assertNotNull(listener);
+            assertTrue(listenerReference.compareAndSet(null, listener));
+        }) {
             @Override
             protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
                 throw new AssertionError("unexpected");
             }
         };
 
-        monitor.setRerouteAction((reason, listener) -> {
-            assertNotNull(listener);
-            assertTrue(listenerReference.compareAndSet(null, listener));
-        });
-
         final ImmutableOpenMap.Builder<String, DiskUsage> allDisksOkBuilder;
         allDisksOkBuilder = ImmutableOpenMap.builder();
         allDisksOkBuilder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 50));

+ 2 - 2
server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java

@@ -22,7 +22,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.coordination.Coordinator;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.RoutingService;
+import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.service.ClusterApplier;
 import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -92,7 +92,7 @@ public class DiscoveryModuleTests extends ESTestCase {
     private DiscoveryModule newModule(Settings settings, List<DiscoveryPlugin> plugins) {
         return new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry, null, masterService,
             clusterApplier, clusterSettings, plugins, null, createTempDir().toAbsolutePath(), gatewayMetaState,
-            mock(RoutingService.class));
+            mock(RerouteService.class));
     }
 
     public void testDefaults() {

+ 3 - 3
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -100,7 +100,7 @@ import org.elasticsearch.cluster.metadata.MetaDataMappingService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
-import org.elasticsearch.cluster.routing.RoutingService;
+import org.elasticsearch.cluster.routing.BatchedRerouteService;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
@@ -950,7 +950,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 transportService, indicesService, actionFilters, indexNameExpressionResolver);
             final ShardStateAction shardStateAction = new ShardStateAction(
                 clusterService, transportService, allocationService,
-                new RoutingService(clusterService, allocationService::reroute),
+                new BatchedRerouteService(clusterService, allocationService::reroute),
                 threadPool
             );
             final MetaDataMappingService metaDataMappingService = new MetaDataMappingService(clusterService, indicesService);
@@ -1134,7 +1134,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 hostsResolver -> testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode())
                     .map(n -> n.node.getAddress()).collect(Collectors.toList()),
                 clusterService.getClusterApplierService(), Collections.emptyList(), random(),
-                new RoutingService(clusterService, allocationService::reroute)::reroute, ElectionStrategy.DEFAULT_INSTANCE);
+                new BatchedRerouteService(clusterService, allocationService::reroute), ElectionStrategy.DEFAULT_INSTANCE);
             masterService.setClusterStatePublisher(coordinator);
             coordinator.start();
             masterService.start();