Browse Source

Add cluster-wide setting for total shard limit

This adds the `cluster.routing.allocation.total_shards_per_node`
setting, which limits the total number of shards across all indices on
each node. It defaults to -1 and can be dynamically configured.

Resolves #14456
Lee Hinman 10 years ago
parent
commit
145374b762

+ 2 - 1
core/src/main/java/org/elasticsearch/cluster/ClusterModule.java

@@ -205,6 +205,7 @@ public class ClusterModule extends AbstractModule {
         registerClusterDynamicSetting(TransportService.SETTING_TRACE_LOG_EXCLUDE, Validator.EMPTY);
         registerClusterDynamicSetting(TransportService.SETTING_TRACE_LOG_EXCLUDE + ".*", Validator.EMPTY);
         registerClusterDynamicSetting(TransportCloseIndexAction.SETTING_CLUSTER_INDICES_CLOSE_ENABLE, Validator.BOOLEAN);
+        registerClusterDynamicSetting(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE, Validator.INTEGER);
     }
 
     private void registerBuiltinIndexSettings() {
@@ -325,4 +326,4 @@ public class ClusterModule extends AbstractModule {
         bind(NodeMappingRefreshAction.class).asEagerSingleton();
         bind(MappingUpdatedAction.class).asEagerSingleton();
     }
-}
+}

+ 102 - 25
core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java

@@ -24,13 +24,16 @@ import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.settings.NodeSettingsService;
 
 /**
  * This {@link AllocationDecider} limits the number of shards per node on a per
- * index basis. The allocator prevents a single node to hold more than
- * {@value #INDEX_TOTAL_SHARDS_PER_NODE} per index during the allocation
+ * index or node-wide basis. The allocator prevents a single node to hold more
+ * than {@value #INDEX_TOTAL_SHARDS_PER_NODE} per index and
+ * {@value #CLUSTER_TOTAL_SHARDS_PER_NODE} globally during the allocation
  * process. The limits of this decider can be changed in real-time via a the
  * index settings API.
  * <p>
@@ -50,66 +53,140 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
 
     public static final String NAME = "shards_limit";
 
+    private volatile int clusterShardLimit;
+
     /**
      * Controls the maximum number of shards per index on a single Elasticsearch
      * node. Negative values are interpreted as unlimited.
      */
     public static final String INDEX_TOTAL_SHARDS_PER_NODE = "index.routing.allocation.total_shards_per_node";
+    /**
+     * Controls the maximum number of shards per node on a global level.
+     * Negative values are interpreted as unlimited.
+     */
+    public static final String CLUSTER_TOTAL_SHARDS_PER_NODE = "cluster.routing.allocation.total_shards_per_node";
+
+    class ApplySettings implements NodeSettingsService.Listener {
+        @Override
+        public void onRefreshSettings(Settings settings) {
+            Integer newClusterLimit = settings.getAsInt(CLUSTER_TOTAL_SHARDS_PER_NODE, null);
+
+            if (newClusterLimit != null) {
+                logger.info("updating [{}] from [{}] to [{}]", CLUSTER_TOTAL_SHARDS_PER_NODE,
+                        ShardsLimitAllocationDecider.this.clusterShardLimit, newClusterLimit);
+                ShardsLimitAllocationDecider.this.clusterShardLimit = newClusterLimit;
+            }
+        }
+    }
 
     @Inject
-    public ShardsLimitAllocationDecider(Settings settings) {
+    public ShardsLimitAllocationDecider(Settings settings, NodeSettingsService nodeSettingsService) {
         super(settings);
+        this.clusterShardLimit = settings.getAsInt(CLUSTER_TOTAL_SHARDS_PER_NODE, -1);
+        nodeSettingsService.addListener(new ApplySettings());
     }
 
     @Override
     public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
         IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index());
-        int totalShardsPerNode = indexMd.getSettings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1);
-        if (totalShardsPerNode <= 0) {
-            return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [%d] <= 0", totalShardsPerNode);
+        int indexShardLimit = indexMd.getSettings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1);
+        // Capture the limit here in case it changes during this method's
+        // execution
+        final int clusterShardLimit = this.clusterShardLimit;
+
+        if (indexShardLimit <= 0 && clusterShardLimit <= 0) {
+            return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0",
+                    indexShardLimit, clusterShardLimit);
         }
 
-        int nodeCount = 0;
+        int indexShardCount = 0;
+        int nodeShardCount = 0;
         for (ShardRouting nodeShard : node) {
-            if (!nodeShard.index().equals(shardRouting.index())) {
-                continue;
-            }
             // don't count relocating shards...
             if (nodeShard.relocating()) {
                 continue;
             }
-            nodeCount++;
+            nodeShardCount++;
+            if (nodeShard.index().equals(shardRouting.index())) {
+                indexShardCount++;
+            }
+        }
+        if (clusterShardLimit > 0 && nodeShardCount >= clusterShardLimit) {
+            return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]",
+                    nodeShardCount, clusterShardLimit);
         }
-        if (nodeCount >= totalShardsPerNode) {
-            return allocation.decision(Decision.NO, NAME, "too many shards for this index on node [%d], limit: [%d]",
-                    nodeCount, totalShardsPerNode);
+        if (indexShardLimit > 0 && indexShardCount >= indexShardLimit) {
+            return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]",
+                    shardRouting.index(), indexShardCount, indexShardLimit);
         }
-        return allocation.decision(Decision.YES, NAME, "shard count under limit [%d] of total shards per node", totalShardsPerNode);
+        return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node",
+                indexShardLimit, clusterShardLimit);
     }
 
     @Override
     public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
         IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index());
-        int totalShardsPerNode = indexMd.getSettings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1);
-        if (totalShardsPerNode <= 0) {
-            return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [%d] <= 0", totalShardsPerNode);
+        int indexShardLimit = indexMd.getSettings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1);
+        // Capture the limit here in case it changes during this method's
+        // execution
+        final int clusterShardLimit = this.clusterShardLimit;
+
+        if (indexShardLimit <= 0 && clusterShardLimit <= 0) {
+            return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0",
+                    indexShardLimit, clusterShardLimit);
         }
 
-        int nodeCount = 0;
+        int indexShardCount = 0;
+        int nodeShardCount = 0;
         for (ShardRouting nodeShard : node) {
-            if (!nodeShard.index().equals(shardRouting.index())) {
+            // don't count relocating shards...
+            if (nodeShard.relocating()) {
                 continue;
             }
+            nodeShardCount++;
+            if (nodeShard.index().equals(shardRouting.index())) {
+                indexShardCount++;
+            }
+        }
+        // Subtle difference between the `canAllocate` and `canRemain` is that
+        // this checks > while canAllocate checks >=
+        if (clusterShardLimit > 0 && nodeShardCount > clusterShardLimit) {
+            return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]",
+                    nodeShardCount, clusterShardLimit);
+        }
+        if (indexShardLimit > 0 && indexShardCount > indexShardLimit) {
+            return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]",
+                    shardRouting.index(), indexShardCount, indexShardLimit);
+        }
+        return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node",
+                indexShardLimit, clusterShardLimit);
+    }
+
+    @Override
+    public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
+        // Only checks the node-level limit, not the index-level
+        // Capture the limit here in case it changes during this method's
+        // execution
+        final int clusterShardLimit = this.clusterShardLimit;
+
+        if (clusterShardLimit <= 0) {
+            return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [cluster: %d] <= 0",
+                    clusterShardLimit);
+        }
+
+        int nodeShardCount = 0;
+        for (ShardRouting nodeShard : node) {
             // don't count relocating shards...
             if (nodeShard.relocating()) {
                 continue;
             }
-            nodeCount++;
+            nodeShardCount++;
         }
-        if (nodeCount > totalShardsPerNode) {
-            return allocation.decision(Decision.NO, NAME, "too many shards for this index on node [%d], limit: [%d]",
-                    nodeCount, totalShardsPerNode);
+        if (clusterShardLimit >= 0 && nodeShardCount >= clusterShardLimit) {
+            return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]",
+                    nodeShardCount, clusterShardLimit);
         }
-        return allocation.decision(Decision.YES, NAME, "shard count under limit [%d] of total shards per node", totalShardsPerNode);
+        return allocation.decision(Decision.YES, NAME, "shard count under node limit [%d] of total shards per node",
+                clusterShardLimit);
     }
 }

+ 59 - 0
core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardsLimitAllocationTests.java

@@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESAllocationTestCase;
 
 import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
@@ -87,6 +88,64 @@ public class ShardsLimitAllocationTests extends ESAllocationTestCase {
         clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
     }
 
+    public void testClusterLevelShardsLimitAllocate() {
+        AllocationService strategy = createAllocationService(settingsBuilder()
+                .put("cluster.routing.allocation.concurrent_recoveries", 10)
+                .put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE, 1)
+                .build());
+
+        logger.info("Building initial routing table");
+
+        MetaData metaData = MetaData.builder()
+                .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 4)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)))
+                .build();
+
+        RoutingTable routingTable = RoutingTable.builder()
+                .addAsNew(metaData.index("test"))
+                .build();
+
+        ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
+        logger.info("Adding two nodes and performing rerouting");
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
+        routingTable = strategy.reroute(clusterState).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+
+        assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1));
+
+        logger.info("Start the primary shards");
+        RoutingNodes routingNodes = clusterState.getRoutingNodes();
+        routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+
+        assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(2));
+
+        // Bump the cluster total shards to 2
+        strategy = createAllocationService(settingsBuilder()
+                .put("cluster.routing.allocation.concurrent_recoveries", 10)
+                .put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE, 2)
+                .build());
+
+        logger.info("Do another reroute, make sure shards are now allocated");
+        routingTable = strategy.reroute(clusterState).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+
+        assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1));
+
+        routingNodes = clusterState.getRoutingNodes();
+        routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+
+        assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(2));
+        assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(2));
+        assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(0));
+    }
+
     public void testIndexLevelShardsLimitRemain() {
         AllocationService strategy = createAllocationService(settingsBuilder()
                 .put("cluster.routing.allocation.concurrent_recoveries", 10)

+ 9 - 2
docs/reference/index-modules/allocation/total_shards.asciidoc

@@ -14,10 +14,17 @@ number of shards from a single index allowed per node:
     The maximum number of shards (replicas and primaries) that will be
     allocated to a single node.  Defaults to unbounded.
 
+You can also limit the amount of shards a node can have regardless of the index:
+
+`cluster.routing.allocation.total_shards_per_node`::
+
+    The maximum number of shards (replicas and primaries) that will be
+    allocated to a single node globally.  Defaults to unbounded (-1).
+
 [WARNING]
 =======================================
-This setting imposes a hard limit which can result in some shards not
-being allocated.
+Thess setting impose a hard limit which can result in some shards not being
+allocated.
 
 Use with caution.
 =======================================