
Merge remote-tracking branch 'dakrone/disk-decider-relocation-switcharoo'

Lee Hinman 9 years ago
parent
commit
7da8be9874

+ 11 - 7
core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

@@ -78,11 +78,10 @@ public class DiskThresholdDecider extends AllocationDecider {
      * Returns the size of all shards that are currently being relocated to
      * the node, but may not be finished transferring yet.
      *
-     * If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size
-     * of all shards
+     * If subtractShardsMovingAway is true then the size of shards moving away is subtracted from the total size of all shards
      */
     static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation,
-                                              boolean subtractShardsMovingAway, String dataPath) {
+                                       boolean subtractShardsMovingAway, String dataPath) {
         ClusterInfo clusterInfo = allocation.clusterInfo();
         long totalSize = 0;
         for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
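The updated javadoc above describes the contract of sizeOfRelocatingShards, but most of the loop body falls outside the hunk. Below is a minimal, self-contained sketch of the idea using a simplified shard model instead of the real RoutingNode/ClusterInfo types (the Shard record and its field names are illustrative, not Elasticsearch API): shards moving onto the node add to the estimate, and when subtractShardsMovingAway is true the size of shards relocating off the node is subtracted.

```java
import java.util.List;

class RelocationSizeSketch {
    // Simplified stand-in for a shard routing entry; names are illustrative, not Elasticsearch's.
    record Shard(long sizeInBytes, boolean relocatingAwayFromNode) {}

    // Sums the estimated size of shards moving onto the node; if subtractShardsMovingAway
    // is true, the size of shards relocating away from the node is subtracted.
    static long sizeOfRelocatingShards(List<Shard> relocatingOrInitializing,
                                       boolean subtractShardsMovingAway) {
        long totalSize = 0;
        for (Shard shard : relocatingOrInitializing) {
            if (shard.relocatingAwayFromNode()) {
                if (subtractShardsMovingAway) {
                    totalSize -= shard.sizeInBytes();
                }
            } else {
                totalSize += shard.sizeInBytes();
            }
        }
        return totalSize;
    }

    public static void main(String[] args) {
        List<Shard> shards = List.of(
                new Shard(40L, false),  // relocating onto the node
                new Shard(25L, true));  // relocating away from the node
        System.out.println(sizeOfRelocatingShards(shards, false)); // prints 40
        System.out.println(sizeOfRelocatingShards(shards, true));  // prints 15
    }
}
```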
@@ -111,7 +110,9 @@ public class DiskThresholdDecider extends AllocationDecider {
         final double usedDiskThresholdLow = 100.0 - diskThresholdSettings.getFreeDiskThresholdLow();
         final double usedDiskThresholdHigh = 100.0 - diskThresholdSettings.getFreeDiskThresholdHigh();
 
-        DiskUsage usage = getDiskUsage(node, allocation, usages);
+        // subtractLeavingShards is passed as false here, because they still use disk space, and therefore we should be extra careful
+        // and take the size into account
+        DiskUsage usage = getDiskUsage(node, allocation, usages, false);
         // First, check that the node currently over the low watermark
         double freeDiskPercentage = usage.getFreeDiskAsPercentage();
         // Cache the used disk percentage for displaying disk percentages consistent with documentation
@@ -243,7 +244,9 @@ public class DiskThresholdDecider extends AllocationDecider {
             return decision;
         }
 
-        final DiskUsage usage = getDiskUsage(node, allocation, usages);
+        // subtractLeavingShards is passed as true here: since this is only for shards remaining, we will *eventually* have enough disk
+        // since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check.
+        final DiskUsage usage = getDiskUsage(node, allocation, usages, true);
         final String dataPath = clusterInfo.getDataPath(shardRouting);
         // If this node is already above the high threshold, the shard cannot remain (get it off!)
         final double freeDiskPercentage = usage.getFreeDiskAsPercentage();
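The two new comments (false in canAllocate above, true in canRemain here) encode an asymmetry that a small worked example may make concrete. The numbers below are made up purely for illustration: 100 bytes free, a 40-byte shard relocating onto the node, a 30-byte shard relocating away from it.

```java
class SubtractLeavingShardsExample {
    public static void main(String[] args) {
        long freeBytes = 100;     // free disk reported for the node
        long incomingBytes = 40;  // shard relocating onto the node
        long outgoingBytes = 30;  // shard relocating away from the node

        // canAllocate path (subtractLeavingShards == false): the departing shard still
        // occupies disk, so no credit is taken for it.
        long freeSeenByCanAllocate = freeBytes - incomingBytes;                  // 60

        // canRemain path (subtractLeavingShards == true): the space the departing shard
        // will release is credited back, since it will *eventually* be free.
        long freeSeenByCanRemain = freeBytes - (incomingBytes - outgoingBytes);  // 90

        System.out.println("canAllocate sees " + freeSeenByCanAllocate + " bytes free");
        System.out.println("canRemain sees " + freeSeenByCanRemain + " bytes free");
    }
}
```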
@@ -280,7 +283,8 @@ public class DiskThresholdDecider extends AllocationDecider {
                 "there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes));
     }
 
-    private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, ImmutableOpenMap<String, DiskUsage> usages) {
+    private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation,
+                                   ImmutableOpenMap<String, DiskUsage> usages, boolean subtractLeavingShards) {
         DiskUsage usage = usages.get(node.nodeId());
         if (usage == null) {
             // If there is no usage, and we have other nodes in the cluster,
@@ -293,7 +297,7 @@ public class DiskThresholdDecider extends AllocationDecider {
         }
 
         if (diskThresholdSettings.includeRelocations()) {
-            long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, true, usage.getPath());
+            long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, subtractLeavingShards, usage.getPath());
             DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(),
                     usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
             if (logger.isTraceEnabled()) {
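The remaining hunks show where the flag ends up: getDiskUsage forwards it to sizeOfRelocatingShards and folds the resulting estimate into a copy of the node's DiskUsage. A self-contained sketch of that shape, continuing the relocation numbers from the example above and using a simplified record in place of org.elasticsearch.cluster.DiskUsage (field and method names are illustrative, and the 1000-byte total is assumed):

```java
class UsageIncludingRelocationsSketch {
    // Simplified stand-in for org.elasticsearch.cluster.DiskUsage; names are illustrative.
    record SimpleDiskUsage(String nodeId, String path, long totalBytes, long freeBytes) {
        double freeDiskAsPercentage() {
            return totalBytes == 0 ? 100.0 : 100.0 * freeBytes / totalBytes;
        }
    }

    // Mirrors the shape of the adjustment above: free bytes minus the flag-dependent
    // relocation estimate, total bytes left unchanged.
    static SimpleDiskUsage includingRelocations(SimpleDiskUsage usage, long relocatingShardsSize) {
        return new SimpleDiskUsage(usage.nodeId(), usage.path(), usage.totalBytes(),
                usage.freeBytes() - relocatingShardsSize);
    }

    public static void main(String[] args) {
        SimpleDiskUsage raw = new SimpleDiskUsage("node1", "/data", 1000, 100);
        SimpleDiskUsage allocateView = includingRelocations(raw, 40);      // subtractLeavingShards == false
        SimpleDiskUsage remainView = includingRelocations(raw, 40 - 30);   // subtractLeavingShards == true
        System.out.printf("canAllocate view: %.1f%% free%n", allocateView.freeDiskAsPercentage());
        System.out.printf("canRemain view:   %.1f%% free%n", remainView.freeDiskAsPercentage());
    }
}
```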

+ 9 - 3
core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java

@@ -56,6 +56,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singleton;
@@ -729,10 +730,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
         final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
 
+        DiskThresholdDecider decider = makeDecider(diskSettings);
         AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
-                new HashSet<>(Arrays.asList(
-                        new SameShardAllocationDecider(Settings.EMPTY),
-                        makeDecider(diskSettings))));
+                new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), decider)));
 
         ClusterInfoService cis = new ClusterInfoService() {
             @Override
@@ -832,6 +832,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         ImmutableOpenMap.Builder<String, Long> shardSizesBuilder = ImmutableOpenMap.builder();
         shardSizesBuilder.put("[test][0][p]", 40L);
         shardSizesBuilder.put("[test][1][p]", 40L);
+        shardSizesBuilder.put("[foo][0][p]", 10L);
         ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
 
         final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
@@ -839,10 +840,12 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         DiskThresholdDecider diskThresholdDecider = makeDecider(diskSettings);
         MetaData metaData = MetaData.builder()
                 .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
+                .put(IndexMetaData.builder("foo").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
                 .build();
 
         RoutingTable initialRoutingTable = RoutingTable.builder()
                 .addAsNew(metaData.index("test"))
+                .addAsNew(metaData.index("foo"))
                 .build();
 
         DiscoveryNode discoveryNode1 = new DiscoveryNode("node1", new LocalTransportAddress("1"), emptyMap(),
@@ -881,6 +884,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         // Two shards consuming each 80% of disk space while 70% is allowed, but one is relocating, so shard 0 can stay
         firstRouting = TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.STARTED);
         secondRouting = TestShardRouting.newShardRouting("test", 1, "node1", "node2", true, ShardRoutingState.RELOCATING);
+        ShardRouting fooRouting = TestShardRouting.newShardRouting("foo", 0, "node1", null, true, ShardRoutingState.UNASSIGNED);
         firstRoutingNode = new RoutingNode("node1", discoveryNode1, firstRouting, secondRouting);
         builder = RoutingTable.builder().add(
                 IndexRoutingTable.builder(firstRouting.index())
@@ -898,6 +902,8 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
             false);
         decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
         assertThat(decision.type(), equalTo(Decision.Type.YES));
+        decision = diskThresholdDecider.canAllocate(fooRouting, firstRoutingNode, routingAllocation);
+        assertThat(decision.type(), equalTo(Decision.Type.NO));
 
         // Creating AllocationService instance and the services it depends on...
         ClusterInfoService cis = new ClusterInfoService() {
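The new assertions exercise the asymmetry end to end: [test][0] may remain on node1 because the relocating [test][1] is credited as leaving, while the unassigned [foo][0] may not be allocated there because the allocation path gives no such credit. A standalone sketch of that outcome with made-up numbers in the spirit of the scenario above (a node roughly 80% used, a 40-byte shard relocating away, and an illustrative 30%-free watermark rather than the decider's real settings):

```java
class RelocationCreditSketch {
    // Illustrative watermark: treat the node as over the limit when less than 30% is free.
    static boolean overWatermark(long totalBytes, long freeBytes) {
        return 100.0 * freeBytes / totalBytes < 30.0;
    }

    public static void main(String[] args) {
        long totalBytes = 100;
        long freeBytes = 20;          // the node is already past the allowed usage
        long leavingShardSize = 40;   // [test][1] is relocating away to node2

        // canRemain (subtractLeavingShards == true): the departing shard's space is
        // credited back, so [test][0] is allowed to stay -> YES in the assertion above.
        System.out.println("canRemain over watermark?   "
                + overWatermark(totalBytes, freeBytes + leavingShardSize)); // false

        // canAllocate (subtractLeavingShards == false): no credit is given, the node still
        // looks full, so the new [foo][0] shard is rejected -> NO in the assertion above.
        System.out.println("canAllocate over watermark? "
                + overWatermark(totalBytes, freeBytes));                    // true
    }
}
```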