瀏覽代碼

Only filter initial recovery (post API) when shrinking an index (#18661)

Today we use `index.routing.allocation.include._id` to filter the allocation
for the shrink target index. That has the side effect that the user has to
delete or change that setting once the primary has been recovered (i.e. the shrink is done).
This PR adds a dedicated filter that can only be set internally and that only filters
allocation for unassigned shards.
Simon Willnauer 9 年之前
父節點
當前提交
22dfc41521

+ 20 - 2
core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java

@@ -30,8 +30,10 @@ import org.elasticsearch.cluster.Diffable;
 import org.elasticsearch.cluster.DiffableUtils;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
 import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseFieldMatcher;
@@ -215,6 +217,8 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         Setting.groupSetting("index.routing.allocation.include.", Property.Dynamic, Property.IndexScope);
     public static final Setting<Settings> INDEX_ROUTING_EXCLUDE_GROUP_SETTING =
         Setting.groupSetting("index.routing.allocation.exclude.", Property.Dynamic, Property.IndexScope);
+    public static final Setting<Settings> INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING =
+        Setting.groupSetting("index.routing.allocation.initial_recovery."); // this is only settable internally, it is not a registered setting!!
 
     public static final IndexMetaData PROTO = IndexMetaData.builder("")
             .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
@@ -254,6 +258,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
     private final DiscoveryNodeFilters requireFilters;
     private final DiscoveryNodeFilters includeFilters;
     private final DiscoveryNodeFilters excludeFilters;
+    private final DiscoveryNodeFilters initialRecoveryFilters;
 
     private final Version indexCreatedVersion;
     private final Version indexUpgradedVersion;
@@ -262,7 +267,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
     private IndexMetaData(Index index, long version, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings,
                           ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases,
                           ImmutableOpenMap<String, Custom> customs, ImmutableOpenIntMap<Set<String>> activeAllocationIds,
-                          DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters,
+                          DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters initialRecoveryFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters,
                           Version indexCreatedVersion, Version indexUpgradedVersion, org.apache.lucene.util.Version minimumCompatibleLuceneVersion) {
 
         this.index = index;
@@ -281,6 +286,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         this.requireFilters = requireFilters;
         this.includeFilters = includeFilters;
         this.excludeFilters = excludeFilters;
+        this.initialRecoveryFilters = initialRecoveryFilters;
         this.indexCreatedVersion = indexCreatedVersion;
         this.indexUpgradedVersion = indexUpgradedVersion;
         this.minimumCompatibleLuceneVersion = minimumCompatibleLuceneVersion;
@@ -430,6 +436,11 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         return requireFilters;
     }
 
+    @Nullable
+    public DiscoveryNodeFilters getInitialRecoveryFilters() {
+        return initialRecoveryFilters; // null when no index.routing.allocation.initial_recovery.* settings are present
+    }
+
     @Nullable
     public DiscoveryNodeFilters includeFilters() {
         return includeFilters;
@@ -892,6 +903,13 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
             } else {
                 excludeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, excludeMap);
             }
+            Map<String, String> initialRecoveryMap = INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING.get(settings).getAsMap();
+            final DiscoveryNodeFilters initialRecoveryFilters;
+            if (initialRecoveryMap.isEmpty()) {
+                initialRecoveryFilters = null;
+            } else {
+                initialRecoveryFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, initialRecoveryMap);
+            }
             Version indexCreatedVersion = Version.indexCreated(settings);
             Version indexUpgradedVersion = settings.getAsVersion(IndexMetaData.SETTING_VERSION_UPGRADED, indexCreatedVersion);
             String stringLuceneVersion = settings.get(SETTING_VERSION_MINIMUM_COMPATIBLE);
@@ -915,7 +933,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
 
             final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
             return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
-                tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, includeFilters, excludeFilters,
+                tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters,
                 indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion);
         }
 

+ 4 - 4
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java

@@ -533,11 +533,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
         final Predicate<String> analysisSimilarityPredicate = (s) -> s.startsWith("index.similarity.")
             || s.startsWith("index.analysis.");
         indexSettingsBuilder
-            // we can only shrink to 1 index so far!
+            // we can only shrink to 1 shard so far!
             .put("index.number_of_shards", 1)
-            // we use "i.r.a.include" rather than "i.r.a.require" since it's allows one of the nodes holding an
-            // instanceof all shards.
-            .put("index.routing.allocation.include._id",
+            // we use "i.r.a.initial_recovery" rather than "i.r.a.require|include" since we want the replica to allocate right away
+            // once we are allocated.
+            .put("index.routing.allocation.initial_recovery._id",
                 Strings.arrayToCommaDelimitedString(nodesToAllocateOn.toArray()))
             // we only try once and then give up with a shrink index
             .put("index.allocation.max_retries", 1)

+ 13 - 0
core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java

@@ -88,6 +88,19 @@ public class FilterAllocationDecider extends AllocationDecider {
 
     @Override
     public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        if (shardRouting.unassigned()) {
+            // Initial recovery filters only apply to unassigned shards that were never allocated after index creation
+            // (e.g. the primary of a shrink target). Once allocated post API, the shard and its replicas can go anywhere
+            // without user interaction. These filters come from a setting that can only be set within the system!
+            IndexMetaData indexMd = allocation.metaData().getIndexSafe(shardRouting.index());
+            DiscoveryNodeFilters initialRecoveryFilters = indexMd.getInitialRecoveryFilters();
+            if (shardRouting.allocatedPostIndexCreate(indexMd) == false &&
+                initialRecoveryFilters != null &&
+                initialRecoveryFilters.match(node.node()) == false) {
+                return allocation.decision(Decision.NO, NAME, "node does not match index initial recovery filters [%s]",
+                    initialRecoveryFilters);
+            }
+        }
         return shouldFilter(shardRouting, node, allocation);
     }
 

+ 1 - 2
core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java

@@ -311,10 +311,9 @@ public class CreateIndexIT extends ESIntegTestCase {
             .setSettings(Settings.builder().put("index.number_of_replicas", 0).build()).get());
         ensureGreen();
         assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
-        // let it be allocated anywhere and bump replicas
+        // bump replicas
         client().admin().indices().prepareUpdateSettings("target")
             .setSettings(Settings.builder()
-                .putNull("index.routing.allocation.include._id")
                 .put("index.number_of_replicas", 1)).get();
         ensureGreen();
         assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);

+ 1 - 1
core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java

@@ -159,7 +159,7 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
         assertEquals("similarity settings must be copied", "BM25", builder.build().get("index.similarity.default.type"));
         assertEquals("analysis settings must be copied",
             "keyword", builder.build().get("index.analysis.analyzer.my_analyzer.tokenizer"));
-        assertEquals("node1", builder.build().get("index.routing.allocation.include._id"));
+        assertEquals("node1", builder.build().get("index.routing.allocation.initial_recovery._id"));
         assertEquals("1", builder.build().get("index.allocation.max_retries"));
     }
 

+ 125 - 0
core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java

@@ -0,0 +1,125 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.routing.allocation;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.EmptyClusterInfoService;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
+import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
+import org.elasticsearch.cluster.routing.allocation.decider.Decision;
+import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESAllocationTestCase;
+import org.elasticsearch.test.gateway.NoopGatewayAllocator;
+
+import java.util.Collections;
+
+import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
+
+public class FilterAllocationDeciderTests extends ESAllocationTestCase {
+
+    public void testFilterInitialAllocation() {
+        FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(Settings.EMPTY,
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
+        AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY,
+            Collections.singleton(filterAllocationDecider));
+        AllocationService service = new AllocationService(Settings.builder().build(), allocationDeciders,
+            NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
+        ClusterState state = createInitialClusterState(service, Settings.builder().put("index.routing.allocation.initial_recovery._id",
+            "node2").build());
+        RoutingTable routingTable = state.routingTable();
+
+        // we can initially only allocate on node2
+        assertEquals(INITIALIZING, routingTable.index("idx").shard(0).shards().get(0).state());
+        assertEquals("node2", routingTable.index("idx").shard(0).shards().get(0).currentNodeId());
+        routingTable = service.applyFailedShard(state, routingTable.index("idx").shard(0).shards().get(0)).routingTable();
+        state = ClusterState.builder(state).routingTable(routingTable).build();
+        assertEquals(UNASSIGNED, routingTable.index("idx").shard(0).shards().get(0).state());
+        assertNull(routingTable.index("idx").shard(0).shards().get(0).currentNodeId());
+
+        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
+            null, 0, false);
+        assertEquals(Decision.YES, filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0),
+            state.getRoutingNodes().node("node2"),
+            allocation));
+        assertEquals(Decision.NO, filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0),
+            state.getRoutingNodes().node("node1"),
+            allocation));
+
+        // the failed shard is unassigned and the initial recovery filter still applies: a reroute must put it back on node2
+        state = stateFromResult(state, service.reroute(state, "try allocate again"));
+        routingTable = state.routingTable();
+        assertEquals(INITIALIZING, routingTable.index("idx").shard(0).shards().get(0).state());
+        assertEquals("node2", routingTable.index("idx").shard(0).shards().get(0).currentNodeId());
+
+        state = stateFromResult(state, service.applyStartedShards(state, routingTable.index("idx").shard(0).shards()));
+        routingTable = state.routingTable();
+
+        // ok now we are started and can be allocated anywhere!! lets see...
+        assertEquals(STARTED, routingTable.index("idx").shard(0).shards().get(0).state());
+        assertEquals("node2", routingTable.index("idx").shard(0).shards().get(0).currentNodeId());
+        assertTrue(routingTable.index("idx").shard(0).shards().get(0).allocatedPostIndexCreate(state.getMetaData().index("idx")));
+
+        // we fail it again to check if we are initializing immediately on the other node
+        state = stateFromResult(state, service.applyFailedShard(state, routingTable.index("idx").shard(0).shards().get(0)));
+        routingTable = state.routingTable();
+        assertEquals(INITIALIZING, routingTable.index("idx").shard(0).shards().get(0).state());
+        assertEquals("node1", routingTable.index("idx").shard(0).shards().get(0).currentNodeId());
+        assertTrue(routingTable.index("idx").shard(0).shards().get(0).allocatedPostIndexCreate(state.getMetaData().index("idx")));
+
+        allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
+            null, 0, false);
+        assertEquals(Decision.YES, filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0),
+            state.getRoutingNodes().node("node2"),
+            allocation));
+        assertEquals(Decision.YES, filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0),
+            state.getRoutingNodes().node("node1"),
+            allocation));
+    }
+
+    private ClusterState stateFromResult(ClusterState previousState, RoutingAllocation.Result result) {
+        return ClusterState.builder(previousState).routingTable(result.routingTable()).metaData(result.metaData()).build();
+    }
+
+    private ClusterState createInitialClusterState(AllocationService service, Settings settings) {
+        MetaData.Builder metaBuilder = MetaData.builder();
+        metaBuilder.put(IndexMetaData.builder("idx").settings(settings(Version.CURRENT).put(settings))
+            .numberOfShards(1).numberOfReplicas(0));
+        MetaData metaData = metaBuilder.build();
+        RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
+        routingTableBuilder.addAsNew(metaData.index("idx"));
+
+        RoutingTable routingTable = routingTableBuilder.build();
+        ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
+            .metaData(metaData).routingTable(routingTable).build();
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")))
+            .build();
+        routingTable = service.reroute(clusterState, "reroute", false).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        return clusterState;
+    }
+}

+ 0 - 19
docs/reference/indices/shrink-index.asciidoc

@@ -36,34 +36,15 @@ The `_shrink` API is similar to <<indices-create-index>> and accepts `settings`
 $ curl -XPUT 'http://localhost:9200/logs/_shrink/logs_single_shard' -d '{
     "settings" : {
         "index.codec" : "best_compression", <1>
-        "index.number_of_replicas" : 0 <2>
     }
 }'
 --------------------------------------------------
 <1> Enables `best_compression` codec on the target index
-<2> Sets the number of replicas on the target index to `0` to ensure the cluster is green once the shard initialized
 
 The API call above returns immediately once the target index is created but doesn't wait
 for the shrink operation to start. Once the target indices primary shard moves to state `initializing`
 the shrink operation has started.
 
-Once the index is shrunk replicas can be set to `1` and allocation filtering can be removed.
-
-[source,js]
---------------------------------------------------
-$ curl -XPUT 'http://localhost:9200/logs_single_shard/_settings' -d '{
-    "settings" : {
-        "index.routing.allocation.include._id" : null, <1>
-        "index.number_of_replicas" : 1 <2>
-    }
-}'
---------------------------------------------------
-
-<1> Resets the allocation filtering for the new shrunk index to allow replica allocation
-<2> Bumps the number of replicas to 1
-
-
-
 [float]
 [[shrink-index-limitations]]
 === Limitations