Add separate flood stage limit for frozen (#71855)

Dedicated frozen nodes can operate safely with less free-space headroom than other
data nodes. This commit introduces a separate flood stage threshold for frozen
nodes, together with an accompanying max_headroom setting that caps the amount of
free space required on them.
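
For example, with the defaults (95% flood stage, 20GB max headroom), a 1000GB
frozen node hits the flood stage only below 20GB free, rather than the 50GB a
plain 95% watermark would require.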

Relates #71844
Henning Andersen, 4 years ago
commit 794869cfbb

+ 14 - 0
docs/reference/modules/cluster/disk_allocator.asciidoc

@@ -113,6 +113,20 @@ PUT /my-index-000001/_settings
 --
 // end::cluster-routing-flood-stage-tag[]
 
+[[cluster-routing-flood-stage-frozen]]
+// tag::cluster-routing-flood-stage-tag[]
+`cluster.routing.allocation.disk.watermark.flood_stage.frozen` {ess-icon}::
+(<<dynamic-cluster-setting,Dynamic>>)
+Controls the flood stage watermark for dedicated frozen nodes, which defaults to
+95%.
+
+`cluster.routing.allocation.disk.watermark.flood_stage.frozen.max_headroom` {ess-icon}::
+(<<dynamic-cluster-setting,Dynamic>>)
+Controls the max headroom for the flood stage watermark on dedicated frozen
+nodes. Defaults to 20GB when
+`cluster.routing.allocation.disk.watermark.flood_stage.frozen` is not explicitly
+set, and is disabled (-1) otherwise. This caps the amount of free space required
+on dedicated frozen nodes.
+
 `cluster.info.update.interval`::
     (<<dynamic-cluster-setting,Dynamic>>)
     How often {es} should check on disk usage for each node in the
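
Since both new settings are dynamic, they can be adjusted at runtime through the
cluster settings API. A sketch (the value here is illustrative, not a
recommendation):

    PUT _cluster/settings
    {
      "persistent": {
        "cluster.routing.allocation.disk.watermark.flood_stage.frozen.max_headroom": "50GB"
      }
    }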

+ 13 - 11
server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

@@ -33,6 +33,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.set.Sets;
 
 import java.util.ArrayList;
@@ -136,13 +137,20 @@ public class DiskThresholdMonitor {
             final DiskUsage usage = entry.value;
             final RoutingNode routingNode = routingNodes.node(node);
 
-            if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
-                usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
-                if (isFrozenOnlyNode(routingNode)) {
+            if (isFrozenOnlyNode(routingNode)) {
+                ByteSizeValue total = ByteSizeValue.ofBytes(usage.getTotalBytes());
+                long frozenFloodStageThreshold = diskThresholdSettings.getFreeBytesThresholdFrozenFloodStage(total).getBytes();
+                if (usage.getFreeBytes() < frozenFloodStageThreshold) {
                     logger.warn("flood stage disk watermark [{}] exceeded on {}",
-                        diskThresholdSettings.describeFloodStageThreshold(), usage);
-                    continue;
+                        diskThresholdSettings.describeFrozenFloodStageThreshold(total), usage);
                 }
+                // skip checking high/low watermarks for frozen nodes, since frozen shards have only an insignificant local storage footprint
+                // and this allows us to use more of the local storage for cache.
+                continue;
+            }
+
+            if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
+                usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
 
                 nodesOverLowThreshold.add(node);
                 nodesOverHighThreshold.add(node);
@@ -162,12 +170,6 @@ public class DiskThresholdMonitor {
                 continue;
             }
 
-            if (isFrozenOnlyNode(routingNode)) {
-                // skip checking high/low watermarks for frozen nodes, since frozen shards have only insignificant local storage footprint
-                // and this allows us to use more of the local storage for cache.
-                continue;
-            }
-
             if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
                 usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
 

+ 53 - 0
server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java

@@ -16,6 +16,7 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.RatioValue;
+import org.elasticsearch.common.unit.RelativeByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 
 import java.util.Iterator;
@@ -45,6 +46,21 @@ public class DiskThresholdSettings {
             (s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.flood_stage"),
             new FloodStageValidator(),
             Setting.Property.Dynamic, Setting.Property.NodeScope);
+    public static final Setting<RelativeByteSizeValue> CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_SETTING =
+        new Setting<>("cluster.routing.allocation.disk.watermark.flood_stage.frozen", "95%",
+            (s) -> RelativeByteSizeValue.parseRelativeByteSizeValue(s,  "cluster.routing.allocation.disk.watermark.flood_stage.frozen"),
+            Setting.Property.Dynamic, Setting.Property.NodeScope);
+    public static final Setting<ByteSizeValue> CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_MAX_HEADROOM_SETTING =
+        new Setting<>("cluster.routing.allocation.disk.watermark.flood_stage.frozen.max_headroom",
+            (settings) -> {
+                if (CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_SETTING.exists(settings)) {
+                    return "-1";
+                } else {
+                    return "20GB";
+                }
+            },
+            (s) -> ByteSizeValue.parseBytesSizeValue(s,  "cluster.routing.allocation.disk.watermark.flood_stage.frozen.max_headroom"),
+            Setting.Property.Dynamic, Setting.Property.NodeScope);
     public static final Setting<TimeValue> CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING =
         Setting.positiveTimeSetting("cluster.routing.allocation.disk.reroute_interval", TimeValue.timeValueSeconds(60),
             Setting.Property.Dynamic, Setting.Property.NodeScope);
@@ -59,6 +75,8 @@ public class DiskThresholdSettings {
     private volatile TimeValue rerouteInterval;
     private volatile Double freeDiskThresholdFloodStage;
     private volatile ByteSizeValue freeBytesThresholdFloodStage;
+    private volatile RelativeByteSizeValue frozenFloodStage;
+    private volatile ByteSizeValue frozenFloodStageMaxHeadroom;
 
     static {
         assert Version.CURRENT.major == Version.V_7_0_0.major + 1; // this check is unnecessary in v9
@@ -69,6 +87,7 @@ public class DiskThresholdSettings {
         }
     }
 
+
     public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) {
         final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings);
         final String highWatermark = CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.get(settings);
@@ -76,11 +95,16 @@ public class DiskThresholdSettings {
         setHighWatermark(highWatermark);
         setLowWatermark(lowWatermark);
         setFloodStage(floodStage);
+        setFrozenFloodStage(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_SETTING.get(settings));
+        setFrozenFloodStageMaxHeadroom(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_MAX_HEADROOM_SETTING.get(settings));
         this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings);
         this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
         clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark);
         clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark);
         clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING, this::setFloodStage);
+        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_SETTING, this::setFrozenFloodStage);
+        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_MAX_HEADROOM_SETTING,
+            this::setFrozenFloodStageMaxHeadroom);
         clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval);
         clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
     }
@@ -242,6 +266,15 @@ public class DiskThresholdSettings {
             CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey());
     }
 
+    private void setFrozenFloodStage(RelativeByteSizeValue floodStage) {
+        this.frozenFloodStage = floodStage;
+    }
+
+    private void setFrozenFloodStageMaxHeadroom(ByteSizeValue maxHeadroom) {
+        this.frozenFloodStageMaxHeadroom = maxHeadroom;
+    }
+
+
     /**
      * Gets the raw (uninterpreted) low watermark value as found in the settings.
      */
@@ -280,6 +313,14 @@ public class DiskThresholdSettings {
         return freeBytesThresholdFloodStage;
     }
 
+    public ByteSizeValue getFreeBytesThresholdFrozenFloodStage(ByteSizeValue total) {
+        // flood stage bytes are reversed compared to percentage, so we handle it specially.
+        RelativeByteSizeValue frozenFloodStage = this.frozenFloodStage;
+        if (frozenFloodStage.isAbsolute()) {
+            return frozenFloodStage.getAbsolute();
+        }
+        return ByteSizeValue.ofBytes(total.getBytes() - frozenFloodStage.calculateValue(total, frozenFloodStageMaxHeadroom).getBytes());
+    }
     public boolean isEnabled() {
         return enabled;
     }
@@ -306,6 +347,18 @@ public class DiskThresholdSettings {
             : freeBytesThresholdFloodStage.toString();
     }
 
+    String describeFrozenFloodStageThreshold(ByteSizeValue total) {
+        ByteSizeValue maxHeadroom = this.frozenFloodStageMaxHeadroom;
+        RelativeByteSizeValue floodStage = this.frozenFloodStage;
+        if (floodStage.isAbsolute()) {
+            return floodStage.getStringRep();
+        } else if (floodStage.calculateValue(total, maxHeadroom).equals(floodStage.calculateValue(total, null))) {
+            return Strings.format1Decimals(floodStage.getRatio().getAsPercent(), "%");
+        } else {
+            return "max_headroom=" + maxHeadroom;
+        }
+    }
+
     /**
      * Attempts to parse the watermark into a percentage, returning 100.0% if
      * it cannot be parsed.
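
A note on describeFrozenFloodStageThreshold: it reports whichever form is binding.
An absolute setting prints its string representation; when the cap does not bite
(say a 100GB disk, where 95% already leaves only 5GB of headroom) it prints the
percentage, e.g. 95%; and when the cap is the binding constraint (a 1000GB disk)
it prints max_headroom=20gb, which is exactly what the updated monitor test below
asserts.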

+ 2 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -221,6 +221,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
             DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING,
             DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING,
             DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING,
+            DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_SETTING,
+            DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_MAX_HEADROOM_SETTING,
             DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
             DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
             SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING,
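
(Registering the two settings in this list is what makes them validated and
dynamically updatable, matching the Setting.Property.Dynamic they declare.)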

+ 101 - 0
server/src/main/java/org/elasticsearch/common/unit/RelativeByteSizeValue.java

@@ -0,0 +1,101 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.common.unit;
+
+import org.elasticsearch.ElasticsearchParseException;
+
+/**
+ * A byte size value that allows specification using either of:
+ * 1. Absolute value (200GB for instance)
+ * 2. Relative percentage value (95%)
+ * 3. Relative ratio value (0.95)
+ */
+public class RelativeByteSizeValue {
+
+    public static final String MAX_HEADROOM_PREFIX = "max_headroom=";
+    private final ByteSizeValue absolute;
+    private final RatioValue ratio;
+
+    public RelativeByteSizeValue(ByteSizeValue absolute) {
+        this.absolute = absolute;
+        this.ratio = null;
+    }
+
+    public RelativeByteSizeValue(RatioValue ratio) {
+        this.absolute = null;
+        this.ratio = ratio;
+    }
+
+    public boolean isAbsolute() {
+        return absolute != null;
+    }
+
+    public ByteSizeValue getAbsolute() {
+        return absolute;
+    }
+
+    public RatioValue getRatio() {
+        return ratio;
+    }
+
+    /**
+     * Calculate the size to use, optionally catering for a max headroom.
+     * @param total the total size to use
+     * @param maxHeadroom the max headroom to cater for, or null (or -1) to apply no cap.
+     * @return the size to use
+     */
+    public ByteSizeValue calculateValue(ByteSizeValue total, ByteSizeValue maxHeadroom) {
+        if (ratio != null) {
+            long ratioBytes = (long) Math.ceil(ratio.getAsRatio() * total.getBytes());
+            if (maxHeadroom != null && maxHeadroom.getBytes() != -1) {
+                return ByteSizeValue.ofBytes(Math.max(ratioBytes, total.getBytes() - maxHeadroom.getBytes()));
+            } else {
+                return ByteSizeValue.ofBytes(ratioBytes);
+            }
+        } else {
+            return absolute;
+        }
+    }
+
+    public boolean isNonZeroSize() {
+        if (ratio != null) {
+            return ratio.getAsRatio() > 0.0d;
+        } else {
+            return absolute.getBytes() > 0;
+        }
+    }
+
+    public static RelativeByteSizeValue parseRelativeByteSizeValue(String value, String settingName) {
+        try {
+            RatioValue ratio = RatioValue.parseRatioValue(value);
+            if (ratio.getAsPercent() != 0.0d || value.endsWith("%")) {
+                return new RelativeByteSizeValue(ratio);
+            } else {
+                return new RelativeByteSizeValue(ByteSizeValue.ZERO);
+            }
+        } catch (ElasticsearchParseException e) {
+            // ignore, see if it parses as bytes
+        }
+        try {
+            return new RelativeByteSizeValue(ByteSizeValue.parseBytesSizeValue(value, settingName));
+            // todo: fix NumberFormatException case in ByteSizeValue.
+        } catch (NumberFormatException | ElasticsearchParseException e) {
+            throw new ElasticsearchParseException("unable to parse [{}={}] as either percentage or bytes", e,
+                settingName, value);
+        }
+    }
+
+    public String getStringRep() {
+        if (ratio != null) {
+            return ratio.toString();
+        } else {
+            return absolute.getStringRep();
+        }
+    }
+}
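
A quick usage sketch of the new class (the setting name is hypothetical; sizes
follow the defaults above):

    RelativeByteSizeValue v =
        RelativeByteSizeValue.parseRelativeByteSizeValue("95%", "example.setting");
    ByteSizeValue total = ByteSizeValue.ofGb(1000);
    v.calculateValue(total, null);                    // ≈950gb: the plain 95% ratio
    v.calculateValue(total, ByteSizeValue.ofGb(20));  // 980gb: usage may grow until only 20gb remains free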

+ 17 - 1
server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java

@@ -32,6 +32,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.MockLogAppender;
@@ -450,7 +451,12 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
         final ImmutableOpenMap.Builder<String, DiskUsage> allDisksOkBuilder;
         allDisksOkBuilder = ImmutableOpenMap.builder();
         allDisksOkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(15, 100)));
-        allDisksOkBuilder.put("frozen", new DiskUsage("frozen", "frozen", "/foo/bar", 100, between(15, 100)));
+        if (randomBoolean()) {
+            allDisksOkBuilder.put("frozen", new DiskUsage("frozen", "frozen", "/foo/bar", 100, between(15, 100)));
+        } else {
+            allDisksOkBuilder.put("frozen", new DiskUsage("frozen", "frozen", "/foo/bar", ByteSizeValue.ofGb(1000).getBytes(),
+                (randomBoolean() ? ByteSizeValue.ofGb(between(20, 1000)) : ByteSizeValue.ofGb(between(20, 50))).getBytes()));
+        }
         final ImmutableOpenMap<String, DiskUsage> allDisksOk = allDisksOkBuilder.build();
 
         final ImmutableOpenMap.Builder<String, DiskUsage> aboveLowWatermarkBuilder = ImmutableOpenMap.builder();
@@ -475,6 +481,13 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
         frozenAboveFloodStageWatermarkBuilder.put("frozen", new DiskUsage("frozen", "frozen", "/foo/bar", 100, between(0, 4)));
         final ImmutableOpenMap<String, DiskUsage> frozenAboveFloodStageWatermark = frozenAboveFloodStageWatermarkBuilder.build();
 
+        final ImmutableOpenMap.Builder<String, DiskUsage> frozenAboveFloodStageMaxHeadroomBuilder = ImmutableOpenMap.builder();
+        // node1 is below low watermark, so no logging from it.
+        frozenAboveFloodStageMaxHeadroomBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(15, 100)));
+        frozenAboveFloodStageMaxHeadroomBuilder.put("frozen", new DiskUsage("frozen", "frozen", "/foo/bar",
+            ByteSizeValue.ofGb(1000).getBytes(), ByteSizeValue.ofGb(between(0, 19)).getBytes()));
+        final ImmutableOpenMap<String, DiskUsage> frozenAboveFloodStageMaxHeadroom = frozenAboveFloodStageMaxHeadroomBuilder.build();
+
         assertNoLogging(monitor, allDisksOk);
 
         assertSingleInfoMessage(monitor, aboveLowWatermark,
@@ -546,6 +559,9 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
 
         assertRepeatedWarningMessages(monitor, frozenAboveFloodStageWatermark, "flood stage disk watermark [95%] exceeded on *frozen*");
 
+        assertRepeatedWarningMessages(monitor, frozenAboveFloodStageMaxHeadroom,
+            "flood stage disk watermark [max_headroom=20gb] exceeded on *frozen*");
+
         assertNoLogging(monitor, allDisksOk);
     }
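
The numbers line up with the defaults: on the 1000GB frozen node a plain 95%
watermark would flood below 50GB free, but the 20GB max headroom relaxes that to
20GB, so the 0-19GB free cases trip it and the warning reports the cap
(max_headroom=20gb) rather than a percentage.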
 

+ 68 - 0
server/src/test/java/org/elasticsearch/common/unit/RelativeByteSizeValueTests.java

@@ -0,0 +1,68 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.common.unit;
+
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class RelativeByteSizeValueTests extends ESTestCase {
+
+    public void testPercentage() {
+        double value = randomIntBetween(0, 100);
+        RelativeByteSizeValue parsed = RelativeByteSizeValue.parseRelativeByteSizeValue(value + "%", "test");
+        assertThat(parsed.getRatio().getAsPercent(), equalTo(value));
+        assertThat(parsed.isAbsolute(), is(false));
+        assertThat(parsed.isNonZeroSize(), is(value != 0.0d));
+    }
+
+    public void testRatio() {
+        double value = (double) randomIntBetween(1, 100) / 100;
+        RelativeByteSizeValue parsed = RelativeByteSizeValue.parseRelativeByteSizeValue(Double.toString(value), "test");
+        assertThat(parsed.getRatio().getAsRatio(),
+            equalTo(value));
+        assertThat(parsed.isAbsolute(), is(false));
+        assertThat(parsed.isNonZeroSize(), is(true));
+    }
+
+    public void testAbsolute() {
+        ByteSizeValue value = new ByteSizeValue(between(0, 100), randomFrom(ByteSizeUnit.values()));
+        RelativeByteSizeValue parsed = RelativeByteSizeValue.parseRelativeByteSizeValue(value.getStringRep(), "test");
+        assertThat(parsed.getAbsolute(), equalTo(value));
+        assertThat(parsed.isAbsolute(), is(true));
+        assertThat(parsed.isNonZeroSize(), is(value.getBytes() != 0));
+    }
+
+    public void testZeroAbsolute() {
+        RelativeByteSizeValue parsed = RelativeByteSizeValue.parseRelativeByteSizeValue("0", "test");
+        assertThat(parsed.getAbsolute(), equalTo(ByteSizeValue.ZERO));
+        assertThat(parsed.isAbsolute(), is(true));
+        assertThat(parsed.isNonZeroSize(), is(false));
+    }
+
+    public void testFail() {
+        assertFail("a", "unable to parse [test=a] as either percentage or bytes");
+        assertFail("%", "unable to parse [test=%] as either percentage or bytes");
+        assertFail("GB", "unable to parse [test=GB] as either percentage or bytes");
+        assertFail("GB%", "unable to parse [test=GB%] as either percentage or bytes");
+        assertFail("100 NB", "unable to parse [test=100 NB] as either percentage or bytes");
+        assertFail("100 %a", "unable to parse [test=100 %a] as either percentage or bytes");
+        assertFail("100 GB a", "unable to parse [test=100 GB a] as either percentage or bytes");
+        assertFail("0,1 GB", "unable to parse [test=0,1 GB] as either percentage or bytes");
+        assertFail("0,1", "unable to parse [test=0,1] as either percentage or bytes");
+    }
+
+    private void assertFail(String value, String failure) {
+        ElasticsearchParseException exception = expectThrows(ElasticsearchParseException.class,
+            () -> RelativeByteSizeValue.parseRelativeByteSizeValue(value, "test"));
+        assertThat(exception.getMessage(), equalTo(failure));
+    }
+}