Browse Source

[ROUTING] Add rebalance enabled allocation decider

This commit adds the ability to enable / disable relocations
on an entire cluster or on individual indices for either:

 * `primaries` - only primaries can rebalance
 * `replica` - only replicas can rebalance
 * `all` - everything can rebalance (default)
 * `none` - all rebalances are disabled

similar to the allocation enable / disable functionality.

Relates to #7288
Simon Willnauer 11 years ago
parent
commit
d5c0a49620

+ 7 - 0
docs/reference/indices/update-settings.asciidoc

@@ -118,6 +118,13 @@ settings API:
     * `new_primaries` - Allows shard allocation only for primary shards for new indices.
     * `none` - No shard allocation is allowed.
 
+`index.routing.rebalance.enable`::
+    Enables shard rebalancing for a specific index. It can be set to:
+    * `all` (default) - Allows shard rebalancing for all shards.
+    * `primaries` - Allows shard rebalancing only for primary shards.
+    * `replicas` - Allows shard rebalancing only for replica shards.
+    * `none` - No shard rebalancing is allowed.
+
 `index.routing.allocation.total_shards_per_node`::
     Controls the total number of shards (replicas and primaries) allowed to be allocated on a single node. Defaults to unbounded (`-1`).
 

+ 11 - 0
docs/reference/modules/cluster.asciidoc

@@ -46,6 +46,17 @@ Can be set to:
     * `new_primaries` - Allows shard allocation only for primary shards for new indices.
     * `none` - No shard allocations of any kind are allowed for all indices.
 
+`cluster.routing.rebalance.enable`::
+
+Controls shard rebalance for all indices, by allowing specific
+kinds of shard to be rebalanced.
+
+Can be set to:
+    * `all` (default) - Allows shard balancing for all kinds of shards.
+    * `primaries` - Allows shard balancing only for primary shards.
+    * `replicas` - Allows shard balancing only for replica shards.
+    * `none` - No shard balancing of any kind are allowed for all indices.
+
 `cluster.routing.allocation.same_shard.host`::
       Allows to perform a check to prevent allocation of multiple instances
       of the same shard on a single host, based on host name and host address.

+ 112 - 17
src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java

@@ -31,17 +31,32 @@ import org.elasticsearch.node.settings.NodeSettingsService;
 import java.util.Locale;
 
 /**
- * This allocation decider allows shard allocations via the cluster wide settings {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE}
- * and the per index setting {@link #INDEX_ROUTING_ALLOCATION_ENABLE}. The per index settings overrides the cluster wide
- * setting. Depending on the
+ * This allocation decider allows shard allocations / rebalancing via the cluster wide settings {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE} /
+ * {@link #CLUSTER_ROUTING_REBALANCE_ENABLE} and the per index setting {@link #INDEX_ROUTING_ALLOCATION_ENABLE} / {@link #INDEX_ROUTING_REBALANCE_ENABLE}.
+ * The per index settings overrides the cluster wide setting.
  *
- * Both settings can have the following values:
+ * <p>
+ * Allocation settings can have the following values (non-casesensitive):
  * <ul>
- *     <li> <code>NONE</code>, no shard allocation is allowed.
- *     <li> <code>NEW_PRIMARIES</code> only primary shards of new indices are allowed to be allocated
- *     <li> <code>PRIMARIES</code> only primary shards (of any index) are allowed to be allocated
- *     <li> <code>ALL</code> all shards are allowed to be allocated
+ *     <li> <code>NONE</code> - no shard allocation is allowed.
+ *     <li> <code>NEW_PRIMARIES</code> - only primary shards of new indices are allowed to be allocated
+ *     <li> <code>PRIMARIES</code> - only primary shards are allowed to be allocated
+ *     <li> <code>ALL</code> - all shards are allowed to be allocated
  * </ul>
+ * </p>
+ *
+ * <p>
+ * Rebalancing settings can have the following values (non-casesensitive):
+ * <ul>
+ *     <li> <code>NONE</code> - no shard rebalancing is allowed.
+ *     <li> <code>REPLICAS</code> - only replica shards are allowed to be balanced
+ *     <li> <code>PRIMARIES</code> - only primary shards are allowed to be balanced
+ *     <li> <code>ALL</code> - all shards are allowed to be balanced
+ * </ul>
+ * </p>
+ *
+ * @see Rebalance
+ * @see Allocation
  */
 public class EnableAllocationDecider extends AllocationDecider implements NodeSettingsService.Listener {
 
@@ -50,12 +65,18 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
     public static final String CLUSTER_ROUTING_ALLOCATION_ENABLE = "cluster.routing.allocation.enable";
     public static final String INDEX_ROUTING_ALLOCATION_ENABLE = "index.routing.allocation.enable";
 
-    private volatile Allocation enable;
+    public static final String CLUSTER_ROUTING_REBALANCE_ENABLE = "cluster.routing.rebalance.enable";
+    public static final String INDEX_ROUTING_REBALANCE_ENABLE = "index.routing.rebalance.enable";
+
+    private volatile Rebalance enableRebalance;
+    private volatile Allocation enableAllocation;
+
 
     @Inject
     public EnableAllocationDecider(Settings settings, NodeSettingsService nodeSettingsService) {
         super(settings);
-        this.enable = Allocation.parse(settings.get(CLUSTER_ROUTING_ALLOCATION_ENABLE, Allocation.ALL.name()));
+        this.enableAllocation = Allocation.parse(settings.get(CLUSTER_ROUTING_ALLOCATION_ENABLE, Allocation.ALL.name()));
+        this.enableRebalance = Rebalance.parse(settings.get(CLUSTER_ROUTING_REBALANCE_ENABLE, Rebalance.ALL.name()));
         nodeSettingsService.addListener(this);
     }
 
@@ -71,7 +92,7 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
         if (enableIndexValue != null) {
             enable = Allocation.parse(enableIndexValue);
         } else {
-            enable = this.enable;
+            enable = this.enableAllocation;
         }
         switch (enable) {
             case ALL:
@@ -82,28 +103,76 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
                 if (shardRouting.primary() && !allocation.routingNodes().routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi()) {
                     return allocation.decision(Decision.YES, NAME, "new primary allocations are allowed");
                 } else {
-                    return allocation.decision(Decision.NO, NAME, "non-new primary allocations are disallowed");
+                    return allocation.decision(Decision.NO, NAME, "non-new primary allocations are forbidden");
                 }
             case PRIMARIES:
                 if (shardRouting.primary()) {
                     return allocation.decision(Decision.YES, NAME, "primary allocations are allowed");
                 } else {
-                    return allocation.decision(Decision.NO, NAME, "replica allocations are disallowed");
+                    return allocation.decision(Decision.NO, NAME, "replica allocations are forbidden");
                 }
             default:
                 throw new ElasticsearchIllegalStateException("Unknown allocation option");
         }
     }
 
+    @Override
+    public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
+        if (allocation.ignoreDisable()) {
+            return allocation.decision(Decision.YES, NAME, "rebalance disabling is ignored");
+        }
+
+        Settings indexSettings = allocation.routingNodes().metaData().index(shardRouting.index()).settings();
+        String enableIndexValue = indexSettings.get(INDEX_ROUTING_REBALANCE_ENABLE);
+        final Rebalance enable;
+        if (enableIndexValue != null) {
+            enable = Rebalance.parse(enableIndexValue);
+        } else {
+            enable = this.enableRebalance;
+        }
+        switch (enable) {
+            case ALL:
+                return allocation.decision(Decision.YES, NAME, "all rebalancing is allowed");
+            case NONE:
+                return allocation.decision(Decision.NO, NAME, "no rebalancing is allowed");
+            case PRIMARIES:
+                if (shardRouting.primary()) {
+                    return allocation.decision(Decision.YES, NAME, "primary rebalancing is allowed");
+                } else {
+                    return allocation.decision(Decision.NO, NAME, "replica rebalancing is forbidden");
+                }
+            case REPLICAS:
+                if (shardRouting.primary() == false) {
+                    return allocation.decision(Decision.YES, NAME, "replica rebalancing is allowed");
+                } else {
+                    return allocation.decision(Decision.NO, NAME, "primary rebalancing is forbidden");
+                }
+            default:
+                throw new ElasticsearchIllegalStateException("Unknown rebalance option");
+        }
+    }
+
     @Override
     public void onRefreshSettings(Settings settings) {
-        Allocation enable = Allocation.parse(settings.get(CLUSTER_ROUTING_ALLOCATION_ENABLE, this.enable.name()));
-        if (enable != this.enable) {
-            logger.info("updating [cluster.routing.allocation.enable] from [{}] to [{}]", this.enable, enable);
-            EnableAllocationDecider.this.enable = enable;
+        final Allocation enable = Allocation.parse(settings.get(CLUSTER_ROUTING_ALLOCATION_ENABLE, this.enableAllocation.name()));
+        if (enable != this.enableAllocation) {
+            logger.info("updating [{}] from [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_ENABLE, this.enableAllocation, enable);
+            EnableAllocationDecider.this.enableAllocation = enable;
+        }
+
+        final Rebalance enableRebalance = Rebalance.parse(settings.get(CLUSTER_ROUTING_REBALANCE_ENABLE, this.enableRebalance.name()));
+        if (enableRebalance != this.enableRebalance) {
+            logger.info("updating [{}] from [{}] to [{}]", CLUSTER_ROUTING_REBALANCE_ENABLE, this.enableRebalance, enableRebalance);
+            EnableAllocationDecider.this.enableRebalance = enableRebalance;
         }
+
     }
 
+    /**
+     * Allocation values or rather their string representation to be used used with
+     * {@link EnableAllocationDecider#CLUSTER_ROUTING_ALLOCATION_ENABLE} / {@link EnableAllocationDecider#INDEX_ROUTING_ALLOCATION_ENABLE}
+     * via cluster / index settings.
+     */
     public enum Allocation {
 
         NONE,
@@ -125,4 +194,30 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
         }
     }
 
+    /**
+     * Rebalance values or rather their string representation to be used used with
+     * {@link EnableAllocationDecider#CLUSTER_ROUTING_REBALANCE_ENABLE} / {@link EnableAllocationDecider#INDEX_ROUTING_REBALANCE_ENABLE}
+     * via cluster / index settings.
+     */
+    public enum Rebalance {
+
+        NONE,
+        PRIMARIES,
+        REPLICAS,
+        ALL;
+
+        public static Rebalance parse(String strValue) {
+            if (strValue == null) {
+                return null;
+            } else {
+                strValue = strValue.toUpperCase(Locale.ROOT);
+                try {
+                    return Rebalance.valueOf(strValue);
+                } catch (IllegalArgumentException e) {
+                    throw new ElasticsearchIllegalArgumentException("Illegal rebalance.enable value [" + strValue + "]");
+                }
+            }
+        }
+    }
+
 }

+ 1 - 0
src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java

@@ -54,6 +54,7 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
                 ClusterRebalanceAllocationDecider.ALLOCATION_ALLOW_REBALANCE_VALIDATOR);
         clusterDynamicSettings.addDynamicSetting(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE, Validator.INTEGER);
         clusterDynamicSettings.addDynamicSetting(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE);
+        clusterDynamicSettings.addDynamicSetting(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE);
         clusterDynamicSettings.addDynamicSetting(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION);
         clusterDynamicSettings.addDynamicSetting(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION);
         clusterDynamicSettings.addDynamicSetting(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION);

+ 1 - 0
src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java

@@ -60,6 +60,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
         indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "*");
         indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "*");
         indexDynamicSettings.addDynamicSetting(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE);
+        indexDynamicSettings.addDynamicSetting(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE);
         indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_ALLOCATION);
         indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION);
         indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION);

+ 0 - 14
src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java

@@ -248,20 +248,6 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa
         }
     }
 
-    public void assertAllShardsOnNodes(String index, String pattern) {
-        ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
-        for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
-            for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
-                for (ShardRouting shardRouting : indexShardRoutingTable) {
-                    if (shardRouting.currentNodeId() != null &&  index.equals(shardRouting.getIndex())) {
-                        String name = clusterState.nodes().get(shardRouting.currentNodeId()).name();
-                        assertThat("Allocated on new node: " + name, Regex.simpleMatch(pattern, name), is(true));
-                    }
-                }
-            }
-        }
-    }
-
     /**
      * Upgrades a single node to the current version
      */

+ 79 - 0
src/test/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDeciderIntegrationTest.java

@@ -0,0 +1,79 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.routing.allocation.decider;
+
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+
+import java.util.Set;
+
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Simple integration for {@link EnableAllocationDecider} there is a more exhaustive unittest in
+ * {@link EnableAllocationTests} this test is meant to check if the actual update of the settings
+ * works as expected.
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0)
+public class EnableAllocationDeciderIntegrationTest extends ElasticsearchIntegrationTest {
+
+    public void testEnableRebalance() throws InterruptedException {
+        final String firstNode = internalCluster().startNode();
+        client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)).get();
+        // we test with 2 shards since otherwise it's pretty fragile if there are difference in the num or shards such that
+        // all shards are relocated to the second node which is not what we want here. It's solely a test for the settings to take effect
+        final int numShards = 2;
+        assertAcked(prepareCreate("test").setSettings(settingsBuilder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards)));
+        assertAcked(prepareCreate("test_1").setSettings(settingsBuilder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards)));
+        ensureGreen();
+        assertAllShardsOnNodes("test", firstNode);
+        assertAllShardsOnNodes("test_1", firstNode);
+
+        final String secondNode = internalCluster().startNode();
+        // prevent via index setting but only on index test
+        client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)).get();
+        client().admin().cluster().prepareReroute().get();
+        ensureGreen();
+        assertAllShardsOnNodes("test", firstNode);
+        assertAllShardsOnNodes("test_1", firstNode);
+
+        // now enable the index test to relocate since index settings override cluster settings
+        client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, randomBoolean() ? EnableAllocationDecider.Rebalance.PRIMARIES : EnableAllocationDecider.Rebalance.ALL)).get();
+        logger.info("--> balance index [test]");
+        client().admin().cluster().prepareReroute().get();
+        ensureGreen("test");
+        Set<String> test = assertAllShardsOnNodes("test", firstNode, secondNode);
+        assertThat("index: [test] expected to be rebalanced on both nodes", test.size(), equalTo(2));
+
+        // flip the cluster wide setting such that we can also balance for index test_1 eventually we should have one shard of each index on each node
+        client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE, randomBoolean() ? EnableAllocationDecider.Rebalance.PRIMARIES : EnableAllocationDecider.Rebalance.ALL)).get();
+        logger.info("--> balance index [test_1]");
+        client().admin().cluster().prepareReroute().get();
+        ensureGreen("test_1");
+        Set<String> test_1 = assertAllShardsOnNodes("test_1", firstNode, secondNode);
+        assertThat("index: [test_1] expected to be rebalanced on both nodes", test_1.size(), equalTo(2));
+
+        test = assertAllShardsOnNodes("test", firstNode, secondNode);
+        assertThat("index: [test] expected to be rebalanced on both nodes", test.size(), equalTo(2));
+    }
+}

+ 183 - 2
src/test/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationTests.java

@@ -19,20 +19,27 @@
 
 package org.elasticsearch.cluster.routing.allocation.decider;
 
+import com.carrotsearch.randomizedtesting.generators.RandomPicks;
+import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.MutableShardRouting;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.settings.NodeSettingsService;
 import org.elasticsearch.test.ElasticsearchAllocationTestCase;
 import org.junit.Test;
 
-import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
-import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
 import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.*;
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 import static org.hamcrest.Matchers.equalTo;
@@ -145,4 +152,178 @@ public class EnableAllocationTests extends ElasticsearchAllocationTestCase {
         assertThat(clusterState.readOnlyRoutingNodes().shardsWithState("disabled", STARTED).size(), equalTo(0));
     }
 
+
+
+
+    @Test
+    public void testEnableClusterBalance() {
+        final boolean useClusterSetting = randomBoolean();
+        final Rebalance allowedOnes = RandomPicks.randomFrom(getRandom(), EnumSet.of(Rebalance.PRIMARIES, Rebalance.REPLICAS, Rebalance.ALL));
+        Settings build = settingsBuilder()
+                .put(CLUSTER_ROUTING_REBALANCE_ENABLE, useClusterSetting ? Rebalance.NONE: RandomPicks.randomFrom(getRandom(), Rebalance.values())) // index settings override cluster settings
+                .put(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE, 3)
+                .build();
+        NodeSettingsService nodeSettingsService = new NodeSettingsService(build);
+        AllocationService strategy = createAllocationService(build, nodeSettingsService, getRandom());
+        Settings indexSettings = useClusterSetting ? ImmutableSettings.EMPTY : settingsBuilder().put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, Rebalance.NONE).build();
+
+        logger.info("Building initial routing table");
+        MetaData metaData = MetaData.builder()
+                .put(IndexMetaData.builder("test").settings(indexSettings).numberOfShards(3).numberOfReplicas(1))
+
+                .put(IndexMetaData.builder("always_disabled").settings(settingsBuilder().put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, Rebalance.NONE)).numberOfShards(1).numberOfReplicas(1))
+                .build();
+
+        RoutingTable routingTable = RoutingTable.builder()
+                .addAsNew(metaData.index("test"))
+                .addAsNew(metaData.index("always_disabled"))
+                .build();
+
+        ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
+
+        logger.info("--> adding one nodes and do rerouting");
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
+                .put(newNode("node1"))
+                .put(newNode("node2"))
+        ).build();
+        routingTable = strategy.reroute(clusterState).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(4));
+        logger.info("--> start the shards (primaries)");
+        routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(4));
+        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(4));
+
+        routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(8));
+        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
+
+        logger.info("--> adding one nodes and do rerouting");
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
+                .put(newNode("node1"))
+                .put(newNode("node2"))
+                .put(newNode("node3"))
+        ).build();
+        ClusterState prevState = clusterState;
+        routingTable = strategy.reroute(clusterState).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(8));
+        assertThat(clusterState.routingNodes().shardsWithState(RELOCATING).size(), equalTo(0));
+
+        if (useClusterSetting) {
+            prevState = clusterState;
+            clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder(metaData).transientSettings(settingsBuilder()
+                .put(CLUSTER_ROUTING_REBALANCE_ENABLE, allowedOnes)
+                .build())).build();
+        } else {
+            prevState = clusterState;
+            IndexMetaData meta = clusterState.getMetaData().index("test");
+            IndexMetaData meta1 = clusterState.getMetaData().index("always_disabled");
+            clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder(metaData).removeAllIndices().put(IndexMetaData.builder(meta1))
+                    .put(IndexMetaData.builder(meta).settings(settingsBuilder().put(meta.getSettings()).put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, allowedOnes).build())))
+                    .build();
+
+        }
+        nodeSettingsService.clusterChanged(new ClusterChangedEvent("foo", clusterState, prevState));
+        routingTable = strategy.reroute(clusterState).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        assertThat("expected 6 shards to be started 2 to relocate useClusterSettings: " + useClusterSetting, clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(6));
+        assertThat("expected 2 shards to relocate useClusterSettings: " + useClusterSetting, clusterState.routingNodes().shardsWithState(RELOCATING).size(), equalTo(2));
+        List<MutableShardRouting> mutableShardRoutings = clusterState.routingNodes().shardsWithState(RELOCATING);
+        switch (allowedOnes) {
+            case PRIMARIES:
+                for (MutableShardRouting routing : mutableShardRoutings) {
+                    assertTrue("only primaries are allowed to relocate", routing.primary());
+                    assertThat("only test index can rebalance", routing.getIndex(), equalTo("test"));
+                }
+                break;
+            case REPLICAS:
+                for (MutableShardRouting routing : mutableShardRoutings) {
+                    assertFalse("only replicas are allowed to relocate", routing.primary());
+                    assertThat("only test index can rebalance", routing.getIndex(), equalTo("test"));
+                }
+                break;
+            case ALL:
+                for (MutableShardRouting routing : mutableShardRoutings) {
+                    assertThat("only test index can rebalance", routing.getIndex(), equalTo("test"));
+                }
+                break;
+            default:
+                fail("only replicas, primaries or all are allowed");
+        }
+        routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(8));
+        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
+
+    }
+
+    @Test
+    public void testEnableClusterBalanceNoReplicas() {
+        final boolean useClusterSetting = randomBoolean();
+        Settings build = settingsBuilder()
+                .put(CLUSTER_ROUTING_REBALANCE_ENABLE, useClusterSetting ? Rebalance.NONE: RandomPicks.randomFrom(getRandom(), Rebalance.values())) // index settings override cluster settings
+                .put(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE, 3)
+                .build();
+        NodeSettingsService nodeSettingsService = new NodeSettingsService(build);
+        AllocationService strategy = createAllocationService(build, nodeSettingsService, getRandom());
+        Settings indexSettings = useClusterSetting ? ImmutableSettings.EMPTY : settingsBuilder().put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, Rebalance.NONE).build();
+
+        logger.info("Building initial routing table");
+        MetaData metaData = MetaData.builder()
+                .put(IndexMetaData.builder("test").settings(indexSettings).numberOfShards(6).numberOfReplicas(0))
+                .build();
+
+        RoutingTable routingTable = RoutingTable.builder()
+                .addAsNew(metaData.index("test"))
+                .build();
+
+        ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
+
+        logger.info("--> adding one nodes and do rerouting");
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
+                .put(newNode("node1"))
+                .put(newNode("node2"))
+        ).build();
+        routingTable = strategy.reroute(clusterState).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(6));
+        logger.info("--> start the shards (primaries)");
+        routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(6));
+        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
+
+        logger.info("--> adding one nodes and do rerouting");
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
+                .put(newNode("node1"))
+                .put(newNode("node2"))
+                .put(newNode("node3"))
+        ).build();
+        ClusterState prevState = clusterState;
+        routingTable = strategy.reroute(clusterState).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(6));
+        assertThat(clusterState.routingNodes().shardsWithState(RELOCATING).size(), equalTo(0));
+        if (useClusterSetting) {
+            prevState = clusterState;
+            clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder(metaData).transientSettings(settingsBuilder()
+                    .put(CLUSTER_ROUTING_REBALANCE_ENABLE, randomBoolean() ? Rebalance.PRIMARIES : Rebalance.ALL)
+                    .build())).build();
+        } else {
+            prevState = clusterState;
+            IndexMetaData meta = clusterState.getMetaData().index("test");
+            clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder(metaData).removeAllIndices()
+                    .put(IndexMetaData.builder(meta).settings(settingsBuilder().put(meta.getSettings()).put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, randomBoolean() ? Rebalance.PRIMARIES : Rebalance.ALL).build()))).build();
+        }
+        nodeSettingsService.clusterChanged(new ClusterChangedEvent("foo", clusterState, prevState));
+        routingTable = strategy.reroute(clusterState).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        assertThat("expected 4 primaries to be started and 2 to relocate useClusterSettings: " + useClusterSetting, clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(4));
+        assertThat("expected 2 primaries to relocate useClusterSettings: " + useClusterSetting, clusterState.routingNodes().shardsWithState(RELOCATING).size(), equalTo(2));
+
+    }
+
 }

+ 4 - 4
src/test/java/org/elasticsearch/gateway/local/RecoveryBackwardsCompatibilityTests.java

@@ -25,7 +25,6 @@ import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
 import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
@@ -47,8 +46,7 @@ public class RecoveryBackwardsCompatibilityTests extends ElasticsearchBackwardsC
         return ImmutableSettings.builder()
                 .put(super.nodeSettings(nodeOrdinal))
                 .put("action.admin.cluster.node.shutdown.delay", "10ms")
-                .put("gateway.recover_after_nodes", 2)
-                .put(BalancedShardsAllocator.SETTING_THRESHOLD, 100.0f).build(); // use less aggressive settings
+                .put("gateway.recover_after_nodes", 2).build();
     }
 
     protected int minExternalNodes() {
@@ -64,7 +62,9 @@ public class RecoveryBackwardsCompatibilityTests extends ElasticsearchBackwardsC
     @LuceneTestCase.Slow
     @TestLogging("discovery.zen:TRACE")
     public void testReusePeerRecovery() throws Exception {
-        assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(indexSettings()).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)));
+        assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(indexSettings())
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)));
         logger.info("--> indexing docs");
         int numDocs = scaledRandomIntBetween(100, 1000);
         IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];

+ 6 - 7
src/test/java/org/elasticsearch/gateway/local/SimpleRecoveryLocalGatewayTests.java

@@ -24,7 +24,6 @@ import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
 import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
@@ -351,7 +350,11 @@ public class SimpleRecoveryLocalGatewayTests extends ElasticsearchIntegrationTes
                 .put("action.admin.cluster.node.shutdown.delay", "10ms")
                 .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
                 .put("gateway.recover_after_nodes", 4)
-                .put(MockDirectoryHelper.CRASH_INDEX, false);
+                .put(MockDirectoryHelper.CRASH_INDEX, false)
+                // prevent any rebalance actions during the peer recovery
+                // if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if
+                // we reuse the files on disk after full restarts for replicas.
+                .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE);
 
         internalCluster().startNodesAsync(4, settings.build()).get();
 
@@ -368,11 +371,7 @@ public class SimpleRecoveryLocalGatewayTests extends ElasticsearchIntegrationTes
         ensureGreen();
 
         logger.info("--> shutting down the nodes");
-        // prevent any rebalance actions during the peer recovery
-        // if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if
-        // we reuse the files on disk after full restarts for replicas.
-        client().admin().cluster().prepareUpdateSettings()
-                .setPersistentSettings(settingsBuilder().put(BalancedShardsAllocator.SETTING_THRESHOLD, 100.0f)).get();
+
         // Disable allocations while we are closing nodes
         client().admin().cluster().prepareUpdateSettings()
                 .setTransientSettings(settingsBuilder()

+ 4 - 1
src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java

@@ -44,6 +44,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.*;
+import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.inject.Inject;
@@ -80,6 +81,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
 import static org.hamcrest.Matchers.*;
 
@@ -320,6 +322,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
                 .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
                 .put("index.routing.allocation.include._name", primariesNode.getNode().name())
                 .put("indices.recovery.concurrent_streams", 10)
+                .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)
         ));
         ensureGreen();
         IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
@@ -422,7 +425,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
         // it snapshots and that will write a new segments.X+1 file
         logger.info("-->  creating repository");
         assertAcked(client().admin().cluster().preparePutRepository("test-repo")
-                .setType("fs").setSettings(ImmutableSettings.settingsBuilder()
+                .setType("fs").setSettings(settingsBuilder()
                         .put("location", newTempDir(LifecycleScope.SUITE).getAbsolutePath())
                         .put("compress", randomBoolean())
                         .put("chunk_size", randomIntBetween(100, 1000))));

+ 2 - 7
src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java

@@ -28,7 +28,6 @@ import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
 import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
 import org.elasticsearch.action.admin.indices.segments.ShardSegments;
 import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.common.logging.ESLogger;
@@ -45,7 +44,6 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.elasticsearch.test.rest.client.http.HttpRequestBuilder;
 import org.elasticsearch.test.rest.client.http.HttpResponse;
 import org.elasticsearch.test.rest.json.JsonPath;
-import org.junit.After;
 import org.junit.BeforeClass;
 
 import java.net.InetSocketAddress;
@@ -129,17 +127,14 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest {
         logSegmentsState(null);
         backwardsCluster().allowOnAllNodes(indexNames);
         ensureGreen();
-        // set the balancing threshold to something very highish such that no rebalancing happens after the upgrade
-        builder = ImmutableSettings.builder();
-        builder.put(BalancedShardsAllocator.SETTING_THRESHOLD, 100.0f);
-        client().admin().cluster().prepareUpdateSettings().setPersistentSettings(builder).get();
         // disable allocation entirely until all nodes are upgraded
         builder = ImmutableSettings.builder();
         builder.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE);
         client().admin().cluster().prepareUpdateSettings().setTransientSettings(builder).get();
         backwardsCluster().upgradeAllNodes();
-        // we are done - enable allocation again
         builder = ImmutableSettings.builder();
+        // disable rebalanceing entirely for the time being otherwise we might get relocations / rebalance from nodes with old segments
+        builder.put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE);
         builder.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.ALL);
         client().admin().cluster().prepareUpdateSettings().setTransientSettings(builder).get();
         ensureGreen();

+ 6 - 1
src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java

@@ -57,11 +57,16 @@ public abstract class ElasticsearchAllocationTestCase extends ElasticsearchTestC
     }
 
     public static AllocationService createAllocationService(Settings settings, Random random) {
+        return createAllocationService(settings,  new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS), random);
+    }
+
+    public static AllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
         return new AllocationService(settings,
-                randomAllocationDeciders(settings, new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS), random),
+                randomAllocationDeciders(settings, nodeSettingsService, random),
                 new ShardsAllocators(settings), ClusterInfoService.EMPTY);
     }
 
+
     public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
         final ImmutableSet<Class<? extends AllocationDecider>> defaultAllocationDeciders = AllocationDecidersModule.DEFAULT_ALLOCATION_DECIDERS;
         final List<AllocationDecider> list = new ArrayList<>();

+ 25 - 0
src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java

@@ -55,13 +55,18 @@ import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
@@ -1691,6 +1696,26 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
         return new NumShards(numShards, numReplicas);
     }
 
+    /**
+     * Asserts that all shards are allocated on nodes matching the given node pattern.
+     */
+    public Set<String> assertAllShardsOnNodes(String index, String... pattern) {
+        Set<String> nodes = new HashSet<>();
+        ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
+        for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
+            for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
+                for (ShardRouting shardRouting : indexShardRoutingTable) {
+                    if (shardRouting.currentNodeId() != null &&  index.equals(shardRouting.getIndex())) {
+                        String name = clusterState.nodes().get(shardRouting.currentNodeId()).name();
+                        nodes.add(name);
+                        assertThat("Allocated on new node: " + name, Regex.simpleMatch(pattern, name), is(true));
+                    }
+                }
+            }
+        }
+        return nodes;
+    }
+
     protected static class NumShards {
         public final int numPrimaries;
         public final int numReplicas;