Browse Source

Simplify delayed shard allocation

- moves calculation of the delay to a single place (ReplicaShardAllocator)
- reduces coupling between GatewayAllocator and RoutingService
- in master failover situations, elapsed delay time is forgotten

Closes #14808
Yannick Welsch 10 years ago
parent
commit
2084df825f

+ 2 - 2
core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java

@@ -273,13 +273,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
         } catch (IndexNotFoundException e) {
             // one of the specified indices is not there - treat it as RED.
             ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState,
-                    numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(), settings, clusterState),
+                    numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
                     pendingTaskTimeInQueue);
             response.setStatus(ClusterHealthStatus.RED);
             return response;
         }
 
         return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks,
-                numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(), settings, clusterState), pendingTaskTimeInQueue);
+                numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), pendingTaskTimeInQueue);
     }
 }

+ 31 - 54
core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java

@@ -31,7 +31,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.threadpool.ThreadPool;
 
-import java.util.Locale;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -56,9 +55,8 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
     private final AllocationService allocationService;
 
     private AtomicBoolean rerouting = new AtomicBoolean();
-    private volatile long registeredNextDelaySetting = Long.MAX_VALUE;
+    private volatile long minDelaySettingAtLastScheduling = Long.MAX_VALUE;
     private volatile ScheduledFuture registeredNextDelayFuture;
-    private volatile long unassignedShardsAllocatedTimestamp = 0;
 
     @Inject
     public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService) {
@@ -89,19 +87,6 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
         return this.allocationService;
     }
 
-    /**
-     * Update the last time the allocator tried to assign unassigned shards
-     *
-     * This is used so that both the GatewayAllocator and RoutingService use a
-     * consistent timestamp for comparing which shards have been delayed to
-     * avoid a race condition where GatewayAllocator thinks the shard should
-     * be delayed and the RoutingService thinks it has already passed the delay
-     * and that the GatewayAllocator has/will handle it.
-     */
-    public void setUnassignedShardsAllocatedTimestamp(long timeInMillis) {
-        this.unassignedShardsAllocatedTimestamp = timeInMillis;
-    }
-
     /**
      * Initiates a reroute.
      */
@@ -112,51 +97,43 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
     @Override
     public void clusterChanged(ClusterChangedEvent event) {
         if (event.state().nodes().localNodeMaster()) {
-            // figure out when the next unassigned allocation need to happen from now. If this is larger or equal
-            // then the last time we checked and scheduled, we are guaranteed to have a reroute until then, so no need
-            // to schedule again
-            long nextDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(settings, event.state());
-            if (nextDelaySetting > 0 && nextDelaySetting < registeredNextDelaySetting) {
+            // Figure out if an existing scheduled reroute is good enough or whether we need to cancel and reschedule.
+            // If the minimum of the currently relevant delay settings is larger than something we scheduled in the past,
+            // we are guaranteed that the planned schedule will happen before any of the current shard delays are expired.
+            long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(settings, event.state());
+            if (minDelaySetting <= 0) {
+                logger.trace("no need to schedule reroute - no delayed unassigned shards, minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastScheduling);
+                minDelaySettingAtLastScheduling = Long.MAX_VALUE;
                 FutureUtils.cancel(registeredNextDelayFuture);
-                registeredNextDelaySetting = nextDelaySetting;
-                // We calculate nextDelay based on System.currentTimeMillis() here because we want the next delay from the "now" perspective
-                // rather than the delay from the last time the GatewayAllocator tried to assign/delay the shard.
-                // The actual calculation is based on the latter though, to account for shards that should have been allocated
-                // between unassignedShardsAllocatedTimestamp and System.currentTimeMillis()
-                long nextDelayBasedOnUnassignedShardsAllocatedTimestamp = UnassignedInfo.findNextDelayedAllocationIn(unassignedShardsAllocatedTimestamp, settings, event.state());
-                // adjust from unassignedShardsAllocatedTimestamp to now
-                long nextDelayMillis = nextDelayBasedOnUnassignedShardsAllocatedTimestamp - (System.currentTimeMillis() - unassignedShardsAllocatedTimestamp);
-                if (nextDelayMillis < 0) {
-                    nextDelayMillis = 0;
-                }
-                TimeValue nextDelay = TimeValue.timeValueMillis(nextDelayMillis);
-                int unassignedDelayedShards = UnassignedInfo.getNumberOfDelayedUnassigned(unassignedShardsAllocatedTimestamp, settings, event.state());
-                if (unassignedDelayedShards > 0) {
-                    logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]",
-                            unassignedDelayedShards, nextDelay);
-                    registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
-                        @Override
-                        protected void doRun() throws Exception {
-                            registeredNextDelaySetting = Long.MAX_VALUE;
-                            reroute("assign delayed unassigned shards");
-                        }
-
-                        @Override
-                        public void onFailure(Throwable t) {
-                            logger.warn("failed to schedule/execute reroute post unassigned shard", t);
-                            registeredNextDelaySetting = Long.MAX_VALUE;
-                        }
-                    });
-                }
+            } else if (minDelaySetting < minDelaySettingAtLastScheduling) {
+                FutureUtils.cancel(registeredNextDelayFuture);
+                minDelaySettingAtLastScheduling = minDelaySetting;
+                TimeValue nextDelay = TimeValue.timeValueNanos(UnassignedInfo.findNextDelayedAllocationIn(event.state()));
+                assert nextDelay.nanos() > 0 : "next delay must be non 0 as minDelaySetting is [" + minDelaySetting + "]";
+                logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]",
+                        UnassignedInfo.getNumberOfDelayedUnassigned(event.state()), nextDelay);
+                registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
+                    @Override
+                    protected void doRun() throws Exception {
+                        minDelaySettingAtLastScheduling = Long.MAX_VALUE;
+                        reroute("assign delayed unassigned shards");
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        logger.warn("failed to schedule/execute reroute post unassigned shard", t);
+                        minDelaySettingAtLastScheduling = Long.MAX_VALUE;
+                    }
+                });
             } else {
-                logger.trace("no need to schedule reroute due to delayed unassigned, next_delay_setting [{}], registered [{}]", nextDelaySetting, registeredNextDelaySetting);
+                logger.trace("no need to schedule reroute - current schedule reroute is enough. minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastScheduling);
             }
         }
     }
 
     // visible for testing
-    long getRegisteredNextDelaySetting() {
-        return this.registeredNextDelaySetting;
+    long getMinDelaySettingAtLastScheduling() {
+        return this.minDelaySettingAtLastScheduling;
     }
 
     // visible for testing

+ 63 - 39
core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java

@@ -34,6 +34,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Holds additional information as to why the shard is in unassigned state.
@@ -103,21 +104,24 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
     }
 
     private final Reason reason;
-    private final long timestamp;
+    private final long unassignedTimeMillis; // used for display and log messages, in milliseconds
+    private final long unassignedTimeNanos; // in nanoseconds, used to calculate delay for delayed shard allocation
+    private long lastComputedLeftDelayNanos = 0l; // how long to delay shard allocation, not serialized (always positive, 0 means no delay)
     private final String message;
     private final Throwable failure;
 
     public UnassignedInfo(Reason reason, String message) {
-        this(reason, System.currentTimeMillis(), message, null);
+        this(reason, System.currentTimeMillis(), System.nanoTime(), message, null);
     }
 
     public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure) {
-        this(reason, System.currentTimeMillis(), message, failure);
+        this(reason, System.currentTimeMillis(), System.nanoTime(), message, failure);
     }
 
-    private UnassignedInfo(Reason reason, long timestamp, String message, Throwable failure) {
+    private UnassignedInfo(Reason reason, long unassignedTimeMillis, long timestampNanos, String message, Throwable failure) {
         this.reason = reason;
-        this.timestamp = timestamp;
+        this.unassignedTimeMillis = unassignedTimeMillis;
+        this.unassignedTimeNanos = timestampNanos;
         this.message = message;
         this.failure = failure;
         assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
@@ -125,14 +129,18 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
 
     UnassignedInfo(StreamInput in) throws IOException {
         this.reason = Reason.values()[(int) in.readByte()];
-        this.timestamp = in.readLong();
+        this.unassignedTimeMillis = in.readLong();
+        // As System.nanoTime() cannot be compared across different JVMs, reset it to now.
+        // This means that in master failover situations, elapsed delay time is forgotten.
+        this.unassignedTimeNanos = System.nanoTime();
         this.message = in.readOptionalString();
         this.failure = in.readThrowable();
     }
 
     public void writeTo(StreamOutput out) throws IOException {
         out.writeByte((byte) reason.ordinal());
-        out.writeLong(timestamp);
+        out.writeLong(unassignedTimeMillis);
+        // Do not serialize unassignedTimeNanos as System.nanoTime() cannot be compared across different JVMs
         out.writeOptionalString(message);
         out.writeThrowable(failure);
     }
@@ -149,13 +157,20 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
     }
 
     /**
-     * The timestamp in milliseconds since epoch. Note, we use timestamp here since
-     * we want to make sure its preserved across node serializations. Extra care need
-     * to be made if its used to calculate diff (handle negative values) in case of
-     * time drift.
+     * The timestamp in milliseconds when the shard became unassigned, based on System.currentTimeMillis().
+     * Note, we use timestamp here since we want to make sure its preserved across node serializations.
      */
-    public long getTimestampInMillis() {
-        return this.timestamp;
+    public long getUnassignedTimeInMillis() {
+        return this.unassignedTimeMillis;
+    }
+
+    /**
+     * The timestamp in nanoseconds when the shard became unassigned, based on System.nanoTime().
+     * Used to calculate the delay for delayed shard allocation.
+     * ONLY EXPOSED FOR TESTS!
+     */
+    public long getUnassignedTimeInNanos() {
+        return this.unassignedTimeNanos;
     }
 
     /**
@@ -186,7 +201,7 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
     }
 
     /**
-     * The allocation delay value associated with the index (defaulting to node settings if not set).
+     * The allocation delay value in milliseconds associated with the index (defaulting to node settings if not set).
      */
     public long getAllocationDelayTimeoutSetting(Settings settings, Settings indexSettings) {
         if (reason != Reason.NODE_LEFT) {
@@ -197,31 +212,40 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
     }
 
     /**
-     * The time in millisecond until this unassigned shard can be reassigned.
+     * The delay in nanoseconds until this unassigned shard can be reassigned. This value is cached and might be slightly out-of-date.
+     * See also the {@link #updateDelay(long, Settings, Settings)} method.
      */
-    public long getDelayAllocationExpirationIn(long unassignedShardsAllocatedTimestamp, Settings settings, Settings indexSettings) {
-        long delayTimeout = getAllocationDelayTimeoutSetting(settings, indexSettings);
-        if (delayTimeout == 0) {
-            return 0;
-        }
-        long delta = unassignedShardsAllocatedTimestamp - timestamp;
-        // account for time drift, treat it as no timeout
-        if (delta < 0) {
-            return 0;
-        }
-        return delayTimeout - delta;
+    public long getLastComputedLeftDelayNanos() {
+        return lastComputedLeftDelayNanos;
     }
 
+    /**
+     * Recomputes the remaining allocation delay from the given time (in nanoseconds) and index/node settings, and caches it.
+     * Should only be called from ReplicaShardAllocator.
+     * @return updated delay in nanoseconds (0 means no delay)
+     */
+    public long updateDelay(long nanoTimeNow, Settings settings, Settings indexSettings) {
+        long delayTimeoutMillis = getAllocationDelayTimeoutSetting(settings, indexSettings);
+        final long newComputedLeftDelayNanos;
+        if (delayTimeoutMillis == 0L) {
+            newComputedLeftDelayNanos = 0L;
+        } else {
+            assert nanoTimeNow - unassignedTimeNanos >= 0 : "nanoTime values must be compared by difference (overflow-safe)";
+            long delayTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(delayTimeoutMillis);
+            newComputedLeftDelayNanos = Math.max(0L, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos));
+        }
+        lastComputedLeftDelayNanos = newComputedLeftDelayNanos;
+        return newComputedLeftDelayNanos;
+    }
 
     /**
      * Returns the number of shards that are unassigned and currently being delayed.
      */
-    public static int getNumberOfDelayedUnassigned(long unassignedShardsAllocatedTimestamp, Settings settings, ClusterState state) {
+    public static int getNumberOfDelayedUnassigned(ClusterState state) {
         int count = 0;
         for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
             if (shard.primary() == false) {
-                IndexMetaData indexMetaData = state.metaData().index(shard.getIndex());
-                long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(unassignedShardsAllocatedTimestamp, settings, indexMetaData.getSettings());
+                long delay = shard.unassignedInfo().getLastComputedLeftDelayNanos();
                 if (delay > 0) {
                     count++;
                 }
@@ -231,15 +255,16 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
     }
 
     /**
-     * Finds the smallest delay expiration setting of an unassigned shard. Returns 0 if there are none.
+     * Finds the smallest delay expiration setting in milliseconds of all unassigned shards that are still delayed. Returns 0 if there are none.
      */
     public static long findSmallestDelayedAllocationSetting(Settings settings, ClusterState state) {
         long nextDelaySetting = Long.MAX_VALUE;
         for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
             if (shard.primary() == false) {
                 IndexMetaData indexMetaData = state.metaData().index(shard.getIndex());
+                long leftDelayNanos = shard.unassignedInfo().getLastComputedLeftDelayNanos();
                 long delayTimeoutSetting = shard.unassignedInfo().getAllocationDelayTimeoutSetting(settings, indexMetaData.getSettings());
-                if (delayTimeoutSetting > 0 && delayTimeoutSetting < nextDelaySetting) {
+                if (leftDelayNanos > 0 && delayTimeoutSetting > 0 && delayTimeoutSetting < nextDelaySetting) {
                     nextDelaySetting = delayTimeoutSetting;
                 }
             }
@@ -249,14 +274,13 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
 
 
     /**
-     * Finds the next (closest) delay expiration of an unassigned shard. Returns 0 if there are none.
+     * Finds the next (closest) delay expiration of an unassigned shard in nanoseconds. Returns 0 if there are none.
      */
-    public static long findNextDelayedAllocationIn(long unassignedShardsAllocatedTimestamp, Settings settings, ClusterState state) {
+    public static long findNextDelayedAllocationIn(ClusterState state) {
         long nextDelay = Long.MAX_VALUE;
         for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
             if (shard.primary() == false) {
-                IndexMetaData indexMetaData = state.metaData().index(shard.getIndex());
-                long nextShardDelay = shard.unassignedInfo().getDelayAllocationExpirationIn(unassignedShardsAllocatedTimestamp, settings, indexMetaData.getSettings());
+                long nextShardDelay = shard.unassignedInfo().getLastComputedLeftDelayNanos();
                 if (nextShardDelay > 0 && nextShardDelay < nextDelay) {
                     nextDelay = nextShardDelay;
                 }
@@ -268,7 +292,7 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
     public String shortSummary() {
         StringBuilder sb = new StringBuilder();
         sb.append("[reason=").append(reason).append("]");
-        sb.append(", at[").append(DATE_TIME_FORMATTER.printer().print(timestamp)).append("]");
+        sb.append(", at[").append(DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis)).append("]");
         String details = getDetails();
         if (details != null) {
             sb.append(", details[").append(details).append("]");
@@ -285,7 +309,7 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject("unassigned_info");
         builder.field("reason", reason);
-        builder.field("at", DATE_TIME_FORMATTER.printer().print(timestamp));
+        builder.field("at", DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis));
         String details = getDetails();
         if (details != null) {
             builder.field("details", details);
@@ -301,7 +325,7 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
 
         UnassignedInfo that = (UnassignedInfo) o;
 
-        if (timestamp != that.timestamp) return false;
+        if (unassignedTimeMillis != that.unassignedTimeMillis) return false;
         if (reason != that.reason) return false;
         if (message != null ? !message.equals(that.message) : that.message != null) return false;
         return !(failure != null ? !failure.equals(that.failure) : that.failure != null);
@@ -311,7 +335,7 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
     @Override
     public int hashCode() {
         int result = reason != null ? reason.hashCode() : 0;
-        result = 31 * result + Long.hashCode(timestamp);
+        result = 31 * result + Long.hashCode(unassignedTimeMillis);
         result = 31 * result + (message != null ? message.hashCode() : 0);
         result = 31 * result + (failure != null ? failure.hashCode() : 0);
         return result;

+ 1 - 5
core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java

@@ -113,10 +113,6 @@ public class GatewayAllocator extends AbstractComponent {
     }
 
     public boolean allocateUnassigned(final RoutingAllocation allocation) {
-        // Take a snapshot of the current time and tell the RoutingService
-        // about it, so it will use a consistent timestamp for delays
-        long lastAllocateUnassignedRun = System.currentTimeMillis();
-        this.routingService.setUnassignedShardsAllocatedTimestamp(lastAllocateUnassignedRun);
         boolean changed = false;
 
         RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
@@ -124,7 +120,7 @@ public class GatewayAllocator extends AbstractComponent {
 
         changed |= primaryShardAllocator.allocateUnassigned(allocation);
         changed |= replicaShardAllocator.processExistingRecoveries(allocation);
-        changed |= replicaShardAllocator.allocateUnassigned(allocation, lastAllocateUnassignedRun);
+        changed |= replicaShardAllocator.allocateUnassigned(allocation);
         return changed;
     }
 

+ 33 - 20
core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java

@@ -111,10 +111,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
     }
 
     public boolean allocateUnassigned(RoutingAllocation allocation) {
-        return allocateUnassigned(allocation, System.currentTimeMillis());
-    }
-
-    public boolean allocateUnassigned(RoutingAllocation allocation, long allocateUnassignedTimestapm) {
+        long nanoTimeNow = System.nanoTime();
         boolean changed = false;
         final RoutingNodes routingNodes = allocation.routingNodes();
         final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
@@ -173,27 +170,43 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
                     unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), shard.version(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
                 }
             } else if (matchingNodes.hasAnyData() == false) {
-                // if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation
-                // of the replica shard needs to be delayed, and if so, add it to the ignore unassigned list
-                // note: we only care about replica in delayed allocation, since if we have an unassigned primary it
-                //       will anyhow wait to find an existing copy of the shard to be allocated
-                // note: the other side of the equation is scheduling a reroute in a timely manner, which happens in the RoutingService
-                IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndex());
-                long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(allocateUnassignedTimestapm, settings, indexMetaData.getSettings());
-                if (delay > 0) {
-                    logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueMillis(delay));
-                    /**
-                     * mark it as changed, since we want to kick a publishing to schedule future allocation,
-                     * see {@link org.elasticsearch.cluster.routing.RoutingService#clusterChanged(ClusterChangedEvent)}).
-                     */
-                    changed = true;
-                    unassignedIterator.removeAndIgnore();
-                }
+                // if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation of the replica shard needs to be delayed
+                changed |= ignoreUnassignedIfDelayed(nanoTimeNow, allocation, unassignedIterator, shard);
             }
         }
         return changed;
     }
 
+    /**
+     * Check if the allocation of the replica is to be delayed. Compute the delay and if it is delayed, add it to the ignore unassigned list
+     * Note: we only care about replica in delayed allocation, since if we have an unassigned primary it
+     *       will anyhow wait to find an existing copy of the shard to be allocated
+     * Note: the other side of the equation is scheduling a reroute in a timely manner, which happens in the RoutingService
+     *
+     * PUBLIC FOR TESTS!
+     *
+     * @param timeNowNanos Timestamp in nanoseconds representing "now"
+     * @param allocation the routing allocation
+     * @param unassignedIterator iterator over unassigned shards
+     * @param shard the shard which might be delayed
+     * @return true iff allocation is delayed for this shard
+     */
+    public boolean ignoreUnassignedIfDelayed(long timeNowNanos, RoutingAllocation allocation, RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) {
+        IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndex());
+        // calculate the remaining delay (in nanoseconds) and store it in UnassignedInfo to be used by RoutingService
+        long delayNanos = shard.unassignedInfo().updateDelay(timeNowNanos, settings, indexMetaData.getSettings());
+        if (delayNanos > 0) {
+            logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueNanos(delayNanos));
+            /*
+             * returning true marks the allocation as changed, since we want to kick a publishing to schedule future allocation,
+             * see org.elasticsearch.cluster.routing.RoutingService#clusterChanged(ClusterChangedEvent).
+             */
+            unassignedIterator.removeAndIgnore();
+            return true;
+        }
+        return false;
+    }
+
     /**
      * Can the shard be allocated on at least one node based on the allocation deciders.
      */

+ 2 - 2
core/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java

@@ -234,8 +234,8 @@ public class RestShardsAction extends AbstractCatAction {
 
             if (shard.unassignedInfo() != null) {
                 table.addCell(shard.unassignedInfo().getReason());
-                table.addCell(UnassignedInfo.DATE_TIME_FORMATTER.printer().print(shard.unassignedInfo().getTimestampInMillis()));
-                table.addCell(TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().getTimestampInMillis()));
+                table.addCell(UnassignedInfo.DATE_TIME_FORMATTER.printer().print(shard.unassignedInfo().getUnassignedTimeInMillis()));
+                table.addCell(TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().getUnassignedTimeInMillis()));
                 table.addCell(shard.unassignedInfo().getDetails());
             } else {
                 table.addCell(null);

+ 19 - 79
core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java

@@ -23,28 +23,18 @@ import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.EmptyClusterInfoService;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
-import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
-import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
-import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
-import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.gateway.GatewayAllocator;
-import org.elasticsearch.node.settings.NodeSettingsService;
 import org.elasticsearch.test.ESAllocationTestCase;
 import org.elasticsearch.test.cluster.TestClusterService;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;
 import org.junit.Before;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -77,7 +67,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
     }
 
     public void testNoDelayedUnassigned() throws Exception {
-        AllocationService allocation = createAllocationService();
+        AllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
         MetaData metaData = MetaData.builder()
                 .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "0"))
                         .numberOfShards(1).numberOfReplicas(1))
@@ -98,15 +88,15 @@ public class RoutingServiceTests extends ESAllocationTestCase {
         clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
         ClusterState newState = clusterState;
 
-        assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(Long.MAX_VALUE));
+        assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE));
         routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
-        assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(Long.MAX_VALUE));
+        assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE));
         assertThat(routingService.hasReroutedAndClear(), equalTo(false));
     }
 
-    @TestLogging("_root:DEBUG")
     public void testDelayedUnassignedScheduleReroute() throws Exception {
-        AllocationService allocation = createAllocationService();
+        DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
+        AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
         MetaData metaData = MetaData.builder()
                 .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
                         .numberOfShards(1).numberOfReplicas(1))
@@ -131,24 +121,20 @@ public class RoutingServiceTests extends ESAllocationTestCase {
             }
         }
         assertNotNull(nodeId);
-        // remove node2 and reroute
 
+        // remove nodeId and reroute
         ClusterState prevState = clusterState;
         clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId)).build();
+        // make sure the replica is marked as delayed (i.e. not reallocated)
+        mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueMillis(randomIntBetween(0, 99)).nanos());
         clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
-        // We need to update the routing service's last attempted run to
-        // signal that the GatewayAllocator tried to allocated it but
-        // it was delayed
-        RoutingNodes.UnassignedShards unassigned = clusterState.getRoutingNodes().unassigned();
-        assertEquals(1, unassigned.size());
-        ShardRouting next = unassigned.iterator().next();
-        routingService.setUnassignedShardsAllocatedTimestamp(next.unassignedInfo().getTimestampInMillis() + randomIntBetween(0, 99));
+        assertEquals(1, clusterState.getRoutingNodes().unassigned().size());
 
         ClusterState newState = clusterState;
         routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
         assertBusy(() -> assertTrue("routing service should have run a reroute", routingService.hasReroutedAndClear()));
         // verify the registration has been reset
-        assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(Long.MAX_VALUE));
+        assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE));
     }
 
     /**
@@ -159,10 +145,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
 
         try {
             DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
-            AllocationService allocation = new AllocationService(Settings.Builder.EMPTY_SETTINGS,
-                    randomAllocationDeciders(Settings.Builder.EMPTY_SETTINGS, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()),
-                    new ShardsAllocators(Settings.Builder.EMPTY_SETTINGS, mockGatewayAllocator), EmptyClusterInfoService.INSTANCE);
-
+            AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
             MetaData metaData = MetaData.builder()
                     .put(IndexMetaData.builder("short_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
                             .numberOfShards(1).numberOfReplicas(1))
@@ -206,7 +189,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
             ClusterState prevState = clusterState;
             clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(shortDelayReplica.currentNodeId()).remove(longDelayReplica.currentNodeId())).build();
             // make sure both replicas are marked as delayed (i.e. not reallocated)
-            mockGatewayAllocator.setShardsToDelay(Arrays.asList(shortDelayReplica, longDelayReplica));
+            mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1);
             clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
 
             // check that shortDelayReplica and longDelayReplica have been marked unassigned
@@ -232,10 +215,8 @@ public class RoutingServiceTests extends ESAllocationTestCase {
             // create routing service, also registers listener on cluster service
             RoutingService routingService = new RoutingService(Settings.EMPTY, testThreadPool, clusterService, allocation);
             routingService.start(); // just so performReroute does not prematurely return
-            // ensure routing service has proper timestamp before triggering
-            routingService.setUnassignedShardsAllocatedTimestamp(shortDelayUnassignedReplica.unassignedInfo().getTimestampInMillis() + randomIntBetween(0, 50));
-            // next (delayed) reroute should only delay longDelayReplica/longDelayUnassignedReplica
-            mockGatewayAllocator.setShardsToDelay(Arrays.asList(longDelayUnassignedReplica));
+            // next (delayed) reroute should only delay longDelayReplica/longDelayUnassignedReplica, simulate that we are now 1 second after shards became unassigned
+            mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueSeconds(1).nanos());
             // register listener on cluster state so we know when cluster state has been changed
             CountDownLatch latch = new CountDownLatch(1);
             clusterService.addLast(event -> latch.countDown());
@@ -244,14 +225,15 @@ public class RoutingServiceTests extends ESAllocationTestCase {
              // cluster service should have updated state and called routingService with clusterChanged
             latch.await();
             // verify the registration has been set to the delay of longDelayReplica/longDelayUnassignedReplica
-            assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(10000L));
+            assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(TimeValue.timeValueSeconds(10).millis()));
         } finally {
             terminate(testThreadPool);
         }
     }
 
     public void testDelayedUnassignedDoesNotRerouteForNegativeDelays() throws Exception {
-        AllocationService allocation = createAllocationService();
+        DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
+        AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
         MetaData metaData = MetaData.builder()
                 .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
                         .numberOfShards(1).numberOfReplicas(1))
@@ -271,7 +253,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
         clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
         clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
         // Set it in the future so the delay will be negative
-        routingService.setUnassignedShardsAllocatedTimestamp(System.currentTimeMillis() + TimeValue.timeValueMinutes(1).millis());
+        mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueMinutes(1).nanos());
 
         ClusterState newState = clusterState;
 
@@ -282,7 +264,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
                 assertThat(routingService.hasReroutedAndClear(), equalTo(false));
 
                 // verify the registration has been updated
-                assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(100L));
+                assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(100L));
             }
         });
     }
@@ -309,46 +291,4 @@ public class RoutingServiceTests extends ESAllocationTestCase {
             rerouted.set(true);
         }
     }
-
-    /**
-     * Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet.
-     * It does not implement the full logic but shards that are to be delayed need to be explicitly set using the method setShardsToDelay(...).
-     */
-    private static class DelayedShardsMockGatewayAllocator extends GatewayAllocator {
-        volatile List<ShardRouting> delayedShards = Collections.emptyList();
-
-        public DelayedShardsMockGatewayAllocator() {
-            super(Settings.EMPTY, null, null);
-        }
-
-        @Override
-        public void applyStartedShards(StartedRerouteAllocation allocation) {}
-
-        @Override
-        public void applyFailedShards(FailedRerouteAllocation allocation) {}
-
-        /**
-         * Explicitly set which shards should be delayed in the next allocateUnassigned calls
-         */
-        public void setShardsToDelay(List<ShardRouting> delayedShards) {
-            this.delayedShards = delayedShards;
-        }
-
-        @Override
-        public boolean allocateUnassigned(RoutingAllocation allocation) {
-            final RoutingNodes routingNodes = allocation.routingNodes();
-            final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
-            boolean changed = false;
-            while (unassignedIterator.hasNext()) {
-                ShardRouting shard = unassignedIterator.next();
-                for (ShardRouting shardToDelay : delayedShards) {
-                    if (shard.isSameShard(shardToDelay)) {
-                        changed = true;
-                        unassignedIterator.removeAndIgnore();
-                    }
-                }
-            }
-            return changed;
-        }
-    }
 }

+ 27 - 31
core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java

@@ -81,7 +81,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
 
         UnassignedInfo read = new UnassignedInfo(StreamInput.wrap(out.bytes()));
         assertThat(read.getReason(), equalTo(meta.getReason()));
-        assertThat(read.getTimestampInMillis(), equalTo(meta.getTimestampInMillis()));
+        assertThat(read.getUnassignedTimeInMillis(), equalTo(meta.getUnassignedTimeInMillis()));
         assertThat(read.getMessage(), equalTo(meta.getMessage()));
         assertThat(read.getDetails(), equalTo(meta.getDetails()));
     }
@@ -222,7 +222,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
         assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1));
         assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo(), notNullValue());
         assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NODE_LEFT));
-        assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getTimestampInMillis(), greaterThan(0l));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getUnassignedTimeInMillis(), greaterThan(0l));
     }
 
     /**
@@ -253,7 +253,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
         assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
         assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getMessage(), equalTo("test fail"));
         assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getDetails(), equalTo("test fail"));
-        assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getTimestampInMillis(), greaterThan(0l));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getUnassignedTimeInMillis(), greaterThan(0l));
     }
 
     /**
@@ -261,17 +261,11 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
      */
     public void testUnassignedDelayedOnlyOnNodeLeft() throws Exception {
         final UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, null);
-        long delay = unassignedInfo.getAllocationDelayTimeoutSetting(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY);
-        assertThat(delay, equalTo(TimeValue.timeValueHours(10).millis()));
-        assertBusy(new Runnable() {
-            @Override
-            public void run() {
-                long delay = unassignedInfo.getDelayAllocationExpirationIn(System.currentTimeMillis(),
-                        Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY);
-                assertThat(delay, greaterThan(0l));
-                assertThat(delay, lessThan(TimeValue.timeValueHours(10).millis()));
-            }
-        });
+        long delay = unassignedInfo.updateDelay(unassignedInfo.getUnassignedTimeInNanos() + 1, // add 1 tick delay
+                Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY);
+        long cachedDelay = unassignedInfo.getLastComputedLeftDelayNanos(); // read back right after updateDelay; must match its return value
+        assertThat(delay, equalTo(cachedDelay));
+        assertThat(delay, equalTo(TimeValue.timeValueHours(10).nanos() - 1)); // the 10h timeout minus the 1 ns of simulated elapsed time
     }
 
     /**
@@ -281,15 +275,16 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
         EnumSet<UnassignedInfo.Reason> reasons = EnumSet.allOf(UnassignedInfo.Reason.class);
         reasons.remove(UnassignedInfo.Reason.NODE_LEFT);
         UnassignedInfo unassignedInfo = new UnassignedInfo(RandomPicks.randomFrom(getRandom(), reasons), null);
-        long delay = unassignedInfo.getAllocationDelayTimeoutSetting(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY);
-        assertThat(delay, equalTo(0l));
-        delay = unassignedInfo.getDelayAllocationExpirationIn(System.currentTimeMillis(),
+        long delay = unassignedInfo.updateDelay(unassignedInfo.getUnassignedTimeInNanos() + 1, // add 1 tick delay
                 Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY);
         assertThat(delay, equalTo(0l));
+        delay = unassignedInfo.getLastComputedLeftDelayNanos();
+        assertThat(delay, equalTo(0l));
     }
 
     public void testNumberOfDelayedUnassigned() throws Exception {
-        AllocationService allocation = createAllocationService();
+        DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
+        AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
         MetaData metaData = MetaData.builder()
                 .put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
                 .put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
@@ -299,8 +294,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
                 .routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2")).build()).build();
         clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
         clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
-        assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(),
-                Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(0));
+        assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), equalTo(0));
         // starting primaries
         clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
         // starting replicas
@@ -308,24 +302,25 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
         assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
         // remove node2 and reroute
         clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
+        // make sure both replicas are marked as delayed (i.e. not reallocated)
+        mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1);
         clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
-        assertThat(clusterState.prettyPrint(), UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(),
-                Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(2));
+        assertThat(clusterState.prettyPrint(), UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), equalTo(2));
     }
 
     public void testFindNextDelayedAllocation() {
-        AllocationService allocation = createAllocationService();
+        DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
+        AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
         MetaData metaData = MetaData.builder()
-                .put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
-                .put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
+                .put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h")).numberOfShards(1).numberOfReplicas(1))
+                .put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h")).numberOfShards(1).numberOfReplicas(1))
                 .build();
         ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
                 .metaData(metaData)
                 .routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2")).build()).build();
         clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
         clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
-        assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(),
-                Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(0));
+        assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), equalTo(0));
         // starting primaries
         clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
         // starting replicas
@@ -333,14 +328,15 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
         assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
         // remove node2 and reroute
         clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
+        // make sure both replicas are marked as delayed (i.e. not reallocated)
+        mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1);
         clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
 
         long nextDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState);
         assertThat(nextDelaySetting, equalTo(TimeValue.timeValueHours(10).millis()));
 
-        long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(System.currentTimeMillis(),
-                Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState);
-        assertThat(nextDelay, greaterThan(TimeValue.timeValueHours(9).millis()));
-        assertThat(nextDelay, lessThanOrEqualTo(TimeValue.timeValueHours(10).millis()));
+        long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(clusterState);
+        assertThat(nextDelay, greaterThan(TimeValue.timeValueHours(9).nanos()));
+        assertThat(nextDelay, lessThanOrEqualTo(TimeValue.timeValueHours(10).nanos()));
     }
 }

+ 3 - 1
docs/reference/index-modules/allocation/delayed.asciidoc

@@ -58,7 +58,9 @@ With delayed allocation enabled, the above scenario changes to look like this:
 
 NOTE: This setting will not affect the promotion of replicas to primaries, nor
 will it affect the assignment of replicas that have not been assigned
-previously.
+previously. In particular, delayed allocation does not come into effect after a full cluster restart.
+Also, in case of a master failover situation, elapsed delay time is forgotten
+(i.e. reset to the full initial delay).
 
 ==== Cancellation of shard relocation
 

+ 51 - 0
test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java

@@ -25,10 +25,13 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.EmptyClusterInfoService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
 import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
 import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
@@ -36,7 +39,10 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.DummyTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.gateway.AsyncShardFetch;
 import org.elasticsearch.gateway.GatewayAllocator;
+import org.elasticsearch.gateway.ReplicaShardAllocator;
+import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
 import org.elasticsearch.node.settings.NodeSettingsService;
 import org.elasticsearch.test.gateway.NoopGatewayAllocator;
 
@@ -46,6 +52,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.function.Function;
 
 import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
 import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList;
@@ -179,4 +186,48 @@ public abstract class ESAllocationTestCase extends ESTestCase {
             return decision;
         }
     }
+
+    /**
+     * Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet.
+     * Also computes delay in UnassignedInfo based on customizable time source.
+     * <p>
+     * The time source maps each unassigned shard to the "current" time, in nanoseconds, that is handed to
+     * {@code ReplicaShardAllocator#ignoreUnassignedIfDelayed}. If no time source has been set, {@code System.nanoTime()} is used.
+     * Primaries and shards for which {@code allocatedPostIndexCreate()} is false are never passed to the delay check.
+     * {@code applyStartedShards} and {@code applyFailedShards} are overridden as no-ops.
+     */
+    protected static class DelayedShardsMockGatewayAllocator extends GatewayAllocator {
+        private final ReplicaShardAllocator replicaShardAllocator = new ReplicaShardAllocator(Settings.EMPTY) {
+            @Override
+            protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation) {
+                return new AsyncShardFetch.FetchResult<>(shard.shardId(), null, Collections.<String>emptySet(), Collections.<String>emptySet()); // stub: null data and empty sets, nothing is actually fetched
+            }
+        };
+
+        private volatile Function<ShardRouting, Long> timeSource; // null until setTimeSource is called; null selects System.nanoTime()
+
+        public DelayedShardsMockGatewayAllocator() {
+            super(Settings.EMPTY, null, null);
+        }
+
+        public void setTimeSource(Function<ShardRouting, Long> timeSource) {
+            this.timeSource = timeSource;
+        }
+
+        @Override
+        public void applyStartedShards(StartedRerouteAllocation allocation) {}
+
+        @Override
+        public void applyFailedShards(FailedRerouteAllocation allocation) {}
+
+        @Override
+        public boolean allocateUnassigned(RoutingAllocation allocation) {
+            final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator();
+            boolean changed = false;
+            while (unassignedIterator.hasNext()) {
+                ShardRouting shard = unassignedIterator.next();
+                if (shard.primary() || shard.allocatedPostIndexCreate() == false) {
+                    continue; // only non-primary shards with allocatedPostIndexCreate() == true reach the delay check
+                }
+                changed |= replicaShardAllocator.ignoreUnassignedIfDelayed(timeSource == null ? System.nanoTime() : timeSource.apply(shard),
+                        allocation, unassignedIterator, shard); // the iterator is passed along so the allocator can act on the current shard
+            }
+            return changed;
+        }
+    }
 }