Ver código fonte

Merge pull request #20018 from rjernst/split_disk_threshold

Internal: Split disk threshold monitoring from decider
Ryan Ernst 9 anos atrás
pai
commit
2ea50bc162

+ 3 - 3
core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

@@ -34,7 +34,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
+import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.component.AbstractComponent;
@@ -106,10 +106,10 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
         this.threadPool = threadPool;
         this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
         this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings);
-        this.enabled = DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
+        this.enabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
         clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, this::setFetchTimeout);
         clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, this::setUpdateFrequency);
-        clusterSettings.addSettingsUpdateConsumer(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
+        clusterSettings.addSettingsUpdateConsumer(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
 
         // Add InternalClusterInfoService to listen for Master changes
         this.clusterService.add((LocalNodeMasterListener)this);

+ 2 - 2
core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java

@@ -29,7 +29,7 @@ import org.elasticsearch.cluster.DiffableUtils;
 import org.elasticsearch.cluster.InternalClusterInfoService;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
-import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
+import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseFieldMatcher;
@@ -750,7 +750,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, Fr
                                     RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.getKey(),
                                     RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING.getKey(),
                                     RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.getKey(),
-                                    DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(),
+                                    DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(),
                                     InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(),
                                     InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.getKey(),
                                     DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(),

+ 144 - 0
core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

@@ -0,0 +1,144 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing.allocation;
+
+import java.util.Set;
+
+import com.carrotsearch.hppc.ObjectLookupContainer;
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterInfo;
+import org.elasticsearch.cluster.ClusterInfoService;
+import org.elasticsearch.cluster.DiskUsage;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.set.Sets;
+
+/**
+ * Listens for a node to go over the high watermark and kicks off an empty
+ * reroute if it does. Also responsible for logging about nodes that have
+ * passed the disk watermarks
+ */
+public class DiskThresholdMonitor extends AbstractComponent implements ClusterInfoService.Listener {
+    private final DiskThresholdSettings diskThresholdSettings;
+    private final Client client;
+    private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();
+
+    private long lastRunNS;
+
+    // TODO: remove injection when ClusterInfoService is not injected
+    @Inject
+    public DiskThresholdMonitor(Settings settings, ClusterSettings clusterSettings,
+                                ClusterInfoService infoService, Client client) {
+        super(settings);
+        this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
+        this.client = client;
+        infoService.addListener(this);
+    }
+
+    /**
+     * Warn about the given disk usage if the low or high watermark has been passed
+     */
+    private void warnAboutDiskIfNeeded(DiskUsage usage) {
+        // Check absolute disk values
+        if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().bytes()) {
+            logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
+                diskThresholdSettings.getFreeBytesThresholdHigh(), usage);
+        } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().bytes()) {
+            logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node",
+                diskThresholdSettings.getFreeBytesThresholdLow(), usage);
+        }
+
+        // Check percentage disk values
+        if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
+            logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
+                Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(), "%"), usage);
+        } else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
+            logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node",
+                Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdLow(), "%"), usage);
+        }
+    }
+
+    @Override
+    public void onNewInfo(ClusterInfo info) {
+        ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
+        if (usages != null) {
+            boolean reroute = false;
+            String explanation = "";
+
+            // Garbage collect nodes that have been removed from the cluster
+            // from the map that tracks watermark crossing
+            ObjectLookupContainer<String> nodes = usages.keys();
+            for (String node : nodeHasPassedWatermark) {
+                if (nodes.contains(node) == false) {
+                    nodeHasPassedWatermark.remove(node);
+                }
+            }
+
+            for (ObjectObjectCursor<String, DiskUsage> entry : usages) {
+                String node = entry.key;
+                DiskUsage usage = entry.value;
+                warnAboutDiskIfNeeded(usage);
+                if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().bytes() ||
+                    usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
+                    if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) {
+                        lastRunNS = System.nanoTime();
+                        reroute = true;
+                        explanation = "high disk watermark exceeded on one or more nodes";
+                    } else {
+                        logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " +
+                                "in the last [{}], skipping reroute",
+                            node, diskThresholdSettings.getRerouteInterval());
+                    }
+                    nodeHasPassedWatermark.add(node);
+                } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().bytes() ||
+                    usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
+                    nodeHasPassedWatermark.add(node);
+                } else {
+                    if (nodeHasPassedWatermark.contains(node)) {
+                        // The node has previously been over the high or
+                        // low watermark, but is no longer, so we should
+                        // reroute so any unassigned shards can be allocated
+                        // if they are able to be
+                        if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) {
+                            lastRunNS = System.nanoTime();
+                            reroute = true;
+                            explanation = "one or more nodes has gone under the high or low watermark";
+                            nodeHasPassedWatermark.remove(node);
+                        } else {
+                            logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " +
+                                    "in the last [{}], skipping reroute",
+                                node, diskThresholdSettings.getRerouteInterval());
+                        }
+                    }
+                }
+            }
+            if (reroute) {
+                logger.info("rerouting shards: [{}]", explanation);
+                // Execute an empty reroute, but don't block on the response
+                client.admin().cluster().prepareReroute().execute();
+            }
+        }
+    }
+}

+ 174 - 0
core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java

@@ -0,0 +1,174 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing.allocation;
+
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.RatioValue;
+import org.elasticsearch.common.unit.TimeValue;
+
+/**
+ * A container to keep settings for disk thresholds up to date with cluster setting changes.
+ */
+public class DiskThresholdSettings {
+    public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING =
+        Setting.boolSetting("cluster.routing.allocation.disk.threshold_enabled", true,
+            Setting.Property.Dynamic, Setting.Property.NodeScope);
+    public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING =
+        new Setting<>("cluster.routing.allocation.disk.watermark.low", "85%",
+            (s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.low"),
+            Setting.Property.Dynamic, Setting.Property.NodeScope);
+    public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING =
+        new Setting<>("cluster.routing.allocation.disk.watermark.high", "90%",
+            (s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.high"),
+            Setting.Property.Dynamic, Setting.Property.NodeScope);
+    public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING =
+        Setting.boolSetting("cluster.routing.allocation.disk.include_relocations", true,
+            Setting.Property.Dynamic, Setting.Property.NodeScope);;
+    public static final Setting<TimeValue> CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING =
+        Setting.positiveTimeSetting("cluster.routing.allocation.disk.reroute_interval", TimeValue.timeValueSeconds(60),
+            Setting.Property.Dynamic, Setting.Property.NodeScope);
+
+    private volatile Double freeDiskThresholdLow;
+    private volatile Double freeDiskThresholdHigh;
+    private volatile ByteSizeValue freeBytesThresholdLow;
+    private volatile ByteSizeValue freeBytesThresholdHigh;
+    private volatile boolean includeRelocations;
+    private volatile boolean enabled;
+    private volatile TimeValue rerouteInterval;
+
+    public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) {
+        final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings);
+        final String highWatermark = CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.get(settings);
+        setHighWatermark(highWatermark);
+        setLowWatermark(lowWatermark);
+        this.includeRelocations = CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.get(settings);
+        this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings);
+        this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
+        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark);
+        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark);
+        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, this::setIncludeRelocations);
+        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval);
+        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
+    }
+
+    private void setIncludeRelocations(boolean includeRelocations) {
+        this.includeRelocations = includeRelocations;
+    }
+
+    private void setRerouteInterval(TimeValue rerouteInterval) {
+        this.rerouteInterval = rerouteInterval;
+    }
+
+    private void setEnabled(boolean enabled) {
+        this.enabled = enabled;
+    }
+
+    private void setLowWatermark(String lowWatermark) {
+        // Watermark is expressed in terms of used data, but we need "free" data watermark
+        this.freeDiskThresholdLow = 100.0 - thresholdPercentageFromWatermark(lowWatermark);
+        this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark,
+            CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
+    }
+
+    private void setHighWatermark(String highWatermark) {
+        // Watermark is expressed in terms of used data, but we need "free" data watermark
+        this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark);
+        this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark,
+            CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
+    }
+
+    public Double getFreeDiskThresholdLow() {
+        return freeDiskThresholdLow;
+    }
+
+    public Double getFreeDiskThresholdHigh() {
+        return freeDiskThresholdHigh;
+    }
+
+    public ByteSizeValue getFreeBytesThresholdLow() {
+        return freeBytesThresholdLow;
+    }
+
+    public ByteSizeValue getFreeBytesThresholdHigh() {
+        return freeBytesThresholdHigh;
+    }
+
+    public boolean includeRelocations() {
+        return includeRelocations;
+    }
+
+    public boolean isEnabled() {
+        return enabled;
+    }
+
+    public TimeValue getRerouteInterval() {
+        return rerouteInterval;
+    }
+
+    /**
+     * Attempts to parse the watermark into a percentage, returning 100.0% if
+     * it cannot be parsed.
+     */
+    private double thresholdPercentageFromWatermark(String watermark) {
+        try {
+            return RatioValue.parseRatioValue(watermark).getAsPercent();
+        } catch (ElasticsearchParseException ex) {
+            // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two
+            // cases separately
+            return 100.0;
+        }
+    }
+
+    /**
+     * Attempts to parse the watermark into a {@link ByteSizeValue}, returning
+     * a ByteSizeValue of 0 bytes if the value cannot be parsed.
+     */
+    private ByteSizeValue thresholdBytesFromWatermark(String watermark, String settingName) {
+        try {
+            return ByteSizeValue.parseBytesSizeValue(watermark, settingName);
+        } catch (ElasticsearchParseException ex) {
+            // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two
+            // cases separately
+            return ByteSizeValue.parseBytesSizeValue("0b", settingName);
+        }
+    }
+
+    /**
+     * Checks if a watermark string is a valid percentage or byte size value,
+     * @return the watermark value given
+     */
+    private static String validWatermarkSetting(String watermark, String settingName) {
+        try {
+            RatioValue.parseRatioValue(watermark);
+        } catch (ElasticsearchParseException e) {
+            try {
+                ByteSizeValue.parseBytesSizeValue(watermark, settingName);
+            } catch (ElasticsearchParseException ex) {
+                ex.addSuppressed(e);
+                throw ex;
+            }
+        }
+        return watermark;
+    }
+}

+ 36 - 306
core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

@@ -19,37 +19,27 @@
 
 package org.elasticsearch.cluster.routing.allocation.decider;
 
-import com.carrotsearch.hppc.ObjectLookupContainer;
+import java.util.Set;
+
 import com.carrotsearch.hppc.cursors.ObjectCursor;
-import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
-import org.elasticsearch.ElasticsearchParseException;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterInfo;
-import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.DiskUsage;
-import org.elasticsearch.cluster.EmptyClusterInfoService;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.ClusterSettings;
-import org.elasticsearch.common.settings.Setting;
-import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.RatioValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 
-import java.util.Set;
-
 /**
  * The {@link DiskThresholdDecider} checks that the node a shard is potentially
  * being allocated to has enough disk space.
@@ -77,226 +67,12 @@ public class DiskThresholdDecider extends AllocationDecider {
 
     public static final String NAME = "disk_threshold";
 
-    private volatile Double freeDiskThresholdLow;
-    private volatile Double freeDiskThresholdHigh;
-    private volatile ByteSizeValue freeBytesThresholdLow;
-    private volatile ByteSizeValue freeBytesThresholdHigh;
-    private volatile boolean includeRelocations;
-    private volatile boolean enabled;
-    private volatile TimeValue rerouteInterval;
-
-    public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING =
-        Setting.boolSetting("cluster.routing.allocation.disk.threshold_enabled", true, Property.Dynamic, Property.NodeScope);
-    public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING =
-        new Setting<>("cluster.routing.allocation.disk.watermark.low", "85%",
-            (s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.low"),
-            Property.Dynamic, Property.NodeScope);
-    public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING =
-        new Setting<>("cluster.routing.allocation.disk.watermark.high", "90%",
-            (s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.high"),
-            Property.Dynamic, Property.NodeScope);
-    public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING =
-        Setting.boolSetting("cluster.routing.allocation.disk.include_relocations", true,
-            Property.Dynamic, Property.NodeScope);;
-    public static final Setting<TimeValue> CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING =
-        Setting.positiveTimeSetting("cluster.routing.allocation.disk.reroute_interval", TimeValue.timeValueSeconds(60),
-            Property.Dynamic, Property.NodeScope);
-
-    /**
-     * Listens for a node to go over the high watermark and kicks off an empty
-     * reroute if it does. Also responsible for logging about nodes that have
-     * passed the disk watermarks
-     */
-    class DiskListener implements ClusterInfoService.Listener {
-        private final Client client;
-        private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();
-
-        private long lastRunNS;
-
-        DiskListener(Client client) {
-            this.client = client;
-        }
-
-        /**
-         * Warn about the given disk usage if the low or high watermark has been passed
-         */
-        private void warnAboutDiskIfNeeded(DiskUsage usage) {
-            // Check absolute disk values
-            if (usage.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdHigh.bytes()) {
-                logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
-                        DiskThresholdDecider.this.freeBytesThresholdHigh, usage);
-            } else if (usage.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdLow.bytes()) {
-                logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node",
-                        DiskThresholdDecider.this.freeBytesThresholdLow, usage);
-            }
-
-            // Check percentage disk values
-            if (usage.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdHigh) {
-                logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
-                        Strings.format1Decimals(100.0 - DiskThresholdDecider.this.freeDiskThresholdHigh, "%"), usage);
-            } else if (usage.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdLow) {
-                logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node",
-                        Strings.format1Decimals(100.0 - DiskThresholdDecider.this.freeDiskThresholdLow, "%"), usage);
-            }
-        }
-
-        @Override
-        public void onNewInfo(ClusterInfo info) {
-            ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
-            if (usages != null) {
-                boolean reroute = false;
-                String explanation = "";
-
-                // Garbage collect nodes that have been removed from the cluster
-                // from the map that tracks watermark crossing
-                ObjectLookupContainer<String> nodes = usages.keys();
-                for (String node : nodeHasPassedWatermark) {
-                    if (nodes.contains(node) == false) {
-                        nodeHasPassedWatermark.remove(node);
-                    }
-                }
-
-                for (ObjectObjectCursor<String, DiskUsage> entry : usages) {
-                    String node = entry.key;
-                    DiskUsage usage = entry.value;
-                    warnAboutDiskIfNeeded(usage);
-                    if (usage.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdHigh.bytes() ||
-                            usage.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdHigh) {
-                        if ((System.nanoTime() - lastRunNS) > DiskThresholdDecider.this.rerouteInterval.nanos()) {
-                            lastRunNS = System.nanoTime();
-                            reroute = true;
-                            explanation = "high disk watermark exceeded on one or more nodes";
-                        } else {
-                            logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " +
-                                            "in the last [{}], skipping reroute",
-                                    node, DiskThresholdDecider.this.rerouteInterval);
-                        }
-                        nodeHasPassedWatermark.add(node);
-                    } else if (usage.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdLow.bytes() ||
-                            usage.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdLow) {
-                        nodeHasPassedWatermark.add(node);
-                    } else {
-                        if (nodeHasPassedWatermark.contains(node)) {
-                            // The node has previously been over the high or
-                            // low watermark, but is no longer, so we should
-                            // reroute so any unassigned shards can be allocated
-                            // if they are able to be
-                            if ((System.nanoTime() - lastRunNS) > DiskThresholdDecider.this.rerouteInterval.nanos()) {
-                                lastRunNS = System.nanoTime();
-                                reroute = true;
-                                explanation = "one or more nodes has gone under the high or low watermark";
-                                nodeHasPassedWatermark.remove(node);
-                            } else {
-                                logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " +
-                                                "in the last [{}], skipping reroute",
-                                        node, DiskThresholdDecider.this.rerouteInterval);
-                            }
-                        }
-                    }
-                }
-                if (reroute) {
-                    logger.info("rerouting shards: [{}]", explanation);
-                    // Execute an empty reroute, but don't block on the response
-                    client.admin().cluster().prepareReroute().execute();
-                }
-            }
-        }
-    }
-
-    public DiskThresholdDecider(Settings settings) {
-        // It's okay the Client is null here, because the empty cluster info
-        // service will never actually call the listener where the client is
-        // needed. Also this constructor is only used for tests
-        this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), EmptyClusterInfoService.INSTANCE, null);
-    }
+    private final DiskThresholdSettings diskThresholdSettings;
 
     @Inject
-    public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings, ClusterInfoService infoService, Client client) {
+    public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings) {
         super(settings);
-        final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings);
-        final String highWatermark = CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.get(settings);
-        setHighWatermark(highWatermark);
-        setLowWatermark(lowWatermark);
-        this.includeRelocations = CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.get(settings);
-        this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings);
-        this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
-        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark);
-        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark);
-        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, this::setIncludeRelocations);
-        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval);
-        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
-        infoService.addListener(new DiskListener(client));
-    }
-
-    private void setIncludeRelocations(boolean includeRelocations) {
-        this.includeRelocations = includeRelocations;
-    }
-
-    private void setRerouteInterval(TimeValue rerouteInterval) {
-        this.rerouteInterval = rerouteInterval;
-    }
-
-    private void setEnabled(boolean enabled) {
-        this.enabled = enabled;
-    }
-
-    private void setLowWatermark(String lowWatermark) {
-        // Watermark is expressed in terms of used data, but we need "free" data watermark
-        this.freeDiskThresholdLow = 100.0 - thresholdPercentageFromWatermark(lowWatermark);
-        this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark,
-                CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
-    }
-
-    private void setHighWatermark(String highWatermark) {
-        // Watermark is expressed in terms of used data, but we need "free" data watermark
-        this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark);
-        this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark,
-                CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
-    }
-
-    // For Testing
-    public Double getFreeDiskThresholdLow() {
-        return freeDiskThresholdLow;
-    }
-
-    // For Testing
-    public Double getFreeDiskThresholdHigh() {
-        return freeDiskThresholdHigh;
-    }
-
-    // For Testing
-    public Double getUsedDiskThresholdLow() {
-        return 100.0 - freeDiskThresholdLow;
-    }
-
-    // For Testing
-    public Double getUsedDiskThresholdHigh() {
-        return 100.0 - freeDiskThresholdHigh;
-    }
-
-    // For Testing
-    public ByteSizeValue getFreeBytesThresholdLow() {
-        return freeBytesThresholdLow;
-    }
-
-    // For Testing
-    public ByteSizeValue getFreeBytesThresholdHigh() {
-        return freeBytesThresholdHigh;
-    }
-
-    // For Testing
-    public boolean isIncludeRelocations() {
-        return includeRelocations;
-    }
-
-    // For Testing
-    public boolean isEnabled() {
-        return enabled;
-    }
-
-    // For Testing
-    public TimeValue getRerouteInterval() {
-        return rerouteInterval;
+        this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
     }
 
     /**
@@ -306,7 +82,7 @@ public class DiskThresholdDecider extends AllocationDecider {
      * If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size
      * of all shards
      */
-    public static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation,
+    static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation,
                                               boolean subtractShardsMovingAway, String dataPath) {
         ClusterInfo clusterInfo = allocation.clusterInfo();
         long totalSize = 0;
@@ -333,8 +109,8 @@ public class DiskThresholdDecider extends AllocationDecider {
             return decision;
         }
 
-        final double usedDiskThresholdLow = 100.0 - DiskThresholdDecider.this.freeDiskThresholdLow;
-        final double usedDiskThresholdHigh = 100.0 - DiskThresholdDecider.this.freeDiskThresholdHigh;
+        final double usedDiskThresholdLow = 100.0 - diskThresholdSettings.getFreeDiskThresholdLow();
+        final double usedDiskThresholdHigh = 100.0 - diskThresholdSettings.getFreeDiskThresholdHigh();
 
         DiskUsage usage = getDiskUsage(node, allocation, usages);
         // First, check that the node currently over the low watermark
@@ -351,23 +127,23 @@ public class DiskThresholdDecider extends AllocationDecider {
         boolean primaryHasBeenAllocated = shardRouting.primary() && shardRouting.allocatedPostIndexCreate(indexMetaData);
 
         // checks for exact byte comparisons
-        if (freeBytes < freeBytesThresholdLow.bytes()) {
+        if (freeBytes < diskThresholdSettings.getFreeBytesThresholdLow().bytes()) {
             // If the shard is a replica or has a primary that has already been allocated before, check the low threshold
             if (!shardRouting.primary() || (shardRouting.primary() && primaryHasBeenAllocated)) {
                 if (logger.isDebugEnabled()) {
                     logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, preventing allocation",
-                            freeBytesThresholdLow, freeBytes, node.nodeId());
+                            diskThresholdSettings.getFreeBytesThresholdLow(), freeBytes, node.nodeId());
                 }
                 return allocation.decision(Decision.NO, NAME,
                         "the node is above the low watermark and has less than required [%s] free, free: [%s]",
-                        freeBytesThresholdLow, new ByteSizeValue(freeBytes));
-            } else if (freeBytes > freeBytesThresholdHigh.bytes()) {
+                        diskThresholdSettings.getFreeBytesThresholdLow(), new ByteSizeValue(freeBytes));
+            } else if (freeBytes > diskThresholdSettings.getFreeBytesThresholdHigh().bytes()) {
                 // Allow the shard to be allocated because it is primary that
                 // has never been allocated if it's under the high watermark
                 if (logger.isDebugEnabled()) {
                     logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, " +
                                     "but allowing allocation because primary has never been allocated",
-                            freeBytesThresholdLow, freeBytes, node.nodeId());
+                            diskThresholdSettings.getFreeBytesThresholdLow(), freeBytes, node.nodeId());
                 }
                 return allocation.decision(Decision.YES, NAME,
                         "the node is above the low watermark, but this primary shard has never been allocated before");
@@ -377,17 +153,17 @@ public class DiskThresholdDecider extends AllocationDecider {
                 if (logger.isDebugEnabled()) {
                     logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, " +
                                     "preventing allocation even though primary has never been allocated",
-                            freeBytesThresholdHigh, freeBytes, node.nodeId());
+                            diskThresholdSettings.getFreeBytesThresholdHigh(), freeBytes, node.nodeId());
                 }
                 return allocation.decision(Decision.NO, NAME,
                         "the node is above the high watermark even though this shard has never been allocated " +
                                 "and has less than required [%s] free on node, free: [%s]",
-                        freeBytesThresholdHigh, new ByteSizeValue(freeBytes));
+                        diskThresholdSettings.getFreeBytesThresholdHigh(), new ByteSizeValue(freeBytes));
             }
         }
 
         // checks for percentage comparisons
-        if (freeDiskPercentage < freeDiskThresholdLow) {
+        if (freeDiskPercentage < diskThresholdSettings.getFreeDiskThresholdLow()) {
             // If the shard is a replica or has a primary that has already been allocated before, check the low threshold
             if (!shardRouting.primary() || (shardRouting.primary() && primaryHasBeenAllocated)) {
                 if (logger.isDebugEnabled()) {
@@ -398,7 +174,7 @@ public class DiskThresholdDecider extends AllocationDecider {
                 return allocation.decision(Decision.NO, NAME,
                         "the node is above the low watermark and has more than allowed [%s%%] used disk, free: [%s%%]",
                         usedDiskThresholdLow, freeDiskPercentage);
-            } else if (freeDiskPercentage > freeDiskThresholdHigh) {
+            } else if (freeDiskPercentage > diskThresholdSettings.getFreeDiskThresholdHigh()) {
                 // Allow the shard to be allocated because it is primary that
                 // has never been allocated if it's under the high watermark
                 if (logger.isDebugEnabled()) {
@@ -415,7 +191,7 @@ public class DiskThresholdDecider extends AllocationDecider {
                 if (logger.isDebugEnabled()) {
                     logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, " +
                                     "preventing allocation even though primary has never been allocated",
-                            Strings.format1Decimals(freeDiskThresholdHigh, "%"),
+                            Strings.format1Decimals(diskThresholdSettings.getFreeDiskThresholdHigh(), "%"),
                             Strings.format1Decimals(freeDiskPercentage, "%"), node.nodeId());
                 }
                 return allocation.decision(Decision.NO, NAME,
@@ -429,19 +205,20 @@ public class DiskThresholdDecider extends AllocationDecider {
         final long shardSize = getExpectedShardSize(shardRouting, allocation, 0);
         double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
         long freeBytesAfterShard = freeBytes - shardSize;
-        if (freeBytesAfterShard < freeBytesThresholdHigh.bytes()) {
+        if (freeBytesAfterShard < diskThresholdSettings.getFreeBytesThresholdHigh().bytes()) {
             logger.warn("after allocating, node [{}] would have less than the required " +
                     "{} free bytes threshold ({} bytes free), preventing allocation",
-                    node.nodeId(), freeBytesThresholdHigh, freeBytesAfterShard);
+                    node.nodeId(), diskThresholdSettings.getFreeBytesThresholdHigh(), freeBytesAfterShard);
             return allocation.decision(Decision.NO, NAME,
                     "after allocating the shard to this node, it would be above the high watermark " +
                             "and have less than required [%s] free, free: [%s]",
-                    freeBytesThresholdLow, new ByteSizeValue(freeBytesAfterShard));
+                    diskThresholdSettings.getFreeBytesThresholdLow(), new ByteSizeValue(freeBytesAfterShard));
         }
-        if (freeSpaceAfterShard < freeDiskThresholdHigh) {
+        if (freeSpaceAfterShard < diskThresholdSettings.getFreeDiskThresholdHigh()) {
             logger.warn("after allocating, node [{}] would have more than the allowed " +
                             "{} free disk threshold ({} free), preventing allocation",
-                    node.nodeId(), Strings.format1Decimals(freeDiskThresholdHigh, "%"), Strings.format1Decimals(freeSpaceAfterShard, "%"));
+                    node.nodeId(), Strings.format1Decimals(diskThresholdSettings.getFreeDiskThresholdHigh(), "%"),
+                                                           Strings.format1Decimals(freeSpaceAfterShard, "%"));
             return allocation.decision(Decision.NO, NAME,
                     "after allocating the shard to this node, it would be above the high watermark " +
                             "and have more than allowed [%s%%] used disk, free: [%s%%]",
@@ -479,25 +256,25 @@ public class DiskThresholdDecider extends AllocationDecider {
             return allocation.decision(Decision.YES, NAME,
                     "this shard is not allocated on the most utilized disk and can remain");
         }
-        if (freeBytes < freeBytesThresholdHigh.bytes()) {
+        if (freeBytes < diskThresholdSettings.getFreeBytesThresholdHigh().bytes()) {
             if (logger.isDebugEnabled()) {
                 logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain",
-                        freeBytesThresholdHigh, freeBytes, node.nodeId());
+                        diskThresholdSettings.getFreeBytesThresholdHigh(), freeBytes, node.nodeId());
             }
             return allocation.decision(Decision.NO, NAME,
                     "after allocating this shard this node would be above the high watermark " +
                             "and there would be less than required [%s] free on node, free: [%s]",
-                    freeBytesThresholdHigh, new ByteSizeValue(freeBytes));
+                    diskThresholdSettings.getFreeBytesThresholdHigh(), new ByteSizeValue(freeBytes));
         }
-        if (freeDiskPercentage < freeDiskThresholdHigh) {
+        if (freeDiskPercentage < diskThresholdSettings.getFreeDiskThresholdHigh()) {
             if (logger.isDebugEnabled()) {
                 logger.debug("less than the required {}% free disk threshold ({}% free) on node {}, shard cannot remain",
-                        freeDiskThresholdHigh, freeDiskPercentage, node.nodeId());
+                        diskThresholdSettings.getFreeDiskThresholdHigh(), freeDiskPercentage, node.nodeId());
             }
             return allocation.decision(Decision.NO, NAME,
                     "after allocating this shard this node would be above the high watermark " +
                             "and there would be less than required [%s%%] free disk on node, free: [%s%%]",
-                    freeDiskThresholdHigh, freeDiskPercentage);
+                    diskThresholdSettings.getFreeDiskThresholdHigh(), freeDiskPercentage);
         }
 
         return allocation.decision(Decision.YES, NAME,
@@ -516,7 +293,7 @@ public class DiskThresholdDecider extends AllocationDecider {
             }
         }
 
-        if (includeRelocations) {
+        if (diskThresholdSettings.includeRelocations()) {
             long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, true, usage.getPath());
             DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(),
                     usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
@@ -536,7 +313,7 @@ public class DiskThresholdDecider extends AllocationDecider {
      * @param usages Map of nodeId to DiskUsage for all known nodes
      * @return DiskUsage representing given node using the average disk usage
      */
-    public DiskUsage averageUsage(RoutingNode node, ImmutableOpenMap<String, DiskUsage> usages) {
+    DiskUsage averageUsage(RoutingNode node, ImmutableOpenMap<String, DiskUsage> usages) {
         if (usages.size() == 0) {
             return new DiskUsage(node.nodeId(), node.node().getName(), "_na_", 0, 0);
         }
@@ -556,63 +333,16 @@ public class DiskThresholdDecider extends AllocationDecider {
      * @param shardSize Size in bytes of the shard
      * @return Percentage of free space after the shard is assigned to the node
      */
-    public double freeDiskPercentageAfterShardAssigned(DiskUsage usage, Long shardSize) {
+    double freeDiskPercentageAfterShardAssigned(DiskUsage usage, Long shardSize) {
         shardSize = (shardSize == null) ? 0 : shardSize;
         DiskUsage newUsage = new DiskUsage(usage.getNodeId(), usage.getNodeName(), usage.getPath(),
                 usage.getTotalBytes(),  usage.getFreeBytes() - shardSize);
         return newUsage.getFreeDiskAsPercentage();
     }
 
-    /**
-     * Attempts to parse the watermark into a percentage, returning 100.0% if
-     * it cannot be parsed.
-     */
-    public double thresholdPercentageFromWatermark(String watermark) {
-        try {
-            return RatioValue.parseRatioValue(watermark).getAsPercent();
-        } catch (ElasticsearchParseException ex) {
-            // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two
-            // cases separately
-            return 100.0;
-        }
-    }
-
-    /**
-     * Attempts to parse the watermark into a {@link ByteSizeValue}, returning
-     * a ByteSizeValue of 0 bytes if the value cannot be parsed.
-     */
-    public ByteSizeValue thresholdBytesFromWatermark(String watermark, String settingName) {
-        try {
-            return ByteSizeValue.parseBytesSizeValue(watermark, settingName);
-        } catch (ElasticsearchParseException ex) {
-            // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two
-            // cases separately
-            return ByteSizeValue.parseBytesSizeValue("0b", settingName);
-        }
-    }
-
-    /**
-     * Checks if a watermark string is a valid percentage or byte size value,
-     * @return the watermark value given
-     */
-    public static String validWatermarkSetting(String watermark, String settingName) {
-        try {
-            RatioValue.parseRatioValue(watermark);
-        } catch (ElasticsearchParseException e) {
-            try {
-                ByteSizeValue.parseBytesSizeValue(watermark, settingName);
-            } catch (ElasticsearchParseException ex) {
-                ex.addSuppressed(e);
-                throw ex;
-            }
-        }
-        return watermark;
-
-    }
-
     private Decision earlyTerminate(RoutingAllocation allocation, ImmutableOpenMap<String, DiskUsage> usages) {
         // Always allow allocation if the decider is disabled
-        if (!enabled) {
+        if (diskThresholdSettings.isEnabled() == false) {
             return allocation.decision(Decision.YES, NAME, "the disk threshold decider is disabled");
         }
 
@@ -647,7 +377,7 @@ public class DiskThresholdDecider extends AllocationDecider {
      * Returns the expected shard size for the given shard or the default value provided if not enough information are available
      * to estimate the shards size.
      */
-    public static final long getExpectedShardSize(ShardRouting shard, RoutingAllocation allocation, long defaultValue) {
+    public static long getExpectedShardSize(ShardRouting shard, RoutingAllocation allocation, long defaultValue) {
         final IndexMetaData metaData = allocation.metaData().getIndexSafe(shard.index());
         final ClusterInfo info = allocation.clusterInfo();
         if (metaData.getMergeSourceIndex() != null && shard.allocatedPostIndexCreate(metaData) == false) {

+ 6 - 6
core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -32,11 +32,11 @@ import org.elasticsearch.cluster.InternalClusterInfoService;
 import org.elasticsearch.cluster.NodeConnectionsService;
 import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
-import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
@@ -193,11 +193,11 @@ public final class ClusterSettings extends AbstractScopedSettings {
                     ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
                     ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
                     ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING,
-                    DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING,
-                    DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING,
-                    DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
-                    DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING,
-                    DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
+                    DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING,
+                    DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING,
+                    DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
+                    DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING,
+                    DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
                     InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING,
                     InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING,
                     SnapshotInProgressAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING,

+ 2 - 0
core/src/main/java/org/elasticsearch/node/NodeModule.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.node;
 
+import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
 import org.elasticsearch.common.inject.AbstractModule;
 import org.elasticsearch.monitor.MonitorService;
 import org.elasticsearch.node.service.NodeService;
@@ -38,5 +39,6 @@ public class NodeModule extends AbstractModule {
         bind(Node.class).toInstance(node);
         bind(MonitorService.class).toInstance(monitorService);
         bind(NodeService.class).asEagerSingleton();
+        bind(DiskThresholdMonitor.class).asEagerSingleton();
     }
 }

+ 64 - 0
core/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java

@@ -0,0 +1,64 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing.allocation;
+
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.test.ESTestCase;
+
+public class DiskThresholdSettingsTests extends ESTestCase {
+
+    public void testDefaults() {
+        ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        DiskThresholdSettings diskThresholdSettings = new DiskThresholdSettings(Settings.EMPTY, nss);
+
+        ByteSizeValue zeroBytes = ByteSizeValue.parseBytesSizeValue("0b", "test");
+        assertEquals(zeroBytes, diskThresholdSettings.getFreeBytesThresholdHigh());
+        assertEquals(10.0D, diskThresholdSettings.getFreeDiskThresholdHigh(), 0.0D);
+        assertEquals(zeroBytes, diskThresholdSettings.getFreeBytesThresholdLow());
+        assertEquals(15.0D, diskThresholdSettings.getFreeDiskThresholdLow(), 0.0D);
+        assertEquals(60L, diskThresholdSettings.getRerouteInterval().seconds());
+        assertTrue(diskThresholdSettings.isEnabled());
+        assertTrue(diskThresholdSettings.includeRelocations());
+    }
+
+    public void testUpdate() {
+        ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        DiskThresholdSettings diskThresholdSettings = new DiskThresholdSettings(Settings.EMPTY, nss);
+
+        Settings newSettings = Settings.builder()
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), false)
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%")
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "500mb")
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "30s")
+            .build();
+        nss.applySettings(newSettings);
+
+        assertEquals(ByteSizeValue.parseBytesSizeValue("0b", "test"), diskThresholdSettings.getFreeBytesThresholdHigh());
+        assertEquals(30.0D, diskThresholdSettings.getFreeDiskThresholdHigh(), 0.0D);
+        assertEquals(ByteSizeValue.parseBytesSizeValue("500mb", "test"), diskThresholdSettings.getFreeBytesThresholdLow());
+        assertEquals(0.0D, diskThresholdSettings.getFreeDiskThresholdLow(), 0.0D);
+        assertEquals(30L, diskThresholdSettings.getRerouteInterval().seconds());
+        assertFalse(diskThresholdSettings.isEnabled());
+        assertFalse(diskThresholdSettings.includeRelocations());
+    }
+}

+ 55 - 49
core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java

@@ -39,12 +39,14 @@ import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.LocalTransportAddress;
 import org.elasticsearch.test.ESAllocationTestCase;
@@ -67,12 +69,16 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 
 public class DiskThresholdDeciderTests extends ESAllocationTestCase {
+    
+    DiskThresholdDecider makeDecider(Settings settings) {
+        return new DiskThresholdDecider(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
+    }
 
     public void testDiskThreshold() {
         Settings diskSettings = Settings.builder()
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.8).build();
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.8).build();
 
         ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
         usagesBuilder.put("node1", new DiskUsage("node1", "node1", "/dev/null", 100, 10)); // 90% used
@@ -90,7 +96,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
                 new HashSet<>(Arrays.asList(
                         new SameShardAllocationDecider(Settings.EMPTY),
-                        new DiskThresholdDecider(diskSettings))));
+                        makeDecider(diskSettings))));
 
         ClusterInfoService cis = new ClusterInfoService() {
             @Override
@@ -184,14 +190,14 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         // Set the high threshold to 70 instead of 80
         // node2 now should not have new shards allocated to it, but shards can remain
         diskSettings = Settings.builder()
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%")
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.7).build();
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%")
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.7).build();
 
         deciders = new AllocationDeciders(Settings.EMPTY,
                 new HashSet<>(Arrays.asList(
                         new SameShardAllocationDecider(Settings.EMPTY),
-                        new DiskThresholdDecider(diskSettings))));
+                        makeDecider(diskSettings))));
 
         strategy = new AllocationService(Settings.builder()
                 .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
@@ -215,14 +221,14 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         // Set the high threshold to 60 instead of 70
         // node2 now should not have new shards allocated to it, and shards cannot remain
         diskSettings = Settings.builder()
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.5)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.6).build();
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.5)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.6).build();
 
         deciders = new AllocationDeciders(Settings.EMPTY,
                 new HashSet<>(Arrays.asList(
                         new SameShardAllocationDecider(Settings.EMPTY),
-                        new DiskThresholdDecider(diskSettings))));
+                        makeDecider(diskSettings))));
 
         strategy = new AllocationService(Settings.builder()
                 .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
@@ -269,9 +275,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
 
     public void testDiskThresholdWithAbsoluteSizes() {
         Settings diskSettings = Settings.builder()
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "30b")
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "9b").build();
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "30b")
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "9b").build();
 
         ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
         usagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 10)); // 90% used
@@ -290,7 +296,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
                 new HashSet<>(Arrays.asList(
                         new SameShardAllocationDecider(Settings.EMPTY),
-                        new DiskThresholdDecider(diskSettings))));
+                        makeDecider(diskSettings))));
 
         ClusterInfoService cis = new ClusterInfoService() {
             @Override
@@ -423,14 +429,14 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         // Set the high threshold to 70 instead of 80
         // node2 now should not have new shards allocated to it, but shards can remain
         diskSettings = Settings.builder()
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "40b")
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "30b").build();
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "40b")
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "30b").build();
 
         deciders = new AllocationDeciders(Settings.EMPTY,
                 new HashSet<>(Arrays.asList(
                         new SameShardAllocationDecider(Settings.EMPTY),
-                        new DiskThresholdDecider(diskSettings))));
+                        makeDecider(diskSettings))));
 
         strategy = new AllocationService(Settings.builder()
                 .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
@@ -454,14 +460,14 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         // Set the high threshold to 60 instead of 70
         // node2 now should not have new shards allocated to it, and shards cannot remain
         diskSettings = Settings.builder()
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "50b")
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "40b").build();
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "50b")
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "40b").build();
 
         deciders = new AllocationDeciders(Settings.EMPTY,
                 new HashSet<>(Arrays.asList(
                         new SameShardAllocationDecider(Settings.EMPTY),
-                        new DiskThresholdDecider(diskSettings))));
+                        makeDecider(diskSettings))));
 
         strategy = new AllocationService(Settings.builder()
                 .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
@@ -542,9 +548,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
 
     public void testDiskThresholdWithShardSizes() {
         Settings diskSettings = Settings.builder()
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "71%").build();
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "71%").build();
 
         ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
         usagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 31)); // 69% used
@@ -559,7 +565,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
                 new HashSet<>(Arrays.asList(
                         new SameShardAllocationDecider(Settings.EMPTY),
-                        new DiskThresholdDecider(diskSettings))));
+                        makeDecider(diskSettings))));
 
         ClusterInfoService cis = new ClusterInfoService() {
             @Override
@@ -611,9 +617,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
 
     public void testUnknownDiskUsage() {
         Settings diskSettings = Settings.builder()
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.85).build();
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.85).build();
 
         ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
         usagesBuilder.put("node2", new DiskUsage("node2", "node2", "/dev/null", 100, 50)); // 50% used
@@ -629,7 +635,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
                 new HashSet<>(Arrays.asList(
                         new SameShardAllocationDecider(Settings.EMPTY),
-                        new DiskThresholdDecider(diskSettings))));
+                        makeDecider(diskSettings))));
 
         ClusterInfoService cis = new ClusterInfoService() {
             @Override
@@ -687,7 +693,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
 
     public void testAverageUsage() {
         RoutingNode rn = new RoutingNode("node1", newNode("node1"));
-        DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY);
+        DiskThresholdDecider decider = makeDecider(Settings.EMPTY);
 
         ImmutableOpenMap.Builder<String, DiskUsage> usages = ImmutableOpenMap.builder();
         usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 50)); // 50% used
@@ -700,7 +706,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
 
     public void testFreeDiskPercentageAfterShardAssigned() {
         RoutingNode rn = new RoutingNode("node1", newNode("node1"));
-        DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY);
+        DiskThresholdDecider decider = makeDecider(Settings.EMPTY);
 
         Map<String, DiskUsage> usages = new HashMap<>();
         usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 50)); // 50% used
@@ -712,10 +718,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
 
     public void testShardRelocationsTakenIntoAccount() {
         Settings diskSettings = Settings.builder()
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), true)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.8).build();
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), true)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.8).build();
 
         ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
         usagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 40)); // 60% used
@@ -734,7 +740,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
                 new HashSet<>(Arrays.asList(
                         new SameShardAllocationDecider(Settings.EMPTY),
-                        new DiskThresholdDecider(diskSettings))));
+                        makeDecider(diskSettings))));
 
         ClusterInfoService cis = new ClusterInfoService() {
             @Override
@@ -821,10 +827,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
 
     public void testCanRemainWithShardRelocatingAway() {
         Settings diskSettings = Settings.builder()
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), true)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%")
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%").build();
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), true)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%")
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%").build();
 
         // We have an index with 2 primary shards each taking 40 bytes. Each node has 100 bytes available
         ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
@@ -839,7 +845,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
 
         final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
 
-        DiskThresholdDecider diskThresholdDecider = new DiskThresholdDecider(diskSettings);
+        DiskThresholdDecider diskThresholdDecider = makeDecider(diskSettings);
         MetaData metaData = MetaData.builder()
                 .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
                 .build();
@@ -937,10 +943,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
 
     public void testForSingleDataNode() {
         Settings diskSettings = Settings.builder()
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), true)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%")
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%").build();
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), true)
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%")
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%").build();
 
         ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
         usagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 100)); // 0% used
@@ -954,7 +960,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         shardSizes.put("[test][1][p]", 40L);
         final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes.build());
 
-        DiskThresholdDecider diskThresholdDecider = new DiskThresholdDecider(diskSettings);
+        DiskThresholdDecider diskThresholdDecider = makeDecider(diskSettings);
         MetaData metaData = MetaData.builder()
                 .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
                 .build();

+ 2 - 41
core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java

@@ -59,48 +59,10 @@ import static org.hamcrest.CoreMatchers.equalTo;
  * Unit tests for the DiskThresholdDecider
  */
 public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
-    public void testDynamicSettings() {
-        ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
-
-        ClusterInfoService cis = EmptyClusterInfoService.INSTANCE;
-        DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss, cis, null);
-
-        assertThat(decider.getFreeBytesThresholdHigh(), equalTo(ByteSizeValue.parseBytesSizeValue("0b", "test")));
-        assertThat(decider.getFreeDiskThresholdHigh(), equalTo(10.0d));
-        assertThat(decider.getFreeBytesThresholdLow(), equalTo(ByteSizeValue.parseBytesSizeValue("0b", "test")));
-        assertThat(decider.getFreeDiskThresholdLow(), equalTo(15.0d));
-        assertThat(decider.getUsedDiskThresholdLow(), equalTo(85.0d));
-        assertThat(decider.getRerouteInterval().seconds(), equalTo(60L));
-        assertTrue(decider.isEnabled());
-        assertTrue(decider.isIncludeRelocations());
-
-        Settings newSettings = Settings.builder()
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), false)
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%")
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "500mb")
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "30s")
-                .build();
-
-        nss.applySettings(newSettings);
-        assertThat("high threshold bytes should be unset",
-                decider.getFreeBytesThresholdHigh(), equalTo(ByteSizeValue.parseBytesSizeValue("0b", "test")));
-        assertThat("high threshold percentage should be changed",
-                decider.getFreeDiskThresholdHigh(), equalTo(30.0d));
-        assertThat("low threshold bytes should be set to 500mb",
-                decider.getFreeBytesThresholdLow(), equalTo(ByteSizeValue.parseBytesSizeValue("500mb", "test")));
-        assertThat("low threshold bytes should be unset",
-                decider.getFreeDiskThresholdLow(), equalTo(0.0d));
-        assertThat("reroute interval should be changed to 30 seconds",
-                decider.getRerouteInterval().seconds(), equalTo(30L));
-        assertFalse("disk threshold decider should now be disabled", decider.isEnabled());
-        assertFalse("relocations should now be disabled", decider.isIncludeRelocations());
-    }
 
     public void testCanAllocateUsesMaxAvailableSpace() {
         ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
-        ClusterInfoService cis = EmptyClusterInfoService.INSTANCE;
-        DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss, cis, null);
+        DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss);
 
         MetaData metaData = MetaData.builder()
                 .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
@@ -144,8 +106,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
 
     public void testCanRemainUsesLeastAvailableSpace() {
         ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
-        ClusterInfoService cis = EmptyClusterInfoService.INSTANCE;
-        DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss, cis, null);
+        DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss);
         ImmutableOpenMap.Builder<ShardRouting, String> shardRoutingMap = ImmutableOpenMap.builder();
 
         DiscoveryNode node_0 = new DiscoveryNode("node_0", LocalTransportAddress.buildUnique(), Collections.emptyMap(),

+ 4 - 3
core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java

@@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.DiskUsage;
 import org.elasticsearch.cluster.MockInternalClusterInfoService;
 import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.plugins.Plugin;
@@ -74,9 +75,9 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
         cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", "/dev/null", 100, 50));
 
         client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), randomFrom("20b", "80%"))
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), randomFrom("10b", "90%"))
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")).get();
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), randomFrom("20b", "80%"))
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), randomFrom("10b", "90%"))
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")).get();
         // Create an index with 10 shards so we can check allocation for it
         prepareCreate("test").setSettings(Settings.builder()
                 .put("number_of_shards", 10)

+ 3 - 3
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -29,6 +29,7 @@ import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.discovery.DiscoveryModule;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.env.NodeEnvironment;
@@ -72,7 +73,6 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
-import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
@@ -1634,8 +1634,8 @@ public abstract class ESIntegTestCase extends ESTestCase {
                 .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE)
                 // Default the watermarks to absurdly low to prevent the tests
                 // from failing on nodes without enough disk space
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b")
-                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b")
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b")
+                .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b")
                 .put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 1000)
                 .put("script.stored", "true")
                 .put("script.inline", "true")

+ 3 - 3
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

@@ -43,7 +43,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode.Role;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.OperationRouting;
 import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
+import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
@@ -316,8 +316,8 @@ public final class InternalTestCluster extends TestCluster {
         }
         // Default the watermarks to absurdly low to prevent the tests
         // from failing on nodes without enough disk space
-        builder.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b");
-        builder.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b");
+        builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b");
+        builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b");
         // Some tests make use of scripting quite a bit, so increase the limit for integration tests
         builder.put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 1000);
         if (TEST_NIGHTLY) {