Browse Source

Avoid parallel reroutes in DiskThresholdMonitor (#43381)

Today the `DiskThresholdMonitor` limits the frequency with which it submits
reroute tasks, but it might still submit these tasks faster than the master can
process them if, for instance, each reroute takes over 60 seconds. This is a
problem because the reroute task runs with priority `IMMEDIATE` and is always
scheduled when there is a node over the high watermark, so these repeated
reroutes can starve any other pending tasks on the master.

This change avoids further updates from the monitor while its last task(s) are
still in progress, and it measures the interval until the next update from the
completion time of the reroute task rather than its start time, to allow a
larger window for other tasks to run.

It also now makes use of the `RoutingService` to submit the reroute task, in
order to batch this task with any other pending reroutes. It enhances the
`RoutingService` to notify its listeners on completion.

Fixes #40174
Relates #42559
David Turner 6 years ago
parent
commit
448acea94b
18 changed files with 492 additions and 143 deletions
  1. 5 5
      server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
  2. 3 1
      server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
  3. 3 3
      server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
  4. 1 2
      server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java
  5. 7 4
      server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java
  6. 58 21
      server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
  7. 121 68
      server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
  8. 5 1
      server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
  9. 5 4
      server/src/main/java/org/elasticsearch/node/Node.java
  10. 1 1
      server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java
  11. 2 2
      server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java
  12. 1 1
      server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java
  13. 170 0
      server/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java
  14. 106 17
      server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
  15. 0 9
      server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java
  16. 1 1
      server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java
  17. 2 2
      server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
  18. 1 1
      test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

+ 5 - 5
server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

@@ -131,13 +131,13 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
             logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob");
         }
 
-        // Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running
+        // Submit a job that will reschedule itself after running
         threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
 
         try {
             if (clusterService.state().getNodes().getDataNodes().size() > 1) {
                 // Submit an info update job to be run immediately
-                threadPool.executor(executorName()).execute(() -> maybeRefresh());
+                threadPool.executor(executorName()).execute(this::maybeRefresh);
             }
         } catch (EsRejectedExecutionException ex) {
             logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex);
@@ -173,7 +173,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
             if (logger.isDebugEnabled()) {
                 logger.debug("data node was added, retrieving new cluster info");
             }
-            threadPool.executor(executorName()).execute(() -> maybeRefresh());
+            threadPool.executor(executorName()).execute(this::maybeRefresh);
         }
 
         if (this.isMaster && event.nodesRemoved()) {
@@ -316,7 +316,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
                 ShardStats[] stats = indicesStatsResponse.getShards();
                 ImmutableOpenMap.Builder<String, Long> newShardSizes = ImmutableOpenMap.builder();
                 ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath = ImmutableOpenMap.builder();
-                buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath, clusterService.state());
+                buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath);
                 shardSizes = newShardSizes.build();
                 shardRoutingToDataPath = newShardRoutingToDataPath.build();
             }
@@ -365,7 +365,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
     }
 
     static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
-                                    ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath, ClusterState state) {
+                                    ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath) {
         for (ShardStats s : stats) {
             newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
             long size = s.getStats().getStore().sizeInBytes();

+ 3 - 1
server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

@@ -379,7 +379,9 @@ public class ShardStateAction {
                 if (logger.isTraceEnabled()) {
                     logger.trace("{}, scheduling a reroute", reason);
                 }
-                routingService.reroute(reason);
+                routingService.reroute(reason, ActionListener.wrap(
+                    r -> logger.trace("{}, reroute completed", reason),
+                    e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e)));
             }
         }
     }

+ 3 - 3
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

@@ -80,7 +80,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
-import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -146,13 +145,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
      * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
      * @param onJoinValidators A collection of join validators to restrict which nodes may join the cluster.
      * @param reroute A callback to call when the membership of the cluster has changed, to recalculate the assignment of shards. In
-     *                production code this calls {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String)}.
+     *                production code this calls
+     *                {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String, ActionListener)}.
      */
     public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
                        NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
                        Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
                        ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
-                       Consumer<String> reroute, ElectionStrategy electionStrategy) {
+                       BiConsumer<String, ActionListener<Void>> reroute, ElectionStrategy electionStrategy) {
         this.settings = settings;
         this.transportService = transportService;
         this.masterService = masterService;

+ 1 - 2
server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

@@ -62,7 +62,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
@@ -92,7 +91,7 @@ public class JoinHelper {
     JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
                TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
                BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
-               Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, Consumer<String> reroute) {
+               Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, BiConsumer<String, ActionListener<Void>> reroute) {
         this.masterService = masterService;
         this.transportService = transportService;
         this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);

+ 7 - 4
server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java

@@ -20,6 +20,7 @@ package org.elasticsearch.cluster.coordination;
 
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.NotMasterException;
@@ -36,7 +37,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.BiConsumer;
-import java.util.function.Consumer;
 
 import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
 
@@ -45,7 +45,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
     private final AllocationService allocationService;
 
     private final Logger logger;
-    private final Consumer<String> reroute;
+    private final BiConsumer<String, ActionListener<Void>> reroute;
 
     public static class Task {
 
@@ -82,7 +82,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
         private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
     }
 
-    public JoinTaskExecutor(AllocationService allocationService, Logger logger, Consumer<String> reroute) {
+    public JoinTaskExecutor(AllocationService allocationService, Logger logger, BiConsumer<String, ActionListener<Void>> reroute) {
         this.allocationService = allocationService;
         this.logger = logger;
         this.reroute = reroute;
@@ -149,7 +149,10 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
             results.success(joinTask);
         }
         if (nodesChanged) {
-            reroute.accept("post-join reroute");
+            reroute.accept("post-join reroute", ActionListener.wrap(
+                r -> logger.trace("post-join reroute completed"),
+                e -> logger.debug("post-join reroute failed", e)));
+
             return results.build(allocationService.adaptAutoExpandReplicas(newState.nodes(nodesBuilder).build()));
         } else {
             // we must return a new cluster state instance to force publishing. This is important

+ 58 - 21
server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java

@@ -22,16 +22,20 @@ package org.elasticsearch.cluster.routing;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.PlainListenableActionFuture;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
-import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.NotMasterException;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
 
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
 
 /**
  * A {@link RoutingService} listens to clusters state. When this service
@@ -51,14 +55,16 @@ public class RoutingService extends AbstractLifecycleComponent {
     private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";
 
     private final ClusterService clusterService;
-    private final AllocationService allocationService;
+    private final BiFunction<ClusterState, String, ClusterState> reroute;
 
-    private AtomicBoolean rerouting = new AtomicBoolean();
+    private final Object mutex = new Object();
+    @Nullable // null if no reroute is currently pending
+    private PlainListenableActionFuture<Void> pendingRerouteListeners;
 
     @Inject
-    public RoutingService(ClusterService clusterService, AllocationService allocationService) {
+    public RoutingService(ClusterService clusterService, BiFunction<ClusterState, String, ClusterState> reroute) {
         this.clusterService = clusterService;
-        this.allocationService = allocationService;
+        this.reroute = reroute;
     }
 
     @Override
@@ -76,34 +82,55 @@ public class RoutingService extends AbstractLifecycleComponent {
     /**
      * Initiates a reroute.
      */
-    public final void reroute(String reason) {
-        try {
-            if (lifecycle.stopped()) {
-                return;
-            }
-            if (rerouting.compareAndSet(false, true) == false) {
-                logger.trace("already has pending reroute, ignoring {}", reason);
+    public final void reroute(String reason, ActionListener<Void> listener) {
+        if (lifecycle.started() == false) {
+            listener.onFailure(new IllegalStateException(
+                "rejecting delayed reroute [" + reason + "] in state [" + lifecycleState() + "]"));
+            return;
+        }
+        final PlainListenableActionFuture<Void> currentListeners;
+        synchronized (mutex) {
+            if (pendingRerouteListeners != null) {
+                logger.trace("already has pending reroute, adding [{}] to batch", reason);
+                pendingRerouteListeners.addListener(listener);
                 return;
             }
-            logger.trace("rerouting {}", reason);
+            currentListeners = PlainListenableActionFuture.newListenableFuture();
+            currentListeners.addListener(listener);
+            pendingRerouteListeners = currentListeners;
+        }
+        logger.trace("rerouting [{}]", reason);
+        try {
             clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",
                 new ClusterStateUpdateTask(Priority.HIGH) {
                     @Override
                     public ClusterState execute(ClusterState currentState) {
-                        rerouting.set(false);
-                        return allocationService.reroute(currentState, reason);
+                        synchronized (mutex) {
+                            assert pendingRerouteListeners == currentListeners;
+                            pendingRerouteListeners = null;
+                        }
+                        return reroute.apply(currentState, reason);
                     }
 
                     @Override
                     public void onNoLongerMaster(String source) {
-                        rerouting.set(false);
-                        // no biggie
+                        synchronized (mutex) {
+                            if (pendingRerouteListeners == currentListeners) {
+                                pendingRerouteListeners = null;
+                            }
+                        }
+                        currentListeners.onFailure(new NotMasterException("delayed reroute [" + reason + "] cancelled"));
+                        // no big deal, the new master will reroute again
                     }
 
                     @Override
                     public void onFailure(String source, Exception e) {
-                        rerouting.set(false);
-                        ClusterState state = clusterService.state();
+                        synchronized (mutex) {
+                            if (pendingRerouteListeners == currentListeners) {
+                                pendingRerouteListeners = null;
+                            }
+                        }
+                        final ClusterState state = clusterService.state();
                         if (logger.isTraceEnabled()) {
                             logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}",
                                 source, state), e);
@@ -111,12 +138,22 @@ public class RoutingService extends AbstractLifecycleComponent {
                             logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]",
                                 source, state.version()), e);
                         }
+                        currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] failed", e));
+                    }
+
+                    @Override
+                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                        currentListeners.onResponse(null);
                     }
                 });
         } catch (Exception e) {
-            rerouting.set(false);
+            synchronized (mutex) {
+                assert pendingRerouteListeners == currentListeners;
+                pendingRerouteListeners = null;
+            }
             ClusterState state = clusterService.state();
             logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);
+            currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", e));
         }
     }
 }

+ 121 - 68
server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

@@ -21,12 +21,20 @@ package org.elasticsearch.cluster.routing.allocation;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
 import com.carrotsearch.hppc.ObjectLookupContainer;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterInfo;
 import org.elasticsearch.cluster.ClusterState;
@@ -54,11 +62,15 @@ public class DiskThresholdMonitor {
     private final Client client;
     private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();
     private final Supplier<ClusterState> clusterStateSupplier;
-    private long lastRunNS;
+    private final LongSupplier currentTimeMillisSupplier;
+    private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE);
+    private final AtomicBoolean checkInProgress = new AtomicBoolean();
+    private final SetOnce<Consumer<ActionListener<Void>>> rerouteAction = new SetOnce<>();
 
     public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
-                                Client client) {
+                                Client client, LongSupplier currentTimeMillisSupplier) {
         this.clusterStateSupplier = clusterStateSupplier;
+        this.currentTimeMillisSupplier = currentTimeMillisSupplier;
         this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
         this.client = client;
     }
@@ -92,88 +104,129 @@ public class DiskThresholdMonitor {
         }
     }
 
+    private void checkFinished() {
+        final boolean checkFinished = checkInProgress.compareAndSet(true, false);
+        assert checkFinished;
+    }
 
     public void onNewInfo(ClusterInfo info) {
-        ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
-        if (usages != null) {
-            boolean reroute = false;
-            String explanation = "";
-
-            // Garbage collect nodes that have been removed from the cluster
-            // from the map that tracks watermark crossing
-            ObjectLookupContainer<String> nodes = usages.keys();
-            for (String node : nodeHasPassedWatermark) {
-                if (nodes.contains(node) == false) {
-                    nodeHasPassedWatermark.remove(node);
-                }
+
+        assert rerouteAction.get() != null;
+
+        if (checkInProgress.compareAndSet(false, true) == false) {
+            logger.info("skipping monitor as a check is already in progress");
+            return;
+        }
+
+        final ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
+        if (usages == null) {
+            checkFinished();
+            return;
+        }
+
+        boolean reroute = false;
+        String explanation = "";
+        final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
+
+        // Garbage collect nodes that have been removed from the cluster
+        // from the map that tracks watermark crossing
+        final ObjectLookupContainer<String> nodes = usages.keys();
+        for (String node : nodeHasPassedWatermark) {
+            if (nodes.contains(node) == false) {
+                nodeHasPassedWatermark.remove(node);
             }
-            ClusterState state = clusterStateSupplier.get();
-            Set<String> indicesToMarkReadOnly = new HashSet<>();
-            for (ObjectObjectCursor<String, DiskUsage> entry : usages) {
-                String node = entry.key;
-                DiskUsage usage = entry.value;
-                warnAboutDiskIfNeeded(usage);
-                if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
-                    usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
-                    RoutingNode routingNode = state.getRoutingNodes().node(node);
-                    if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
-                        for (ShardRouting routing : routingNode) {
-                            indicesToMarkReadOnly.add(routing.index().getName());
-                        }
+        }
+        final ClusterState state = clusterStateSupplier.get();
+        final Set<String> indicesToMarkReadOnly = new HashSet<>();
+
+        for (final ObjectObjectCursor<String, DiskUsage> entry : usages) {
+            final String node = entry.key;
+            final DiskUsage usage = entry.value;
+            warnAboutDiskIfNeeded(usage);
+            if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
+                usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
+                final RoutingNode routingNode = state.getRoutingNodes().node(node);
+                if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
+                    for (ShardRouting routing : routingNode) {
+                        indicesToMarkReadOnly.add(routing.index().getName());
                     }
-                } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
-                    usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
-                    if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) {
-                        lastRunNS = System.nanoTime();
+                }
+            } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
+                usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
+                if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
+                    reroute = true;
+                    explanation = "high disk watermark exceeded on one or more nodes";
+                } else {
+                    logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " +
+                            "in the last [{}], skipping reroute",
+                        node, diskThresholdSettings.getRerouteInterval());
+                }
+                nodeHasPassedWatermark.add(node);
+            } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() ||
+                usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
+                nodeHasPassedWatermark.add(node);
+            } else {
+                if (nodeHasPassedWatermark.contains(node)) {
+                    // The node has previously been over the high or
+                    // low watermark, but is no longer, so we should
+                    // reroute so any unassigned shards can be allocated
+                    // if they are able to be
+                    if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
                         reroute = true;
-                        explanation = "high disk watermark exceeded on one or more nodes";
+                        explanation = "one or more nodes has gone under the high or low watermark";
+                        nodeHasPassedWatermark.remove(node);
                     } else {
-                        logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " +
+                        logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " +
                                 "in the last [{}], skipping reroute",
                             node, diskThresholdSettings.getRerouteInterval());
                     }
-                    nodeHasPassedWatermark.add(node);
-                } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() ||
-                    usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
-                    nodeHasPassedWatermark.add(node);
-                } else {
-                    if (nodeHasPassedWatermark.contains(node)) {
-                        // The node has previously been over the high or
-                        // low watermark, but is no longer, so we should
-                        // reroute so any unassigned shards can be allocated
-                        // if they are able to be
-                        if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) {
-                            lastRunNS = System.nanoTime();
-                            reroute = true;
-                            explanation = "one or more nodes has gone under the high or low watermark";
-                            nodeHasPassedWatermark.remove(node);
-                        } else {
-                            logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " +
-                                    "in the last [{}], skipping reroute",
-                                node, diskThresholdSettings.getRerouteInterval());
-                        }
-                    }
                 }
             }
-            if (reroute) {
-                logger.info("rerouting shards: [{}]", explanation);
-                reroute();
-            }
-            indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
-            if (indicesToMarkReadOnly.isEmpty() == false) {
-                markIndicesReadOnly(indicesToMarkReadOnly);
-            }
         }
+
+        final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 2);
+
+        if (reroute) {
+            logger.info("rerouting shards: [{}]", explanation);
+            rerouteAction.get().accept(ActionListener.wrap(r -> {
+                setLastRunTimeMillis();
+                listener.onResponse(r);
+            }, e -> {
+                logger.debug("reroute failed", e);
+                setLastRunTimeMillis();
+                listener.onFailure(e);
+            }));
+        } else {
+            listener.onResponse(null);
+        }
+
+        indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
+        if (indicesToMarkReadOnly.isEmpty() == false) {
+            markIndicesReadOnly(indicesToMarkReadOnly, ActionListener.wrap(r -> {
+                setLastRunTimeMillis();
+                listener.onResponse(r);
+            }, e -> {
+                logger.debug("marking indices readonly failed", e);
+                setLastRunTimeMillis();
+                listener.onFailure(e);
+            }));
+        } else {
+            listener.onResponse(null);
+        }
+    }
+
+    private void setLastRunTimeMillis() {
+        lastRunTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong()));
     }
 
-    protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
+    protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
         // set read-only block but don't block on the response
-        client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY)).
-            setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()).execute();
+        client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY))
+            .setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build())
+            .execute(ActionListener.map(listener, r -> null));
     }
 
-    protected void reroute() {
-        // Execute an empty reroute, but don't block on the response
-        client.admin().cluster().prepareReroute().execute();
+    public void setRerouteAction(BiConsumer<String, ActionListener<Void>> rerouteAction) {
+        this.rerouteAction.set(listener -> rerouteAction.accept("disk threshold monitor", listener));
     }
 }

+ 5 - 1
server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java

@@ -21,6 +21,8 @@ package org.elasticsearch.gateway;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.nodes.BaseNodeResponse;
 import org.elasticsearch.action.support.nodes.BaseNodesResponse;
 import org.elasticsearch.cluster.routing.RoutingNodes;
@@ -137,7 +139,9 @@ public class GatewayAllocator {
         @Override
         protected void reroute(ShardId shardId, String reason) {
             logger.trace("{} scheduling reroute for {}", shardId, reason);
             // Nothing here waits on the reroute: the listener only records the outcome in the log
             // (trace on success, debug with the exception on failure).
-            routingService.reroute("async_shard_fetch");
+            routingService.reroute("async_shard_fetch", ActionListener.wrap(
+                r -> logger.trace("{} scheduled reroute completed for {}", shardId, reason),
+                e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", shardId, reason), e)));
         }
     }
 

+ 5 - 4
server/src/main/java/org/elasticsearch/node/Node.java

@@ -369,10 +369,10 @@ public class Node implements Closeable {
                             .newHashPublisher());
             final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
                 scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
-            final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state,
-                clusterService.getClusterSettings(), client);
+            final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
+                clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis);
             final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
-                listener::onNewInfo);
+                diskThresholdMonitor::onNewInfo);
             final UsageService usageService = new UsageService();
 
             ModulesBuilder modules = new ModulesBuilder();
@@ -503,7 +503,7 @@ public class Node implements Closeable {
             RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
                 metaDataCreateIndexService, metaDataIndexUpgradeService, clusterService.getClusterSettings());
 
-            final RoutingService routingService = new RoutingService(clusterService, clusterModule.getAllocationService());
+            final RoutingService routingService = new RoutingService(clusterService, clusterModule.getAllocationService()::reroute);
             final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry,
                 networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
                 clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
@@ -512,6 +512,7 @@ public class Node implements Closeable {
                 transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
                 httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
                 searchTransportService);
+            diskThresholdMonitor.setRerouteAction(routingService::reroute);
 
             final SearchService searchService = newSearchService(clusterService, indicesService,
                 threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java

@@ -119,7 +119,7 @@ public class DiskUsageTests extends ESTestCase {
         ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
         ImmutableOpenMap.Builder<ShardRouting, String> routingToPath = ImmutableOpenMap.builder();
         ClusterState state = ClusterState.builder(new ClusterName("blarg")).version(0).build();
-        InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath, state);
+        InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath);
         assertEquals(2, shardSizes.size());
         assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_0)));
         assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_1)));

+ 2 - 2
server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java

@@ -57,7 +57,7 @@ public class JoinHelperTests extends ESTestCase {
             x -> localNode, null, Collections.emptySet());
         JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null,
             (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
-            Collections.emptyList(), s -> {});
+            Collections.emptyList(), (s, r) -> {});
         transportService.start();
 
         DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
@@ -153,7 +153,7 @@ public class JoinHelperTests extends ESTestCase {
             x -> localNode, null, Collections.emptySet());
         new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState,
             (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
-            Collections.emptyList(), s -> {}); // registers request handler
+            Collections.emptyList(), (s, r) -> {}); // registers request handler
         transportService.start();
         transportService.acceptIncomingRequests();
 

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java

@@ -174,7 +174,7 @@ public class NodeJoinTests extends ESTestCase {
             () -> new InMemoryPersistedState(term, initialState), r -> emptyList(),
             new NoOpClusterApplier(),
             Collections.emptyList(),
-            random, s -> {}, ElectionStrategy.DEFAULT_INSTANCE);
+            random, (s, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE);
         transportService.start();
         transportService.acceptIncomingRequests();
         transport = capturingTransport;

+ 170 - 0
server/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java

@@ -0,0 +1,170 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.routing;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.test.ClusterServiceUtils;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.startsWith;
+
+/**
+ * Tests for {@link RoutingService}: reroute requests are rejected unless the service is started, requested
+ * reroutes are executed, requests submitted while the master service is busy are batched into a single reroute,
+ * and every listener is notified even when the reroute or the subsequent publication fails.
+ */
+public class RoutingServiceTests extends ESTestCase {
+
+    private ThreadPool threadPool;
+    private ClusterService clusterService;
+
+    @Before
+    public void beforeTest() {
+        threadPool = new TestThreadPool("test");
+        clusterService = ClusterServiceUtils.createClusterService(threadPool);
+    }
+
+    @After
+    public void afterTest() {
+        clusterService.stop();
+        threadPool.shutdown();
+    }
+
+    /**
+     * A reroute submitted to a service that was never started, or was stopped or closed, must complete its
+     * listener immediately with an {@link IllegalStateException}.
+     */
+    public void testRejectionUnlessStarted() {
+        final RoutingService routingService = new RoutingService(clusterService, (s, r) -> s);
+        final PlainActionFuture<Void> future = new PlainActionFuture<>();
+
+        if (randomBoolean()) {
+            routingService.start();
+            routingService.stop();
+        } else if (randomBoolean()) {
+            routingService.close();
+        }
+
+        routingService.reroute("test", future);
+        assertTrue(future.isDone());
+        assertThat(expectThrows(IllegalStateException.class, future::actionGet).getMessage(),
+            startsWith("rejecting delayed reroute [test] in state ["));
+    }
+
+    /**
+     * Every listener must be notified, and at least one reroute must have run after the last request was submitted.
+     */
+    public void testReroutesWhenRequested() throws InterruptedException {
+        final AtomicLong rerouteCount = new AtomicLong();
+        final RoutingService routingService = new RoutingService(clusterService, (s, r) -> {
+            rerouteCount.incrementAndGet();
+            return s;
+        });
+
+        routingService.start();
+
+        long rerouteCountBeforeReroute = 0L;
+        final int iterations = between(1, 100);
+        final CountDownLatch countDownLatch = new CountDownLatch(iterations);
+        for (int i = 0; i < iterations; i++) {
+            rerouteCountBeforeReroute = Math.max(rerouteCountBeforeReroute, rerouteCount.get());
+            routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
+        }
+        // fail explicitly on a timeout rather than falling through to the count comparison with listeners still pending
+        assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
+        assertThat(rerouteCountBeforeReroute, lessThan(rerouteCount.get()));
+    }
+
+    /**
+     * Requests submitted while the master service is blocked must be served by exactly one reroute execution.
+     */
+    public void testBatchesReroutesTogether() throws BrokenBarrierException, InterruptedException {
+        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+        clusterService.submitStateUpdateTask("block master service", new ClusterStateUpdateTask() {
+            @Override
+            public ClusterState execute(ClusterState currentState) throws Exception {
+                cyclicBarrier.await(); // notify test that we are blocked
+                cyclicBarrier.await(); // wait to be unblocked by test
+                return currentState;
+            }
+
+            @Override
+            public void onFailure(String source, Exception e) {
+                throw new AssertionError(source, e);
+            }
+        });
+
+        cyclicBarrier.await(); // wait for master thread to be blocked
+
+        final AtomicBoolean rerouteExecuted = new AtomicBoolean();
+        final RoutingService routingService = new RoutingService(clusterService, (s, r) -> {
+            assertTrue(rerouteExecuted.compareAndSet(false, true)); // only called once
+            return s;
+        });
+
+        routingService.start();
+
+        final int iterations = between(1, 100);
+        final CountDownLatch countDownLatch = new CountDownLatch(iterations);
+        for (int i = 0; i < iterations; i++) {
+            routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
+        }
+
+        cyclicBarrier.await(); // allow master thread to continue;
+        // wait for reroute to complete, but with a bound so that a lost listener fails the test instead of hanging it
+        assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
+        assertTrue(rerouteExecuted.get()); // see above for assertion that it's only called once
+    }
+
+    /**
+     * Listeners must be notified whether the reroute throws, the publication fails, or the local node stops
+     * being the elected master while requests are in flight.
+     */
+    public void testNotifiesOnFailure() throws InterruptedException {
+
+        final RoutingService routingService = new RoutingService(clusterService, (s, r) -> {
+            if (rarely()) {
+                throw new ElasticsearchException("simulated");
+            }
+            return randomBoolean() ? s : ClusterState.builder(s).build();
+        });
+        routingService.start();
+
+        final int iterations = between(1, 100);
+        final CountDownLatch countDownLatch = new CountDownLatch(iterations);
+        for (int i = 0; i < iterations; i++) {
+            routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
+            if (rarely()) {
+                // randomly switch between a working publisher and one that fails every publication
+                clusterService.getMasterService().setClusterStatePublisher(
+                    randomBoolean()
+                        ? ClusterServiceUtils.createClusterStatePublisher(clusterService.getClusterApplierService())
+                        : (event, publishListener, ackListener)
+                        -> publishListener.onFailure(new FailedToCommitClusterStateException("simulated")));
+            }
+
+            if (rarely()) {
+                // randomly apply a state in which the local node is, or is not, the elected master
+                clusterService.getClusterApplierService().onNewClusterState("simulated", () -> {
+                    ClusterState state = clusterService.state();
+                    return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes())
+                        .masterNodeId(randomBoolean() ? null : state.nodes().getLocalNodeId())).build();
+                }, (source, e) -> { });
+            }
+        }
+
+        assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); // i.e. it doesn't leak any listeners
+    }
+}

+ 106 - 17
server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java

@@ -19,6 +19,7 @@
 package org.elasticsearch.cluster.routing.allocation;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterInfo;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
@@ -35,16 +36,17 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
 
 public class DiskThresholdMonitorTests extends ESAllocationTestCase {
 
-
     public void testMarkFloodStageIndicesReadOnly() {
         AllocationService allocation = createAllocationService(Settings.builder()
             .put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
@@ -61,7 +63,6 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
             .addAsNew(metaData.index("test"))
             .addAsNew(metaData.index("test_1"))
             .addAsNew(metaData.index("test_2"))
-
             .build();
         ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
             .metaData(metaData).routingTable(routingTable).build();
@@ -74,18 +75,21 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
         ClusterState finalState = clusterState;
         AtomicBoolean reroute = new AtomicBoolean(false);
         AtomicReference<Set<String>> indices = new AtomicReference<>();
+        AtomicLong currentTime = new AtomicLong();
         DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> finalState,
-            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) {
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) {
             @Override
-            protected void reroute() {
-                assertTrue(reroute.compareAndSet(false, true));
-            }
-
-            @Override
-            protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
+            protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
                 assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
+                listener.onResponse(null);
             }
         };
+
+        monitor.setRerouteAction((reason, listener) -> {
+            assertTrue(reroute.compareAndSet(false, true));
+            listener.onResponse(null);
+        });
+
         ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
         builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4));
         builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 30));
@@ -97,6 +101,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
         builder = ImmutableOpenMap.builder();
         builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4));
         builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5));
+        currentTime.addAndGet(randomLongBetween(60001, 120000));
         monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
         assertTrue(reroute.get());
         assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indices.get());
@@ -114,17 +119,17 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
         assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
 
         monitor = new DiskThresholdMonitor(settings, () -> anotherFinalClusterState,
-            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) {
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) {
             @Override
-            protected void reroute() {
-                assertTrue(reroute.compareAndSet(false, true));
-            }
-
-            @Override
-            protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
+            protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
                 assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
+                listener.onResponse(null);
             }
         };
+        monitor.setRerouteAction((reason, listener) -> {
+            assertTrue(reroute.compareAndSet(false, true));
+            listener.onResponse(null);
+        });
 
         indices.set(null);
         reroute.set(false);
@@ -133,6 +138,90 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
         builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5));
         monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
         assertTrue(reroute.get());
-        assertEquals(new HashSet<>(Arrays.asList("test_1")), indices.get());
+        assertEquals(Collections.singleton("test_1"), indices.get());
+    }
+
+    public void testDoesNotSubmitRerouteTaskTooFrequently() {
+        final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build();
+        final long rerouteIntervalMillis =
+            DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis();
+        final AtomicLong fakeClock = new AtomicLong();
+        // holds the listener of a reroute that the monitor has submitted and that the test has not yet completed
+        final AtomicReference<ActionListener<Void>> pendingReroute = new AtomicReference<>();
+        final DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState,
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, fakeClock::get) {
+            @Override
+            protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
+                throw new AssertionError("unexpected");
+            }
+        };
+
+        // capture each submitted reroute; the compareAndSet fails the test if a second one arrives while one is pending
+        monitor.setRerouteAction((reason, listener) -> {
+            assertNotNull(listener);
+            assertTrue(pendingReroute.compareAndSet(null, listener));
+        });
+
+        final ImmutableOpenMap.Builder<String, DiskUsage> healthyUsages = ImmutableOpenMap.builder();
+        healthyUsages.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 50));
+        healthyUsages.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 50));
+        final ImmutableOpenMap<String, DiskUsage> allDisksOk = healthyUsages.build();
+
+        final ImmutableOpenMap.Builder<String, DiskUsage> fullNodeUsages = ImmutableOpenMap.builder();
+        fullNodeUsages.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 9)));
+        fullNodeUsages.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 50));
+        final ImmutableOpenMap<String, DiskUsage> oneDiskAboveWatermark = fullNodeUsages.build();
+
+        // step 1: healthy disks never trigger a reroute
+        fakeClock.addAndGet(randomLongBetween(0, 120000));
+        monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
+        assertNull(pendingReroute.get());
+
+        // step 2: a disk crossing the watermark triggers a reroute, which we complete straight away
+        fakeClock.addAndGet(randomLongBetween(0, 120000));
+        monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null));
+        assertNotNull(pendingReroute.get());
+        final ActionListener<Void> firstReroute = pendingReroute.getAndSet(null);
+        firstReroute.onResponse(null);
+
+        if (randomBoolean()) {
+            // optional step: no further reroute while still inside the reroute interval
+            fakeClock.addAndGet(randomLongBetween(0, rerouteIntervalMillis));
+            monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
+            assertNull(pendingReroute.get());
+        }
+
+        // step 3: once the interval has elapsed, a disk that is still over the watermark triggers another reroute
+        fakeClock.addAndGet(randomLongBetween(rerouteIntervalMillis + 1, 120000));
+        monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null));
+        assertNotNull(pendingReroute.get());
+        final ActionListener<Void> inFlightReroute = pendingReroute.getAndSet(null);
+
+        // step 4: while that reroute is still in flight, nothing else is submitted however much time passes
+        fakeClock.addAndGet(randomLongBetween(0, 120000));
+        monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
+        assertNull(pendingReroute.get());
+
+        // step 5: the in-flight reroute completes
+        inFlightReroute.onResponse(null);
+
+        if (randomBoolean()) {
+            // optional step: no further reroute while still inside the reroute interval
+            fakeClock.addAndGet(randomLongBetween(0, rerouteIntervalMillis));
+            monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
+            assertNull(pendingReroute.get());
+        }
+
+        // step 6: after the interval has elapsed, one more reroute is submitted
+        fakeClock.addAndGet(randomLongBetween(rerouteIntervalMillis + 1, 120000));
+        monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
+        assertNotNull(pendingReroute.get());
+        final ActionListener<Void> finalReroute = pendingReroute.getAndSet(null);
+        finalReroute.onResponse(null);
+
+        // step 7: with nothing left to do, no further reroute is submitted
+        fakeClock.addAndGet(randomLongBetween(0, 120000));
+        monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
+        assertNull(pendingReroute.get());
     }
 }

+ 0 - 9
server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java

@@ -19,7 +19,6 @@
 
 package org.elasticsearch.cluster.routing.allocation.decider;
 
-import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
 import org.elasticsearch.cluster.ClusterInfo;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterState;
@@ -31,7 +30,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -53,7 +51,6 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
         return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class);
     }
 
-    @TestLogging("org.elasticsearch.indices.recovery:TRACE,org.elasticsearch.cluster.service:TRACE")
     public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
         List<String> nodes = internalCluster().startNodes(3);
 
@@ -105,12 +102,6 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
 
         assertBusy(() -> {
             final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
-            logger.info("--> {}", clusterState.routingTable());
-
-            final RecoveryResponse recoveryResponse = client().admin().indices()
-                .prepareRecoveries("test").setActiveOnly(true).setDetailed(true).get();
-            logger.info("--> recoveries: {}", recoveryResponse);
-
             final Map<String, Integer> nodesToShardCount = new HashMap<>();
             for (final RoutingNode node : clusterState.getRoutingNodes()) {
                 logger.info("--> node {} has {} shards",

+ 1 - 1
server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java

@@ -213,7 +213,7 @@ public class ClusterStateChanges {
             transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver);
 
         nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
-        joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, s -> {});
+        joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, (s, r) -> {});
     }
 
     public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {

+ 2 - 2
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -950,7 +950,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 transportService, indicesService, actionFilters, indexNameExpressionResolver);
             final ShardStateAction shardStateAction = new ShardStateAction(
                 clusterService, transportService, allocationService,
-                new RoutingService(clusterService, allocationService),
+                new RoutingService(clusterService, allocationService::reroute),
                 threadPool
             );
             final MetaDataMappingService metaDataMappingService = new MetaDataMappingService(clusterService, indicesService);
@@ -1134,7 +1134,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 hostsResolver -> testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode())
                     .map(n -> n.node.getAddress()).collect(Collectors.toList()),
                 clusterService.getClusterApplierService(), Collections.emptyList(), random(),
-                new RoutingService(clusterService, allocationService)::reroute, ElectionStrategy.DEFAULT_INSTANCE);
+                new RoutingService(clusterService, allocationService::reroute)::reroute, ElectionStrategy.DEFAULT_INSTANCE);
             masterService.setClusterStatePublisher(coordinator);
             coordinator.start();
             masterService.start();

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

@@ -841,7 +841,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                 final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
                 coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
                     allocationService, masterService, this::getPersistedState,
-                    Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), s -> {},
+                    Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, r) -> {},
                     getElectionStrategy());
                 masterService.setClusterStatePublisher(coordinator);
                 final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,