Browse Source

Auto-expand indices according to allocation filtering rules (#48974)

Honours allocation filtering rules when auto-expanding indices.
Yannick Welsch 6 years ago
parent
commit
18c2aab576

+ 6 - 5
docs/reference/index-modules.asciidoc

@@ -96,11 +96,12 @@ specific index module:
     Auto-expand the number of replicas based on the number of data nodes in the cluster.
     Set to a dash delimited lower and upper bound (e.g. `0-5`) or use `all`
     for the upper bound (e.g. `0-all`). Defaults to `false` (i.e. disabled).
-    Note that the auto-expanded number of replicas does not take any other allocation
-    rules into account, such as <<allocation-awareness,shard allocation awareness>>,
-    <<shard-allocation-filtering,filtering>> or <<allocation-total-shards,total shards per node>>,
-    and this can lead to the cluster health becoming `YELLOW` if the applicable rules
-    prevent all the replicas from being allocated.
+    Note that the auto-expanded number of replicas only takes
+    <<shard-allocation-filtering,allocation filtering>> rules into account. It ignores
+    any other allocation rules, such as <<allocation-awareness,shard allocation awareness>>
+    and <<allocation-total-shards,total shards per node>>, which can lead to the
+    cluster health becoming `YELLOW` if the applicable rules prevent all the replicas
+    from being allocated.
 
 `index.search.idle.after`::
     How long a shard can not receive a search or get request until it's considered

+ 17 - 9
server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java

@@ -18,7 +18,10 @@
  */
 package org.elasticsearch.cluster.metadata;
 
-import org.elasticsearch.cluster.node.DiscoveryNodes;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.cluster.routing.allocation.decider.Decision;
 import org.elasticsearch.common.Booleans;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -99,11 +102,19 @@ public final class AutoExpandReplicas {
         return Math.min(maxReplicas, numDataNodes-1);
     }
 
-    private OptionalInt getDesiredNumberOfReplicas(int numDataNodes) {
+    private OptionalInt getDesiredNumberOfReplicas(IndexMetaData indexMetaData, RoutingAllocation allocation) {
         if (enabled) {
+            int numMatchingDataNodes = 0;
+            for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().getDataNodes().values()) {
+                Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetaData, cursor.value, allocation);
+                if (decision.type() != Decision.Type.NO) {
+                    numMatchingDataNodes ++;
+                }
+            }
+
             final int min = getMinReplicas();
-            final int max = getMaxReplicas(numDataNodes);
-            int numberOfReplicas = numDataNodes - 1;
+            final int max = getMaxReplicas(numMatchingDataNodes);
+            int numberOfReplicas = numMatchingDataNodes - 1;
             if (numberOfReplicas < min) {
                 numberOfReplicas = min;
             } else if (numberOfReplicas > max) {
@@ -128,16 +139,13 @@ public final class AutoExpandReplicas {
      * The map has the desired number of replicas as key and the indices to update as value, as this allows the result
      * of this method to be directly applied to RoutingTable.Builder#updateNumberOfReplicas.
      */
-    public static Map<Integer, List<String>> getAutoExpandReplicaChanges(MetaData metaData, DiscoveryNodes discoveryNodes) {
-        // used for translating "all" to a number
-        final int dataNodeCount = discoveryNodes.getDataNodes().size();
-
+    public static Map<Integer, List<String>> getAutoExpandReplicaChanges(MetaData metaData, RoutingAllocation allocation) {
         Map<Integer, List<String>> nrReplicasChanged = new HashMap<>();
 
         for (final IndexMetaData indexMetaData : metaData) {
             if (indexMetaData.getState() == IndexMetaData.State.OPEN || isIndexVerifiedBeforeClosed(indexMetaData)) {
                 AutoExpandReplicas autoExpandReplicas = SETTING.get(indexMetaData.getSettings());
-                autoExpandReplicas.getDesiredNumberOfReplicas(dataNodeCount).ifPresent(numberOfReplicas -> {
+                autoExpandReplicas.getDesiredNumberOfReplicas(indexMetaData, allocation).ifPresent(numberOfReplicas -> {
                     if (numberOfReplicas != indexMetaData.getNumberOfReplicas()) {
                         nrReplicasChanged.computeIfAbsent(numberOfReplicas, ArrayList::new).add(indexMetaData.getIndex().getName());
                     }

+ 5 - 3
server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

@@ -254,8 +254,10 @@ public class AllocationService {
      * Returns an updated cluster state if changes were necessary, or the identical cluster if no changes were required.
      */
     public ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
+        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, clusterState.getRoutingNodes(), clusterState,
+            clusterInfoService.getClusterInfo(), currentNanoTime());
         final Map<Integer, List<String>> autoExpandReplicaChanges =
-            AutoExpandReplicas.getAutoExpandReplicaChanges(clusterState.metaData(), clusterState.nodes());
+            AutoExpandReplicas.getAutoExpandReplicaChanges(clusterState.metaData(), allocation);
         if (autoExpandReplicaChanges.isEmpty()) {
             return clusterState;
         } else {
@@ -279,7 +281,7 @@ public class AllocationService {
             }
             final ClusterState fixedState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build())
                 .metaData(metaDataBuilder).build();
-            assert AutoExpandReplicas.getAutoExpandReplicaChanges(fixedState.metaData(), fixedState.nodes()).isEmpty();
+            assert AutoExpandReplicas.getAutoExpandReplicaChanges(fixedState.metaData(), allocation).isEmpty();
             return fixedState;
         }
     }
@@ -408,7 +410,7 @@ public class AllocationService {
 
     private void reroute(RoutingAllocation allocation) {
         assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
-        assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :
+        assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation).isEmpty() :
             "auto-expand replicas out of sync with number of nodes in the cluster";
 
         removeDelayMarkers(allocation);

+ 9 - 0
server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.cluster.routing.allocation.decider;
 
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@@ -80,6 +81,14 @@ public abstract class AllocationDecider {
         return Decision.ALWAYS;
     }
 
+    /**
+     * Returns a {@link Decision} whether shards of the given index should be auto-expanded to this node at this state of the
+     * {@link RoutingAllocation}. The default is {@link Decision#ALWAYS}, so a decider that does not override this method
+     * places no restriction on auto-expansion.
+     *
+     * @param indexMetaData metadata of the index whose replicas are being auto-expanded
+     * @param node          the candidate node
+     * @param allocation    the routing allocation the decision is made against
+     * @return the decision for this index/node pair; this base implementation always returns {@link Decision#ALWAYS}
+     */
+    public Decision shouldAutoExpandToNode(IndexMetaData indexMetaData, DiscoveryNode node, RoutingAllocation allocation) {
+        return Decision.ALWAYS;
+    }
+
     /**
      * Returns a {@link Decision} whether the cluster can execute
      * re-balanced operations at all.

+ 21 - 0
server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java

@@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing.allocation.decider;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@@ -145,6 +146,26 @@ public class AllocationDeciders extends AllocationDecider {
         return ret;
     }
 
+    @Override
+    public Decision shouldAutoExpandToNode(IndexMetaData indexMetaData, DiscoveryNode node, RoutingAllocation allocation) {
+        Decision.Multi ret = new Decision.Multi(); // aggregate of the delegate deciders' auto-expand decisions for this node
+        for (AllocationDecider allocationDecider : allocations) {
+            Decision decision = allocationDecider.shouldAutoExpandToNode(indexMetaData, node, allocation);
+            // Short-circuit when a decider returns the Decision.NO instance (identity comparison), unless debug decisions are on.
+            if (decision == Decision.NO) {
+                if (!allocation.debugDecision()) {
+                    return decision;
+                } else {
+                    ret.add(decision); // debug mode: keep the NO and continue asking the remaining deciders
+                }
+            } else if (decision != Decision.ALWAYS
+                && (allocation.getDebugMode() != EXCLUDE_YES_DECISIONS || decision.type() != Decision.Type.YES)) {
+                ret.add(decision); // ALWAYS is never recorded; YES is dropped when the debug mode is EXCLUDE_YES_DECISIONS
+            }
+        }
+        return ret;
+    }
+
     @Override
     public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
         Decision.Multi ret = new Decision.Multi();

+ 25 - 13
server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.cluster.routing.allocation.decider;
 
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RoutingNode;
@@ -109,20 +110,31 @@ public class FilterAllocationDecider extends AllocationDecider {
                 return allocation.decision(Decision.NO, NAME, explanation, initialRecoveryFilters);
             }
         }
-        return shouldFilter(shardRouting, node, allocation);
+        return shouldFilter(shardRouting, node.node(), allocation);
     }
 
     @Override
     public Decision canAllocate(IndexMetaData indexMetaData, RoutingNode node, RoutingAllocation allocation) {
-        return shouldFilter(indexMetaData, node, allocation);
+        return shouldFilter(indexMetaData, node.node(), allocation); // pass the DiscoveryNode that shouldFilter now takes
     }
 
     @Override
     public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
-        return shouldFilter(shardRouting, node, allocation);
+        return shouldFilter(shardRouting, node.node(), allocation); // pass the DiscoveryNode that shouldFilter now takes
     }
 
-    private Decision shouldFilter(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+    @Override
+    public Decision shouldAutoExpandToNode(IndexMetaData indexMetaData, DiscoveryNode node, RoutingAllocation allocation) {
+        Decision decision = shouldClusterFilter(node, allocation); // cluster-level require/include/exclude filters first
+        if (decision != null) return decision; // a non-null result is returned as-is without consulting the index filters
+
+        decision = shouldIndexFilter(indexMetaData, node, allocation); // then the index-level require/include/exclude filters
+        if (decision != null) return decision; // shouldIndexFilter yields null when no index filter rejects the node
+
+        return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
+    }
+
+    private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
         Decision decision = shouldClusterFilter(node, allocation);
         if (decision != null) return decision;
 
@@ -132,7 +144,7 @@ public class FilterAllocationDecider extends AllocationDecider {
         return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
     }
 
-    private Decision shouldFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) {
+    private Decision shouldFilter(IndexMetaData indexMd, DiscoveryNode node, RoutingAllocation allocation) {
         Decision decision = shouldClusterFilter(node, allocation);
         if (decision != null) return decision;
 
@@ -142,21 +154,21 @@ public class FilterAllocationDecider extends AllocationDecider {
         return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
     }
 
-    private Decision shouldIndexFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) {
+    private Decision shouldIndexFilter(IndexMetaData indexMd, DiscoveryNode node, RoutingAllocation allocation) {
         if (indexMd.requireFilters() != null) {
-            if (indexMd.requireFilters().match(node.node()) == false) {
+            if (indexMd.requireFilters().match(node) == false) {
                 return allocation.decision(Decision.NO, NAME, "node does not match index setting [%s] filters [%s]",
                     IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_PREFIX, indexMd.requireFilters());
             }
         }
         if (indexMd.includeFilters() != null) {
-            if (indexMd.includeFilters().match(node.node()) == false) {
+            if (indexMd.includeFilters().match(node) == false) {
                 return allocation.decision(Decision.NO, NAME, "node does not match index setting [%s] filters [%s]",
                     IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_PREFIX, indexMd.includeFilters());
             }
         }
         if (indexMd.excludeFilters() != null) {
-            if (indexMd.excludeFilters().match(node.node())) {
+            if (indexMd.excludeFilters().match(node)) {
                 return allocation.decision(Decision.NO, NAME, "node matches index setting [%s] filters [%s]",
                     IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(), indexMd.excludeFilters());
             }
@@ -164,21 +176,21 @@ public class FilterAllocationDecider extends AllocationDecider {
         return null;
     }
 
-    private Decision shouldClusterFilter(RoutingNode node, RoutingAllocation allocation) {
+    private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation allocation) {
         if (clusterRequireFilters != null) {
-            if (clusterRequireFilters.match(node.node()) == false) {
+            if (clusterRequireFilters.match(node) == false) {
                 return allocation.decision(Decision.NO, NAME, "node does not match cluster setting [%s] filters [%s]",
                     CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX, clusterRequireFilters);
             }
         }
         if (clusterIncludeFilters != null) {
-            if (clusterIncludeFilters.match(node.node()) == false) {
+            if (clusterIncludeFilters.match(node) == false) {
                 return allocation.decision(Decision.NO, NAME, "node does not cluster setting [%s] filters [%s]",
                     CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX, clusterIncludeFilters);
             }
         }
         if (clusterExcludeFilters != null) {
-            if (clusterExcludeFilters.match(node.node())) {
+            if (clusterExcludeFilters.match(node)) {
                 return allocation.decision(Decision.NO, NAME, "node matches cluster setting [%s] filters [%s]",
                     CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX, clusterExcludeFilters);
             }

+ 40 - 0
server/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java

@@ -21,6 +21,7 @@ package org.elasticsearch.cluster.allocation;
 
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.cluster.metadata.AutoExpandReplicas;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -97,6 +98,45 @@ public class FilteringAllocationIT extends ESIntegTestCase {
             .execute().actionGet().getHits().getTotalHits().value, equalTo(100L));
     }
 
+    public void testAutoExpandReplicasToFilteredNodes() { // auto-expand must follow allocation filters, not just the node count
+        logger.info("--> starting 2 nodes");
+        List<String> nodesIds = internalCluster().startNodes(2);
+        final String node_0 = nodesIds.get(0);
+        final String node_1 = nodesIds.get(1);
+        assertThat(cluster().size(), equalTo(2));
+
+        logger.info("--> creating an index with auto-expand replicas");
+        createIndex("test", Settings.builder()
+            .put(AutoExpandReplicas.SETTING.getKey(), "0-all") // lower bound 0, upper bound "all"
+            .build());
+        ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
+        assertThat(clusterState.metaData().index("test").getNumberOfReplicas(), equalTo(1)); // no filter yet: 2 nodes => 1 replica
+        ensureGreen("test");
+
+        logger.info("--> filter out the second node");
+        if (randomBoolean()) { // randomly exercise either the cluster-level or the index-level exclude filter
+            client().admin().cluster().prepareUpdateSettings()
+                .setTransientSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", node_1))
+                .execute().actionGet();
+        } else {
+            client().admin().indices().prepareUpdateSettings("test")
+                .setSettings(Settings.builder().put("index.routing.allocation.exclude._name", node_1))
+                .execute().actionGet();
+        }
+        ensureGreen("test");
+
+        logger.info("--> verify all are allocated on node1 now"); // "node1" here is the first node, i.e. node_0 (asserted below)
+        clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
+        assertThat(clusterState.metaData().index("test").getNumberOfReplicas(), equalTo(0)); // node_1 excluded => no replicas
+        for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
+            for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
+                for (ShardRouting shardRouting : indexShardRoutingTable) {
+                    assertThat(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(node_0));
+                }
+            }
+        }
+    }
+
     public void testDisablingAllocationFiltering() {
         logger.info("--> starting 2 nodes");
         List<String> nodesIds = internalCluster().startNodes(2);