
Add cluster-wide shard limit warnings (#34021)

In a future major version, we will be introducing a soft limit on the
number of shards in a cluster, based on the number of nodes in the
cluster. This limit will be configurable and will be checked on operations
which create or open shards; such operations will issue a warning if they
would take the cluster over the limit.

There is an option to enable strict enforcement of the limit, which
turns the warnings into errors.  In a future release, the option will be
removed and strict enforcement will be the default (and only) behavior.
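
In outline, the check computes a cluster-wide cap from the per-node setting and
the number of data nodes, then either logs a deprecation warning or rejects the
operation when strict enforcement is opted into. A simplified sketch of that
logic, condensed from the IndicesService.checkShardLimit change further down in
this commit (names such as ShardLimitSketch are illustrative, not part of the
patch):

    import java.util.Optional;

    final class ShardLimitSketch {
        /**
         * @param newShards         shards the operation would add (primaries * (1 + replicas))
         * @param openShards        shards currently open in the cluster (closed indices count as 0)
         * @param dataNodes         number of data nodes in the cluster
         * @param maxShardsPerNode  value of the cluster.max_shards_per_node setting
         * @param strictEnforcement whether es.enforce_max_shards_per_node is set to true
         * @return an error message if the operation should be rejected, otherwise empty
         */
        static Optional<String> check(int newShards, int openShards, int dataNodes,
                                      int maxShardsPerNode, boolean strictEnforcement) {
            if (dataNodes == 0 || newShards < 0) {
                return Optional.empty(); // don't block index creation while the cluster is forming
            }
            int maxShardsInCluster = maxShardsPerNode * dataNodes;
            if (openShards + newShards > maxShardsInCluster) {
                String error = "this action would add [" + newShards + "] total shards, but this cluster"
                    + " currently has [" + openShards + "]/[" + maxShardsInCluster + "] maximum shards open";
                if (strictEnforcement) {
                    return Optional.of(error); // callers turn this into a ValidationException
                }
                // otherwise a deprecation warning is logged and the operation proceeds
            }
            return Optional.empty();
        }
    }
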
Gordon Brown 7 years ago
parent
commit
da20dfd81c

+ 7 - 0
docs/reference/migration/migrate_7_0/cluster.asciidoc

@@ -18,3 +18,10 @@ primary shards of the opened index to be allocated.
 [float]
 ==== Shard preferences `_primary`, `_primary_first`, `_replica`, and `_replica_first` are removed
 These shard preferences are removed in favour of the `_prefer_nodes` and `_only_nodes` preferences.
+
+[float]
+==== Cluster-wide shard soft limit
+Clusters now have a soft limit on the total number of open shards in the cluster
+based on the number of nodes and the `cluster.max_shards_per_node` cluster
+setting, to prevent accidental operations that would destabilize the cluster.
+More information can be found in the <<misc-cluster,documentation for that setting>>.

+ 43 - 1
docs/reference/modules/cluster/misc.asciidoc

@@ -22,6 +22,48 @@ user with access to the <<cluster-update-settings,cluster-update-settings>>
 API can make the cluster read-write again.
 
 
+[[cluster-shard-limit]]
+
+==== Cluster Shard Limit
+
+In Elasticsearch 7.0 and later, there will be a soft limit on the number of
+shards in a cluster, based on the number of nodes in the cluster. This is
+intended to prevent operations which may unintentionally destabilize the
+cluster. Prior to 7.0, actions which would take the cluster over the limit
+issue a deprecation warning.
+
+NOTE: You can set the system property `es.enforce_max_shards_per_node` to `true`
+to opt in to strict enforcement of the shard limit. If this system property is
+set, actions which would result in the cluster going over the limit will result
+in an error, rather than a deprecation warning. This property will be removed in
+Elasticsearch 7.0, as strict enforcement of the limit will be the default and
+only behavior.
+
+If an operation, such as creating a new index, restoring a snapshot of an index,
+or opening a closed index, would lead to the number of shards in the cluster
+going over this limit, the operation will issue a deprecation warning.
+
+If the cluster is already over the limit, due to changes in node membership or
+setting changes, all operations that create or open indices will issue warnings
+until either the limit is increased as described below, or some indices are
+<<indices-open-close,closed>> or <<indices-delete-index,deleted>> to bring the
+number of shards below the limit.
+
+Replicas count towards this limit, but closed indices do not. An index with 5
+primary shards and 2 replicas will be counted as 15 shards. Any closed index
+is counted as 0, no matter how many shards and replicas it contains.
+
+The limit defaults to 1,000 shards per node, and can be dynamically adjusted
+using the following setting:
+
+`cluster.max_shards_per_node`::
+
+     Controls the number of shards allowed in the cluster per node.
+
+For example, a 3-node cluster with the default setting would allow 3,000 shards
+total, across all open indices. If the above setting is changed to 1,500, then
+the cluster would allow 4,500 shards total.
+
 [[user-defined-data]]
 ==== User Defined Cluster Metadata
 
@@ -109,4 +151,4 @@ Enable or disable allocation for persistent tasks:
 This setting does not affect the persistent tasks that are already being executed.
 Only newly created persistent tasks, or tasks that must be reassigned (after a node
 left the cluster, for example), are impacted by this setting.
---
+--
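
For reference, cluster.max_shards_per_node is a dynamic cluster setting; the
integration test added later in this commit updates it through the
cluster-settings API, and the same call works from any Java Client (a sketch
with illustrative names, assuming a client instance is available and using an
arbitrary value of 1,500):

    import org.elasticsearch.client.Client;
    import org.elasticsearch.cluster.metadata.MetaData;
    import org.elasticsearch.common.settings.Settings;

    class ShardLimitSettingExample {
        // Persistently raise the per-node cap to 1,500; on a 3-node cluster this
        // allows 4,500 open shards in total, as described above.
        static void raiseShardLimit(Client client) {
            client.admin().cluster()
                .prepareUpdateSettings()
                .setPersistentSettings(Settings.builder()
                    .put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), 1500)
                    .build())
                .get();
        }
    }

Strict enforcement, as described in the NOTE above, is opted into at node
startup via the es.enforce_max_shards_per_node JVM system property (for
example, -Des.enforce_max_shards_per_node=true).
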

+ 28 - 2
server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java

@@ -22,7 +22,6 @@ package org.elasticsearch.cluster.metadata;
 import com.carrotsearch.hppc.ObjectHashSet;
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
-
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.CollectionUtil;
 import org.elasticsearch.action.AliasesRequest;
@@ -124,9 +123,11 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
     public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, ClusterState.FeatureAware {
 
         EnumSet<XContentContext> context();
-
     }
 
+    public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_NODE =
+        Setting.intSetting("cluster.max_shards_per_node", 1000, 1, Property.Dynamic, Property.NodeScope);
+
     public static final Setting<Boolean> SETTING_READ_ONLY_SETTING =
         Setting.boolSetting("cluster.blocks.read_only", false, Property.Dynamic, Property.NodeScope);
 
@@ -162,6 +163,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
     private final ImmutableOpenMap<String, Custom> customs;
 
     private final transient int totalNumberOfShards; // Transient ? not serializable anyway?
+    private final int totalOpenIndexShards;
     private final int numberOfShards;
 
     private final String[] allIndices;
@@ -183,12 +185,17 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
         this.customs = customs;
         this.templates = templates;
         int totalNumberOfShards = 0;
+        int totalOpenIndexShards = 0;
         int numberOfShards = 0;
         for (ObjectCursor<IndexMetaData> cursor : indices.values()) {
             totalNumberOfShards += cursor.value.getTotalNumberOfShards();
             numberOfShards += cursor.value.getNumberOfShards();
+            if (IndexMetaData.State.OPEN.equals(cursor.value.getState())) {
+                totalOpenIndexShards += cursor.value.getTotalNumberOfShards();
+            }
         }
         this.totalNumberOfShards = totalNumberOfShards;
+        this.totalOpenIndexShards = totalOpenIndexShards;
         this.numberOfShards = numberOfShards;
 
         this.allIndices = allIndices;
@@ -667,10 +674,29 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
     }
 
 
+    /**
+     * Gets the total number of shards from all indices, including replicas and
+     * closed indices.
+     * @return The total number of shards from all indices.
+     */
     public int getTotalNumberOfShards() {
         return this.totalNumberOfShards;
     }
 
+    /**
+     * Gets the total number of open shards from all indices. Includes
+     * replicas, but does not include shards that are part of closed indices.
+     * @return The total number of open shards from all indices.
+     */
+    public int getTotalOpenIndexShards() {
+        return this.totalOpenIndexShards;
+    }
+
+    /**
+     * Gets the number of primary shards from all indices, not including
+     * replicas.
+     * @return The number of primary shards from all indices.
+     */
     public int getNumberOfShards() {
         return this.numberOfShards;
     }

+ 24 - 3
server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java

@@ -53,6 +53,7 @@ import org.elasticsearch.common.ValidationException;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
@@ -82,6 +83,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
@@ -587,12 +589,16 @@ public class MetaDataCreateIndexService extends AbstractComponent {
 
     private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState state) {
         validateIndexName(request.index(), state);
-        validateIndexSettings(request.index(), request.settings(), forbidPrivateIndexSettings);
+        validateIndexSettings(request.index(), request.settings(), state, forbidPrivateIndexSettings);
     }
 
-    public void validateIndexSettings(
-            final String indexName, final Settings settings, final boolean forbidPrivateIndexSettings) throws IndexCreationException {
+    public void validateIndexSettings(String indexName, final Settings settings, final ClusterState clusterState,
+                                      final boolean forbidPrivateIndexSettings) throws IndexCreationException {
         List<String> validationErrors = getIndexSettingsValidationErrors(settings, forbidPrivateIndexSettings);
+
+        Optional<String> shardAllocation = checkShardLimit(settings, clusterState, deprecationLogger);
+        shardAllocation.ifPresent(validationErrors::add);
+
         if (validationErrors.isEmpty() == false) {
             ValidationException validationException = new ValidationException();
             validationException.addValidationErrors(validationErrors);
@@ -600,6 +606,21 @@ public class MetaDataCreateIndexService extends AbstractComponent {
         }
     }
 
+    /**
+     * Checks whether an index can be created without going over the cluster shard limit.
+     *
+     * @param settings The settings of the index to be created.
+     * @param clusterState The current cluster state.
+     * @param deprecationLogger The logger to use to emit a deprecation warning, if appropriate.
+     * @return If present, an error message to be used to reject index creation. If empty, a signal that this operation may be carried out.
+     */
+    static Optional<String> checkShardLimit(Settings settings, ClusterState clusterState, DeprecationLogger deprecationLogger) {
+        int shardsToCreate = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(settings)
+            * (1 + IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings));
+
+        return IndicesService.checkShardLimit(shardsToCreate, clusterState, deprecationLogger);
+    }
+
     List<String> getIndexSettingsValidationErrors(final Settings settings, final boolean forbidPrivateIndexSettings) {
         String customPath = IndexMetaData.INDEX_DATA_PATH_SETTING.get(settings);
         List<String> validationErrors = new ArrayList<>();

+ 34 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java

@@ -36,8 +36,10 @@ import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.ValidationException;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.indices.IndicesService;
@@ -50,6 +52,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -175,6 +178,8 @@ public class MetaDataIndexStateService extends AbstractComponent {
                     }
                 }
 
+                validateShardLimit(currentState, request.indices(), deprecationLogger);
+
                 if (indicesToOpen.isEmpty()) {
                     return currentState;
                 }
@@ -217,4 +222,33 @@ public class MetaDataIndexStateService extends AbstractComponent {
         });
     }
 
+    /**
+     * Validates whether a list of indices can be opened without going over the cluster shard limit.  Only counts indices which are
+     * currently closed and will be opened, ignores indices which are already open.
+     *
+     * @param currentState The current cluster state.
+     * @param indices The indices which are to be opened.
+     * @param deprecationLogger The logger to use to emit a deprecation warning, if appropriate.
+     * @throws ValidationException If this operation would take the cluster over the limit and enforcement is enabled.
+     */
+    static void validateShardLimit(ClusterState currentState, Index[] indices, DeprecationLogger deprecationLogger) {
+        int shardsToOpen = Arrays.stream(indices)
+            .filter(index -> currentState.metaData().index(index).getState().equals(IndexMetaData.State.CLOSE))
+            .mapToInt(index -> getTotalShardCount(currentState, index))
+            .sum();
+
+        Optional<String> error = IndicesService.checkShardLimit(shardsToOpen, currentState, deprecationLogger);
+        if (error.isPresent()) {
+            ValidationException ex = new ValidationException();
+            ex.addValidationError(error.get());
+            throw ex;
+        }
+
+    }
+
+    private static int getTotalShardCount(ClusterState state, Index index) {
+        IndexMetaData indexMetaData = state.metaData().index(index);
+        return indexMetaData.getNumberOfShards() * (1 + indexMetaData.getNumberOfReplicas());
+    }
+
 }

+ 24 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java

@@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.ValidationException;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
@@ -45,9 +46,11 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.elasticsearch.action.support.ContextPreservingActionListener.wrapPreservingContext;
@@ -115,6 +118,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent {
 
             @Override
             public ClusterState execute(ClusterState currentState) {
+
                 RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
                 MetaData.Builder metaDataBuilder = MetaData.builder(currentState.metaData());
 
@@ -141,6 +145,18 @@ public class MetaDataUpdateSettingsService extends AbstractComponent {
 
                 int updatedNumberOfReplicas = openSettings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, -1);
                 if (updatedNumberOfReplicas != -1 && preserveExisting == false) {
+
+                    // Verify that this won't take us over the cluster shard limit.
+                    int totalNewShards = Arrays.stream(request.indices())
+                        .mapToInt(i -> getTotalNewShards(i, currentState, updatedNumberOfReplicas))
+                        .sum();
+                    Optional<String> error = IndicesService.checkShardLimit(totalNewShards, currentState, deprecationLogger);
+                    if (error.isPresent()) {
+                        ValidationException ex = new ValidationException();
+                        ex.addValidationError(error.get());
+                        throw ex;
+                    }
+
                     // we do *not* update the in sync allocation ids as they will be removed upon the first index
                     // operation which make these copies stale
                     // TODO: update the list once the data is deleted by the node?
@@ -224,6 +240,14 @@ public class MetaDataUpdateSettingsService extends AbstractComponent {
         });
     }
 
+    private int getTotalNewShards(Index index, ClusterState currentState, int updatedNumberOfReplicas) {
+        IndexMetaData indexMetaData = currentState.metaData().index(index);
+        int shardsInIndex = indexMetaData.getNumberOfShards();
+        int oldNumberOfReplicas = indexMetaData.getNumberOfReplicas();
+        int replicaIncrease = updatedNumberOfReplicas - oldNumberOfReplicas;
+        return replicaIncrease * shardsInIndex;
+    }
+
     /**
      * Updates the cluster block only iff the setting exists in the given settings
      */

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -196,6 +196,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
                     MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
                     MetaData.SETTING_READ_ONLY_SETTING,
                     MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
+                    MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
                     RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
                     RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
                     RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,

+ 54 - 0
server/src/main/java/org/elasticsearch/indices/IndicesService.java

@@ -38,6 +38,7 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.CheckedFunction;
@@ -52,6 +53,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -156,6 +158,21 @@ public class IndicesService extends AbstractLifecycleComponent
     public static final String INDICES_SHARDS_CLOSED_TIMEOUT = "indices.shards_closed_timeout";
     public static final Setting<TimeValue> INDICES_CACHE_CLEAN_INTERVAL_SETTING =
         Setting.positiveTimeSetting("indices.cache.cleanup_interval", TimeValue.timeValueMinutes(1), Property.NodeScope);
+    private static final boolean ENFORCE_MAX_SHARDS_PER_NODE;
+
+    static {
+        final String ENFORCE_SHARD_LIMIT_KEY = "es.enforce_max_shards_per_node";
+        final String enforceMaxShardsPerNode = System.getProperty(ENFORCE_SHARD_LIMIT_KEY);
+        if (enforceMaxShardsPerNode == null) {
+            ENFORCE_MAX_SHARDS_PER_NODE = false;
+        } else if ("true".equals(enforceMaxShardsPerNode)) {
+            ENFORCE_MAX_SHARDS_PER_NODE = true;
+        } else {
+            throw new IllegalArgumentException(ENFORCE_SHARD_LIMIT_KEY + " may only be unset or set to [true] but was [" +
+                enforceMaxShardsPerNode + "]");
+        }
+    }
+
     private final PluginsService pluginsService;
     private final NodeEnvironment nodeEnv;
     private final NamedXContentRegistry xContentRegistry;
@@ -1352,4 +1369,41 @@ public class IndicesService extends AbstractLifecycleComponent
     public boolean isMetaDataField(String field) {
         return mapperRegistry.isMetaDataField(field);
     }
+
+    /**
+     * Checks to see if an operation can be performed without taking the cluster over the cluster-wide shard limit. Adds a deprecation
+     * warning or returns an error message as appropriate.
+     *
+     * @param newShards         The number of shards to be added by this operation
+     * @param state             The current cluster state
+     * @param deprecationLogger The logger to use for deprecation warnings
+     * @return If present, an error message to be given as the reason for failing
+     * an operation. If empty, a sign that the operation is valid.
+     */
+    public static Optional<String> checkShardLimit(int newShards, ClusterState state, DeprecationLogger deprecationLogger) {
+        Settings theseSettings = state.metaData().settings();
+        int nodeCount = state.getNodes().getDataNodes().size();
+
+        // Only enforce the shard limit if we have at least one data node, so that we don't block
+        // index creation during cluster setup
+        if (nodeCount == 0 || newShards < 0) {
+            return Optional.empty();
+        }
+        int maxShardsPerNode = MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(theseSettings);
+        int maxShardsInCluster = maxShardsPerNode * nodeCount;
+        int currentOpenShards = state.getMetaData().getTotalOpenIndexShards();
+
+        if ((currentOpenShards + newShards) > maxShardsInCluster) {
+            String errorMessage = "this action would add [" + newShards + "] total shards, but this cluster currently has [" +
+                currentOpenShards + "]/[" + maxShardsInCluster + "] maximum shards open";
+            if (ENFORCE_MAX_SHARDS_PER_NODE) {
+                return Optional.of(errorMessage);
+            } else {
+                deprecationLogger.deprecated("In a future major version, this request will fail because {}. Before upgrading, " +
+                        "reduce the number of shards in your cluster or adjust the cluster setting [{}].",
+                    errorMessage, MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey());
+            }
+        }
+        return Optional.empty();
+    }
 }

+ 1 - 1
server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

@@ -270,7 +270,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp
                                 // Index doesn't exist - create it and start recovery
                                 // Make sure that the index we are about to create has a validate name
                                 MetaDataCreateIndexService.validateIndexName(renamedIndexName, currentState);
-                                createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetaData.getSettings(), false);
+                                createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetaData.getSettings(), currentState, false);
                                 IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData).state(IndexMetaData.State.OPEN).index(renamedIndexName);
                                 indexMdBuilder.settings(Settings.builder().put(snapshotIndexMetaData.getSettings()).put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()));
                                 if (!request.includeAliases() && !snapshotIndexMetaData.getAliases().isEmpty()) {

+ 32 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java

@@ -34,7 +34,9 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
 import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
+import org.elasticsearch.cluster.shards.ClusterShardLimitIT;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
@@ -56,7 +58,11 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.Collections.emptyMap;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
+import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount;
+import static org.elasticsearch.indices.IndicesServiceTests.createClusterForShardLimitTest;
 import static org.hamcrest.Matchers.endsWith;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -466,4 +472,30 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
             assertEquals("ratio is not a power of two", intRatio, Integer.highestOneBit(intRatio));
         }
     }
+
+    public void testShardLimitDeprecationWarning() {
+        int nodesInCluster = randomIntBetween(2,100);
+        ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster);
+        Settings clusterSettings = Settings.builder()
+            .put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), counts.getShardsPerNode())
+            .build();
+        ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(),
+            clusterSettings);
+
+        Settings indexSettings = Settings.builder()
+            .put(SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(SETTING_NUMBER_OF_SHARDS, counts.getFailingIndexShards())
+            .put(SETTING_NUMBER_OF_REPLICAS, counts.getFailingIndexReplicas())
+            .build();
+
+        DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
+        MetaDataCreateIndexService.checkShardLimit(indexSettings, state, deprecationLogger);
+        int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
+        int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
+        int maxShards = counts.getShardsPerNode() * nodesInCluster;
+        assertWarnings("In a future major version, this request will fail because this action would add [" +
+            totalShards + "] total shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum shards open."+
+            " Before upgrading, reduce the number of shards in your cluster or adjust the cluster setting [cluster.max_shards_per_node].");
+    }
+
 }

+ 99 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java

@@ -0,0 +1,99 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.metadata;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.shards.ClusterShardLimitIT;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.logging.DeprecationLogger;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MetaDataIndexStateServiceTests extends ESTestCase {
+
+    public void testValidateShardLimitDeprecationWarning() {
+        int nodesInCluster = randomIntBetween(2,100);
+        ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster);
+        Settings clusterSettings = Settings.builder()
+            .put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), counts.getShardsPerNode())
+            .build();
+        ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(),
+            counts.getFailingIndexShards(), counts.getFailingIndexReplicas(), clusterSettings);
+
+        Index[] indices = Arrays.stream(state.metaData().indices().values().toArray(IndexMetaData.class))
+            .map(IndexMetaData::getIndex)
+            .collect(Collectors.toList())
+            .toArray(new Index[2]);
+
+        DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
+        MetaDataIndexStateService.validateShardLimit(state, indices, deprecationLogger);
+        int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
+        int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
+        int maxShards = counts.getShardsPerNode() * nodesInCluster;
+        assertWarnings("In a future major version, this request will fail because this action would add [" +
+            totalShards + "] total shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum shards open."+
+            " Before upgrading, reduce the number of shards in your cluster or adjust the cluster setting [cluster.max_shards_per_node].");
+    }
+
+    public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int openIndexShards, int openIndexReplicas,
+                                                              int closedIndexShards, int closedIndexReplicas, Settings clusterSettings) {
+        ImmutableOpenMap.Builder<String, DiscoveryNode> dataNodes = ImmutableOpenMap.builder();
+        for (int i = 0; i < nodesInCluster; i++) {
+            dataNodes.put(randomAlphaOfLengthBetween(5,15), mock(DiscoveryNode.class));
+        }
+        DiscoveryNodes nodes = mock(DiscoveryNodes.class);
+        when(nodes.getDataNodes()).thenReturn(dataNodes.build());
+
+        IndexMetaData.Builder openIndexMetaData = IndexMetaData.builder(randomAlphaOfLengthBetween(5, 15))
+            .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
+            .creationDate(randomLong())
+            .numberOfShards(openIndexShards)
+            .numberOfReplicas(openIndexReplicas);
+        IndexMetaData.Builder closedIndexMetaData = IndexMetaData.builder(randomAlphaOfLengthBetween(5, 15))
+            .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
+            .creationDate(randomLong())
+            .state(IndexMetaData.State.CLOSE)
+            .numberOfShards(closedIndexShards)
+            .numberOfReplicas(closedIndexReplicas);
+        MetaData.Builder metaData = MetaData.builder().put(openIndexMetaData).put(closedIndexMetaData);
+        if (randomBoolean()) {
+            metaData.persistentSettings(clusterSettings);
+        } else {
+            metaData.transientSettings(clusterSettings);
+        }
+
+        return ClusterState.builder(ClusterName.DEFAULT)
+            .metaData(metaData)
+            .nodes(nodes)
+            .build();
+    }
+}

+ 140 - 0
server/src/test/java/org/elasticsearch/cluster/shards/ClusterShardLimitIT.java

@@ -0,0 +1,140 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.elasticsearch.cluster.shards;
+
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESIntegTestCase;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
+public class ClusterShardLimitIT extends ESIntegTestCase {
+    private static final String shardsPerNodeKey = MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey();
+
+    public void testSettingClusterMaxShards() {
+        int shardsPerNode = between(1, 500_000);
+        setShardsPerNode(shardsPerNode);
+    }
+
+    public void testMinimumPerNode() {
+        int negativeShardsPerNode = between(-50_000, 0);
+        try {
+            if (frequently()) {
+                client().admin().cluster()
+                    .prepareUpdateSettings()
+                    .setPersistentSettings(Settings.builder().put(shardsPerNodeKey, negativeShardsPerNode).build())
+                    .get();
+            } else {
+                client().admin().cluster()
+                    .prepareUpdateSettings()
+                    .setTransientSettings(Settings.builder().put(shardsPerNodeKey, negativeShardsPerNode).build())
+                    .get();
+            }
+            fail("should not be able to set negative shards per node");
+        } catch (IllegalArgumentException ex) {
+            assertEquals("Failed to parse value [" + negativeShardsPerNode + "] for setting [cluster.max_shards_per_node] must be >= 1",
+                ex.getMessage());
+        }
+    }
+
+    private void setShardsPerNode(int shardsPerNode) {
+        try {
+            ClusterUpdateSettingsResponse response;
+            if (frequently()) {
+                response = client().admin().cluster()
+                    .prepareUpdateSettings()
+                    .setPersistentSettings(Settings.builder().put(shardsPerNodeKey, shardsPerNode).build())
+                    .get();
+                assertEquals(shardsPerNode, response.getPersistentSettings().getAsInt(shardsPerNodeKey, -1).intValue());
+            } else {
+                response = client().admin().cluster()
+                    .prepareUpdateSettings()
+                    .setTransientSettings(Settings.builder().put(shardsPerNodeKey, shardsPerNode).build())
+                    .get();
+                assertEquals(shardsPerNode, response.getTransientSettings().getAsInt(shardsPerNodeKey, -1).intValue());
+            }
+        } catch (IllegalArgumentException ex) {
+            fail(ex.getMessage());
+        }
+    }
+
+    public static class ShardCounts {
+        private final int shardsPerNode;
+
+        private final int firstIndexShards;
+        private final int firstIndexReplicas;
+
+        private final int failingIndexShards;
+        private final int failingIndexReplicas;
+
+        private ShardCounts(int shardsPerNode,
+                            int firstIndexShards,
+                            int firstIndexReplicas,
+                            int failingIndexShards,
+                            int failingIndexReplicas) {
+            this.shardsPerNode = shardsPerNode;
+            this.firstIndexShards = firstIndexShards;
+            this.firstIndexReplicas = firstIndexReplicas;
+            this.failingIndexShards = failingIndexShards;
+            this.failingIndexReplicas = failingIndexReplicas;
+        }
+
+        public static ShardCounts forDataNodeCount(int dataNodes) {
+            int mainIndexReplicas = between(0, dataNodes - 1);
+            int mainIndexShards = between(1, 10);
+            int totalShardsInIndex = (mainIndexReplicas + 1) * mainIndexShards;
+            int shardsPerNode = (int) Math.ceil((double) totalShardsInIndex / dataNodes);
+            int totalCap = shardsPerNode * dataNodes;
+
+            int failingIndexShards;
+            int failingIndexReplicas;
+            if (dataNodes > 1 && frequently()) {
+                failingIndexShards = Math.max(1, totalCap - totalShardsInIndex);
+                failingIndexReplicas = between(1, dataNodes - 1);
+            } else {
+                failingIndexShards = totalCap - totalShardsInIndex + between(1, 10);
+                failingIndexReplicas = 0;
+            }
+
+            return new ShardCounts(shardsPerNode, mainIndexShards, mainIndexReplicas, failingIndexShards, failingIndexReplicas);
+        }
+
+        public int getShardsPerNode() {
+            return shardsPerNode;
+        }
+
+        public int getFirstIndexShards() {
+            return firstIndexShards;
+        }
+
+        public int getFirstIndexReplicas() {
+            return firstIndexReplicas;
+        }
+
+        public int getFailingIndexShards() {
+            return failingIndexShards;
+        }
+
+        public int getFailingIndexReplicas() {
+            return failingIndexReplicas;
+        }
+    }
+}

+ 78 - 0
server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java

@@ -29,9 +29,14 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexGraveyard;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.cluster.shards.ClusterShardLimitIT;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.io.FileSystemUtils;
+import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
@@ -80,6 +85,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
+import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.containsString;
@@ -567,4 +573,76 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
         assertThat(e, hasToString(new RegexMatcher(pattern)));
     }
 
+    public void testOverShardLimit() {
+        int nodesInCluster = randomIntBetween(1,100);
+        ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster);
+
+        Settings clusterSettings = Settings.builder()
+            .put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), counts.getShardsPerNode())
+            .build();
+
+        ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(),
+            clusterSettings);
+
+        int shardsToAdd = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
+        DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
+        Optional<String> errorMessage = IndicesService.checkShardLimit(shardsToAdd, state, deprecationLogger);
+
+        int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
+        int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
+        int maxShards = counts.getShardsPerNode() * nodesInCluster;
+        assertWarnings("In a future major version, this request will fail because this action would add [" +
+            totalShards + "] total shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum shards open."+
+            " Before upgrading, reduce the number of shards in your cluster or adjust the cluster setting [cluster.max_shards_per_node].");
+        assertFalse(errorMessage.isPresent());
+    }
+
+    public void testUnderShardLimit() {
+        int nodesInCluster = randomIntBetween(2,100);
+        // Calculate the counts for a cluster 1 node smaller than we have to ensure we have headroom
+        ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster - 1);
+
+        Settings clusterSettings = Settings.builder()
+            .put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), counts.getShardsPerNode())
+            .build();
+
+        ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(),
+            clusterSettings);
+
+        int existingShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
+        int shardsToAdd = randomIntBetween(1, (counts.getShardsPerNode() * nodesInCluster) - existingShards);
+        DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
+        Optional<String> errorMessage = IndicesService.checkShardLimit(shardsToAdd, state, deprecationLogger);
+
+        assertFalse(errorMessage.isPresent());
+    }
+
+    public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int shardsInIndex, int replicas,
+                                                              Settings clusterSettings) {
+        ImmutableOpenMap.Builder<String, DiscoveryNode> dataNodes = ImmutableOpenMap.builder();
+        for (int i = 0; i < nodesInCluster; i++) {
+            dataNodes.put(randomAlphaOfLengthBetween(5,15), mock(DiscoveryNode.class));
+        }
+        DiscoveryNodes nodes = mock(DiscoveryNodes.class);
+        when(nodes.getDataNodes()).thenReturn(dataNodes.build());
+
+        IndexMetaData.Builder indexMetaData = IndexMetaData.builder(randomAlphaOfLengthBetween(5, 15))
+            .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
+            .creationDate(randomLong())
+            .numberOfShards(shardsInIndex)
+            .numberOfReplicas(replicas);
+        MetaData.Builder metaData = MetaData.builder().put(indexMetaData);
+        if (randomBoolean()) {
+            metaData.transientSettings(clusterSettings);
+        } else {
+            metaData.persistentSettings(clusterSettings);
+        }
+
+        return ClusterState.builder(ClusterName.DEFAULT)
+            .metaData(metaData)
+            .nodes(nodes)
+            .build();
+    }
+
+
 }

+ 1 - 0
server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java

@@ -143,6 +143,7 @@ public class ClusterStateChanges extends AbstractComponent {
 
         // mocks
         clusterService = mock(ClusterService.class);
+        when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
         IndicesService indicesService = mock(IndicesService.class);
         // MetaDataCreateIndexService creates indices using its IndicesService instance to check mappings -> fake it here
         try {

+ 30 - 0
x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/ClusterDeprecationChecks.java

@@ -0,0 +1,30 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.deprecation;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.xpack.core.deprecation.DeprecationIssue;
+
+public class ClusterDeprecationChecks {
+
+    static DeprecationIssue checkShardLimit(ClusterState state) {
+        int shardsPerNode = MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(state.metaData().settings());
+        int nodeCount = state.getNodes().getDataNodes().size();
+        int maxShardsInCluster = shardsPerNode * nodeCount;
+        int currentOpenShards = state.getMetaData().getTotalOpenIndexShards();
+
+        if (currentOpenShards >= maxShardsInCluster) {
+            return new DeprecationIssue(DeprecationIssue.Level.WARNING,
+                "Number of open shards exceeds cluster soft limit",
+                "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking_70_cluster_changes.html",
+                "There are [" + currentOpenShards + "] open shards in this cluster, but the cluster is limited to [" +
+                    shardsPerNode + "] per data node, for [" + maxShardsInCluster + "] maximum.");
+        }
+        return null;
+    }
+}

+ 1 - 1
x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DeprecationChecks.java

@@ -29,7 +29,7 @@ public class DeprecationChecks {
 
     static List<Function<ClusterState, DeprecationIssue>> CLUSTER_SETTINGS_CHECKS =
         Collections.unmodifiableList(Arrays.asList(
-            // STUB
+            ClusterDeprecationChecks::checkShardLimit
         ));
 
     static List<BiFunction<List<NodeInfo>, List<NodeStats>, DeprecationIssue>> NODE_SETTINGS_CHECKS =