Browse Source

Remove resize index settings once shards are started (#90391)

This commit removes the specific index settings used to resize 
indices during clone/split/shrink.

Settings are removed once all primary shards are started. This is
implemented in a new RoutingChangesObserver in order not to
add more complexity to the existing IndexMetadataUpdater.

Sadly ILM relies on the INDEX_RESIZE_SOURCE_NAME_KEY 
setting to identify an index that has been shrunk, so this setting 
is not removed for now if the index has a policy.

Closes #90127
Tanguy Leroux 3 years ago
parent
commit
6b614a6cc9

+ 6 - 0
docs/changelog/90391.yaml

@@ -0,0 +1,6 @@
+pr: 90391
+summary: Remove resize index settings once shards are started
+area: Recovery
+type: enhancement
+issues:
+ - 90127

+ 2 - 0
server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CloneIndexIT.java

@@ -19,6 +19,7 @@ import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.VersionUtils;
 import org.elasticsearch.xcontent.XContentType;
 
+import static org.elasticsearch.action.admin.indices.create.ShrinkIndexIT.assertNoResizeSourceIndexSettings;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.equalTo;
@@ -74,6 +75,7 @@ public class CloneIndexIT extends ESIntegTestCase {
                     .get()
             );
             ensureGreen();
+            assertNoResizeSourceIndexSettings("target");
 
             final IndicesStatsResponse targetStats = client().admin().indices().prepareStats("target").get();
             assertThat(targetStats.getIndex("target").getIndexShards().keySet().size(), equalTo(numPrimaryShards));

+ 97 - 46
server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java

@@ -31,6 +31,7 @@ import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.Murmur3HashFunction;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -59,6 +60,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitC
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 public class ShrinkIndexIT extends ESIntegTestCase {
 
@@ -168,6 +171,9 @@ public class ShrinkIndexIT extends ESIntegTestCase {
         assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
         assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
         assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
+
+        assertNoResizeSourceIndexSettings("first_shrink");
+        assertNoResizeSourceIndexSettings("second_shrink");
     }
 
     public void testShrinkIndexPrimaryTerm() throws Exception {
@@ -239,6 +245,7 @@ public class ShrinkIndexIT extends ESIntegTestCase {
         for (int shardId = 0; shardId < numberOfTargetShards; shardId++) {
             assertThat(afterShrinkIndexMetadata.primaryTerm(shardId), equalTo(beforeShrinkPrimaryTerm + 1));
         }
+        assertNoResizeSourceIndexSettings("target");
     }
 
     private static IndexMetadata indexMetadata(final Client client, final String index) {
@@ -304,6 +311,8 @@ public class ShrinkIndexIT extends ESIntegTestCase {
         );
         ensureGreen();
 
+        assertNoResizeSourceIndexSettings("target");
+
         // resolve true merge node - this is not always the node we required as all shards may be on another node
         final ClusterState state = client().admin().cluster().prepareState().get().getState();
         DiscoveryNode mergeNode = state.nodes().get(state.getRoutingTable().index("target").shard(0).primaryShard().currentNodeId());
@@ -451,6 +460,7 @@ public class ShrinkIndexIT extends ESIntegTestCase {
         assertTrue("expected shard size must be set but wasn't: " + expectedShardSize, expectedShardSize > 0);
         ensureGreen();
         assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
+        assertNoResizeSourceIndexSettings("target");
     }
 
     public void testCreateShrinkWithIndexSort() throws Exception {
@@ -524,6 +534,8 @@ public class ShrinkIndexIT extends ESIntegTestCase {
                 .get()
         );
         ensureGreen();
+        assertNoResizeSourceIndexSettings("target");
+
         flushAndRefresh();
         GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("target").execute().actionGet();
         assertEquals(settingsResponse.getSetting("target", "index.sort.field"), "id");
@@ -574,56 +586,61 @@ public class ShrinkIndexIT extends ESIntegTestCase {
                 Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")
             )
             .get();
+        try {
+            // now merge source into a single shard index
+            assertAcked(
+                client().admin()
+                    .indices()
+                    .prepareResizeIndex("source", "target")
+                    .setSettings(Settings.builder().put("index.number_of_replicas", 0).build())
+                    .get()
+            );
+            ensureGreen();
+            assertNoResizeSourceIndexSettings("target");
 
-        // now merge source into a single shard index
-        assertAcked(
-            client().admin()
-                .indices()
-                .prepareResizeIndex("source", "target")
-                .setSettings(Settings.builder().put("index.number_of_replicas", 0).build())
-                .get()
-        );
-        ensureGreen();
-        ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
-        IndexMetadata target = clusterStateResponse.getState().getMetadata().index("target");
-        client().admin().indices().prepareForceMerge("target").setMaxNumSegments(1).setFlush(false).get();
-        IndicesSegmentResponse targetSegStats = client().admin().indices().prepareSegments("target").get();
-        ShardSegments segmentsStats = targetSegStats.getIndices().get("target").getShards().get(0).shards()[0];
-        assertTrue(segmentsStats.getNumberOfCommitted() > 0);
-        assertNotEquals(segmentsStats.getSegments(), segmentsStats.getNumberOfCommitted());
-
-        Iterable<IndicesService> dataNodeInstances = internalCluster().getDataNodeInstances(IndicesService.class);
-        for (IndicesService service : dataNodeInstances) {
-            if (service.hasIndex(target.getIndex())) {
-                IndexService indexShards = service.indexService(target.getIndex());
-                IndexShard shard = indexShards.getShard(0);
-                assertTrue(shard.isActive());
-                shard.flushOnIdle(0);
-                assertFalse(shard.isActive());
+            ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
+            IndexMetadata target = clusterStateResponse.getState().getMetadata().index("target");
+            client().admin().indices().prepareForceMerge("target").setMaxNumSegments(1).setFlush(false).get();
+            IndicesSegmentResponse targetSegStats = client().admin().indices().prepareSegments("target").get();
+            ShardSegments segmentsStats = targetSegStats.getIndices().get("target").getShards().get(0).shards()[0];
+            assertTrue(segmentsStats.getNumberOfCommitted() > 0);
+            assertNotEquals(segmentsStats.getSegments(), segmentsStats.getNumberOfCommitted());
+
+            Iterable<IndicesService> dataNodeInstances = internalCluster().getDataNodeInstances(IndicesService.class);
+            for (IndicesService service : dataNodeInstances) {
+                if (service.hasIndex(target.getIndex())) {
+                    IndexService indexShards = service.indexService(target.getIndex());
+                    IndexShard shard = indexShards.getShard(0);
+                    assertTrue(shard.isActive());
+                    shard.flushOnIdle(0);
+                    assertFalse(shard.isActive());
+                }
             }
+            assertBusy(() -> {
+                IndicesSegmentResponse targetStats = client().admin().indices().prepareSegments("target").get();
+                ShardSegments targetShardSegments = targetStats.getIndices().get("target").getShards().get(0).shards()[0];
+                Map<Integer, IndexShardSegments> source = sourceStats.getIndices().get("source").getShards();
+                int numSourceSegments = 0;
+                for (IndexShardSegments s : source.values()) {
+                    numSourceSegments += s.getAt(0).getNumberOfCommitted();
+                }
+                assertTrue(targetShardSegments.getSegments().size() < numSourceSegments);
+                assertEquals(targetShardSegments.getNumberOfCommitted(), targetShardSegments.getNumberOfSearch());
+                assertEquals(targetShardSegments.getNumberOfCommitted(), targetShardSegments.getSegments().size());
+                assertEquals(1, targetShardSegments.getSegments().size());
+            });
+        } finally {
+
+            // clean up
+            client().admin()
+                .cluster()
+                .prepareUpdateSettings()
+                .setPersistentSettings(
+                    Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null)
+                )
+                .get();
         }
-        assertBusy(() -> {
-            IndicesSegmentResponse targetStats = client().admin().indices().prepareSegments("target").get();
-            ShardSegments targetShardSegments = targetStats.getIndices().get("target").getShards().get(0).shards()[0];
-            Map<Integer, IndexShardSegments> source = sourceStats.getIndices().get("source").getShards();
-            int numSourceSegments = 0;
-            for (IndexShardSegments s : source.values()) {
-                numSourceSegments += s.getAt(0).getNumberOfCommitted();
-            }
-            assertTrue(targetShardSegments.getSegments().size() < numSourceSegments);
-            assertEquals(targetShardSegments.getNumberOfCommitted(), targetShardSegments.getNumberOfSearch());
-            assertEquals(targetShardSegments.getNumberOfCommitted(), targetShardSegments.getSegments().size());
-            assertEquals(1, targetShardSegments.getSegments().size());
-        });
 
-        // clean up
-        client().admin()
-            .cluster()
-            .prepareUpdateSettings()
-            .setPersistentSettings(
-                Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null)
-            )
-            .get();
     }
 
     public void testShrinkThenSplitWithFailedNode() throws Exception {
@@ -688,5 +705,39 @@ public class ShrinkIndexIT extends ESIntegTestCase {
                 .setResizeType(ResizeType.SPLIT)
         );
         ensureGreen("splitagain");
+        assertNoResizeSourceIndexSettings("splitagain");
+    }
+
+    /** Asserts that all primaries of {@code index} are active and that its resize source settings have been removed. */
+    static void assertNoResizeSourceIndexSettings(final String index) {
+        ClusterStateResponse clusterStateResponse = client().admin()
+            .cluster()
+            .prepareState()
+            .clear()
+            .setMetadata(true)
+            .setRoutingTable(true)
+            .execute()
+            .actionGet();
+        IndexRoutingTable indexRoutingTable = clusterStateResponse.getState().routingTable().index(index);
+        assertThat("Index " + index + " should have an index routing table", indexRoutingTable, notNullValue());
+        assertThat("Index " + index + " should have all primaries started", indexRoutingTable.allPrimaryShardsActive(), equalTo(true));
+        IndexMetadata indexMetadata = clusterStateResponse.getState().metadata().index(index);
+        assertThat("Index " + index + " should have index metadata", indexMetadata, notNullValue());
+        assertThat("Index " + index + " should not have resize source index", indexMetadata.getResizeSourceIndex(), nullValue());
+        assertThat(
+            "Index " + index + " should not have resize source UUID setting",
+            IndexMetadata.INDEX_RESIZE_SOURCE_UUID.exists(indexMetadata.getSettings()),
+            equalTo(false)
+        );
+        assertThat(
+            "Index " + index + " should not have resize source name setting",
+            IndexMetadata.INDEX_RESIZE_SOURCE_NAME.exists(indexMetadata.getSettings()),
+            equalTo(false)
+        );
+        assertThat(
+            "Index " + index + " should not have initial recovery setting",
+            indexMetadata.getSettings().get(IndexMetadata.INDEX_SHRINK_INITIAL_RECOVERY_KEY),
+            nullValue()
+        );
+    }
 }

+ 7 - 0
server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/SplitIndexIT.java

@@ -55,6 +55,7 @@ import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.stream.IntStream;
 
+import static org.elasticsearch.action.admin.indices.create.ShrinkIndexIT.assertNoResizeSourceIndexSettings;
 import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -193,6 +194,7 @@ public class SplitIndexIT extends ESIntegTestCase {
         );
         ensureGreen();
         assertHitCount(client().prepareSearch("first_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs);
+        assertNoResizeSourceIndexSettings("first_split");
 
         for (int i = 0; i < numDocs; i++) { // now update
             IndexRequestBuilder builder = indexFunc.apply("first_split", i);
@@ -232,6 +234,8 @@ public class SplitIndexIT extends ESIntegTestCase {
         );
         ensureGreen();
         assertHitCount(client().prepareSearch("second_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs);
+        assertNoResizeSourceIndexSettings("second_split");
+
         // let it be allocated anywhere and bump replicas
         client().admin()
             .indices()
@@ -363,6 +367,7 @@ public class SplitIndexIT extends ESIntegTestCase {
         for (int shardId = 0; shardId < numberOfTargetShards; shardId++) {
             assertThat(aftersplitIndexMetadata.primaryTerm(shardId), equalTo(beforeSplitPrimaryTerm + 1));
         }
+        assertNoResizeSourceIndexSettings("target");
     }
 
     private static IndexMetadata indexMetadata(final Client client, final String index) {
@@ -417,6 +422,7 @@ public class SplitIndexIT extends ESIntegTestCase {
                     .get()
             );
             ensureGreen();
+            assertNoResizeSourceIndexSettings("target");
 
             final ClusterState state = client().admin().cluster().prepareState().get().getState();
             DiscoveryNode mergeNode = state.nodes().get(state.getRoutingTable().index("target").shard(0).primaryShard().currentNodeId());
@@ -563,5 +569,6 @@ public class SplitIndexIT extends ESIntegTestCase {
         }
         flushAndRefresh();
         assertSortedSegments("target", expectedIndexSort);
+        assertNoResizeSourceIndexSettings("target");
     }
 }

+ 6 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

@@ -1126,6 +1126,12 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
     public static final Setting<String> INDEX_RESIZE_SOURCE_UUID = Setting.simpleString(INDEX_RESIZE_SOURCE_UUID_KEY);
     public static final Setting<String> INDEX_RESIZE_SOURCE_NAME = Setting.simpleString(INDEX_RESIZE_SOURCE_NAME_KEY);
 
+    /**
+     * Key of the initial recovery node id filter. We use "i.r.a.initial_recovery" rather than "i.r.a.require|include"
+     * since we want the replica to allocate right away once we are allocated.
+     */
+    public static final String INDEX_SHRINK_INITIAL_RECOVERY_KEY = INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING.getKey() + "_id";
+
     public Index getResizeSourceIndex() {
         return INDEX_RESIZE_SOURCE_UUID.exists(settings)
             ? new Index(INDEX_RESIZE_SOURCE_NAME.get(settings), INDEX_RESIZE_SOURCE_UUID.get(settings))

+ 40 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

@@ -371,6 +371,46 @@ public class Metadata extends AbstractCollection<IndexMetadata> implements Diffa
         );
     }
 
+    /** Returns a copy of this metadata where each given index gets the provided settings and a settings version bumped by one. */
+    public Metadata withIndexSettingsUpdates(final Map<Index, Settings> updates) {
+        Objects.requireNonNull(updates, "no indices to update settings for");
+
+        final ImmutableOpenMap.Builder<String, IndexMetadata> rebuilt = ImmutableOpenMap.builder(indices);
+        for (Map.Entry<Index, Settings> update : updates.entrySet()) {
+            final Index index = update.getKey();
+            final IndexMetadata current = rebuilt.remove(index.getName());
+            assert current != null : index;
+            final IndexMetadata.Builder bumped = IndexMetadata.builder(current).settingsVersion(current.getSettingsVersion() + 1L);
+            rebuilt.put(index.getName(), bumped.settings(update.getValue()).build());
+        }
+        return new Metadata(
+            clusterUUID,
+            clusterUUIDCommitted,
+            version,
+            coordinationMetadata,
+            transientSettings,
+            persistentSettings,
+            settings,
+            hashesOfConsistentSettings,
+            totalNumberOfShards,
+            totalOpenIndexShards,
+            rebuilt.build(),
+            aliasedIndices,
+            templates,
+            customs,
+            allIndices,
+            visibleIndices,
+            allOpenIndices,
+            visibleOpenIndices,
+            allClosedIndices,
+            visibleClosedIndices,
+            indicesLookup,
+            mappingsByHash,
+            oldestIndexVersion,
+            reservedStateMetadata
+        );
+    }
+
     public Metadata withCoordinationMetadata(CoordinationMetadata coordinationMetadata) {
         return new Metadata(
             clusterUUID,

+ 4 - 7
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java

@@ -99,6 +99,7 @@ import static java.util.Collections.emptyMap;
 import static java.util.stream.Collectors.toList;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
+import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_SHRINK_INITIAL_RECOVERY_KEY;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
@@ -1482,10 +1483,6 @@ public class MetadataCreateIndexService {
         final boolean copySettings,
         final IndexScopedSettings indexScopedSettings
     ) {
-        // we use "i.r.a.initial_recovery" rather than "i.r.a.require|include" since we want the replica to allocate right away
-        // once we are allocated.
-        final String initialRecoveryIdFilter = IndexMetadata.INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING.getKey() + "_id";
-
         final IndexMetadata sourceMetadata = currentState.metadata().index(resizeSourceIndex.getName());
         if (type == ResizeType.SHRINK) {
             final List<String> nodesToAllocateOn = validateShrinkIndex(
@@ -1494,13 +1491,13 @@ public class MetadataCreateIndexService {
                 resizeIntoName,
                 indexSettingsBuilder.build()
             );
-            indexSettingsBuilder.put(initialRecoveryIdFilter, Strings.arrayToCommaDelimitedString(nodesToAllocateOn.toArray()));
+            indexSettingsBuilder.put(INDEX_SHRINK_INITIAL_RECOVERY_KEY, Strings.arrayToCommaDelimitedString(nodesToAllocateOn.toArray()));
         } else if (type == ResizeType.SPLIT) {
             validateSplitIndex(currentState, resizeSourceIndex.getName(), resizeIntoName, indexSettingsBuilder.build());
-            indexSettingsBuilder.putNull(initialRecoveryIdFilter);
+            indexSettingsBuilder.putNull(INDEX_SHRINK_INITIAL_RECOVERY_KEY);
         } else if (type == ResizeType.CLONE) {
             validateCloneIndex(currentState, resizeSourceIndex.getName(), resizeIntoName, indexSettingsBuilder.build());
-            indexSettingsBuilder.putNull(initialRecoveryIdFilter);
+            indexSettingsBuilder.putNull(INDEX_SHRINK_INITIAL_RECOVERY_KEY);
         } else {
             throw new IllegalStateException("unknown resize type is " + type);
         }

+ 1 - 0
server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

@@ -529,6 +529,7 @@ public class RoutingNodes extends AbstractCollection<RoutingNode> {
                 }
             }
         }
+
         return startedShard;
     }
 

+ 63 - 0
server/src/main/java/org/elasticsearch/cluster/routing/allocation/ResizeSourceIndexSettingsUpdater.java

@@ -0,0 +1,63 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.routing.allocation;
+
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.RoutingChangesObserver;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.index.Index;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link RoutingChangesObserver} that removes index settings used to resize indices (Clone/Split/Shrink) once all primaries are started.
+ */
+public class ResizeSourceIndexSettingsUpdater implements RoutingChangesObserver {
+
+    /** Indices for which this observer saw a primary that recovered from local shards being started. */
+    private final Set<Index> changes = new HashSet<>();
+
+    @Override
+    public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) {
+        if (startedShard.primary() && (initializingShard.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS)) {
+            assert startedShard.recoverySource() == null : "recovery source should have been removed once shard is started";
+            changes.add(startedShard.shardId().getIndex());
+        }
+    }
+
+    /** Returns {@code metadata} with the resize settings removed from every recorded index whose primaries are all active. */
+    public Metadata applyChanges(Metadata metadata, RoutingTable routingTable) {
+        final Map<Index, Settings> updates = Maps.newHashMapWithExpectedSize(changes.size());
+        for (Index index : changes) {
+            var indexMetadata = metadata.getIndexSafe(index);
+            if (routingTable.index(index).allPrimaryShardsActive()) {
+                assert indexMetadata.getResizeSourceIndex() != null : "no resize source index for " + index;
+
+                Settings.Builder builder = Settings.builder().put(indexMetadata.getSettings());
+                builder.remove(IndexMetadata.INDEX_SHRINK_INITIAL_RECOVERY_KEY);
+                builder.remove(IndexMetadata.INDEX_RESIZE_SOURCE_UUID_KEY);
+                if (Strings.isNullOrEmpty(indexMetadata.getLifecyclePolicyName())) {
+                    // ILM relies on the source name to identify a shrunk index, so only drop it when no policy is set
+                    builder.remove(IndexMetadata.INDEX_RESIZE_SOURCE_NAME_KEY);
+                }
+                updates.put(index, builder.build());
+            }
+        }
+        // avoid building a new Metadata instance when no recorded index is ready to be updated
+        return updates.isEmpty() ? metadata : metadata.withIndexSettingsUpdates(updates);
+    }
+}

+ 5 - 2
server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java

@@ -67,10 +67,12 @@ public class RoutingAllocation {
     private final IndexMetadataUpdater indexMetadataUpdater = new IndexMetadataUpdater();
     private final RoutingNodesChangedObserver nodesChangedObserver = new RoutingNodesChangedObserver();
     private final RestoreInProgressUpdater restoreInProgressUpdater = new RestoreInProgressUpdater();
+    private final ResizeSourceIndexSettingsUpdater resizeSourceIndexUpdater = new ResizeSourceIndexSettingsUpdater();
     private final RoutingChangesObserver routingChangesObserver = new RoutingChangesObserver.DelegatingRoutingChangesObserver(
         nodesChangedObserver,
         indexMetadataUpdater,
-        restoreInProgressUpdater
+        restoreInProgressUpdater,
+        resizeSourceIndexUpdater
     );
 
     private final Map<String, SingleNodeShutdownMetadata> nodeReplacementTargets;
@@ -314,7 +316,8 @@ public class RoutingAllocation {
      * Returns updated {@link Metadata} based on the changes that were made to the routing nodes
      */
     public Metadata updateMetadataWithRoutingChanges(RoutingTable newRoutingTable) {
-        return indexMetadataUpdater.applyChanges(metadata(), newRoutingTable);
+        Metadata metadata = indexMetadataUpdater.applyChanges(metadata(), newRoutingTable);
+        return resizeSourceIndexUpdater.applyChanges(metadata, newRoutingTable);
     }
 
     /**

+ 189 - 0
server/src/test/java/org/elasticsearch/cluster/routing/allocation/ResizeSourceIndexSettingsUpdaterTests.java

@@ -0,0 +1,189 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.routing.allocation;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ESAllocationTestCase;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
+import org.elasticsearch.common.settings.Settings;
+
+import java.util.List;
+
+import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class ResizeSourceIndexSettingsUpdaterTests extends ESAllocationTestCase {
+    /** Resize settings must survive until every target primary is started, then go away (source name is kept when a policy is set). */
+    public void testResizeIndexSettingsRemovedAfterStart() {
+        final DiscoveryNodes discoveryNodes = DiscoveryNodes.builder()
+            .add(newNode("node1", "id1", MASTER_DATA_ROLES))
+            .add(newNode("node2", "id2", MASTER_DATA_ROLES))
+            .build();
+        // Node that the source index is required on (by name); also used as the shrink target's initial recovery node.
+        final DiscoveryNode resizeNode = randomFrom(discoveryNodes.getDataNodes().values());
+
+        final String sourceIndex = "source";
+        final String targetIndex = "target";
+        // Source index: 2 primaries, no replicas, write-blocked, with 16 routing shards.
+        final Metadata sourceMetadata = Metadata.builder()
+            .put(
+                IndexMetadata.builder(sourceIndex)
+                    .settings(
+                        settings(Version.CURRENT).put(
+                            IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_name",
+                            resizeNode.getName()
+                        ).put("index.blocks.write", true)
+                    )
+                    .numberOfShards(2)
+                    .numberOfReplicas(0)
+                    .setRoutingNumShards(16)
+            )
+            .build();
+        // Initial cluster state containing only the source index, whose shards are not yet assigned.
+        ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .routingTable(RoutingTable.builder().addAsNew(sourceMetadata.index(sourceIndex)))
+            .metadata(sourceMetadata)
+            .nodes(discoveryNodes)
+            .build();
+
+        {
+            IndexRoutingTable sourceRoutingTable = clusterState.routingTable().index(sourceIndex);
+            assertThat(sourceRoutingTable.size(), equalTo(2));
+            assertThat(sourceRoutingTable.shard(0).primaryShard().state(), equalTo(UNASSIGNED));
+            assertThat(sourceRoutingTable.shard(1).primaryShard().state(), equalTo(UNASSIGNED));
+        }
+        // Both per-node recovery limits are set to 16, presumably so that throttling does not hold back allocation in this test.
+        final AllocationService allocationService = createAllocationService(
+            Settings.builder()
+                .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 16)
+                .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 16)
+                .build()
+        );
+        clusterState = allocationService.reroute(clusterState, "reroute");
+
+        {
+            IndexRoutingTable sourceRoutingTable = clusterState.routingTable().index(sourceIndex);
+            assertThat(sourceRoutingTable.size(), equalTo(2));
+            assertThat(sourceRoutingTable.shard(0).primaryShard().state(), equalTo(INITIALIZING));
+            assertThat(sourceRoutingTable.shard(1).primaryShard().state(), equalTo(INITIALIZING));
+            assertThat(sourceRoutingTable.shard(0).primaryShard().currentNodeId(), equalTo(resizeNode.getId()));
+            assertThat(sourceRoutingTable.shard(1).primaryShard().currentNodeId(), equalTo(resizeNode.getId()));
+        }
+        // Start the source primaries; both must end up STARTED on the resize node.
+        clusterState = startInitializingShardsAndReroute(allocationService, clusterState);
+
+        {
+            IndexRoutingTable sourceRoutingTable = clusterState.routingTable().index(sourceIndex);
+            assertThat(sourceRoutingTable.size(), equalTo(2));
+            assertThat(sourceRoutingTable.shard(0).primaryShard().state(), equalTo(STARTED));
+            assertThat(sourceRoutingTable.shard(1).primaryShard().state(), equalTo(STARTED));
+            assertThat(sourceRoutingTable.shard(0).primaryShard().currentNodeId(), equalTo(resizeNode.getId()));
+            assertThat(sourceRoutingTable.shard(1).primaryShard().currentNodeId(), equalTo(resizeNode.getId()));
+        }
+        // Target index settings: random shard/replica counts, pointing back at the source index by name and UUID.
+        final int targetNumShards = randomFrom(1, 2, 4, 8, 16);
+        final int targetNumReplicas = randomInt(2);
+        final Settings.Builder targetSettings = settings(Version.CURRENT);
+        targetSettings.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, targetNumShards);
+        targetSettings.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, targetNumReplicas);
+        targetSettings.put(IndexMetadata.INDEX_RESIZE_SOURCE_NAME.getKey(), sourceIndex);
+        targetSettings.put(IndexMetadata.INDEX_RESIZE_SOURCE_UUID.getKey(), sourceMetadata.index(sourceIndex).getIndexUUID());
+        final boolean isShrink = randomBoolean();
+        if (isShrink) {
+            targetSettings.put(IndexMetadata.INDEX_SHRINK_INITIAL_RECOVERY_KEY, resizeNode.getId());
+        }
+        final boolean hasLifecyclePolicy = randomBoolean();
+        if (hasLifecyclePolicy) {
+            targetSettings.put(IndexMetadata.LIFECYCLE_NAME, "policy");
+        }
+        // Add the target index to the metadata first, then to the routing table as a new (unassigned) index.
+        clusterState = ClusterState.builder(clusterState)
+            .metadata(
+                Metadata.builder(clusterState.metadata())
+                    .put(IndexMetadata.builder(targetIndex).settings(targetSettings).setRoutingNumShards(16))
+            )
+            .build();
+        clusterState = ClusterState.builder(clusterState)
+            .routingTable(RoutingTable.builder(clusterState.routingTable()).addAsNew(clusterState.metadata().index(targetIndex)))
+            .build();
+
+        {
+            IndexRoutingTable targetRoutingTable = clusterState.routingTable().index(targetIndex);
+            assertThat(targetRoutingTable.size(), equalTo(targetNumShards));
+            for (int i = 0; i < targetNumShards; i++) {
+                ShardRouting shardRouting = targetRoutingTable.shard(i).primaryShard();
+                assertThat(shardRouting.toString(), shardRouting.state(), equalTo(UNASSIGNED));
+            }
+        }
+        // After this reroute the target primaries are INITIALIZING and the resize settings must still be present.
+        clusterState = allocationService.reroute(clusterState, "reroute");
+
+        {
+            IndexMetadata targetIndexMetadata = clusterState.metadata().index(targetIndex);
+            assertThat(IndexMetadata.INDEX_RESIZE_SOURCE_NAME.exists(targetIndexMetadata.getSettings()), is(true));
+            assertThat(IndexMetadata.INDEX_RESIZE_SOURCE_UUID.exists(targetIndexMetadata.getSettings()), is(true));
+            assertThat(targetIndexMetadata.getSettings().hasValue(IndexMetadata.INDEX_SHRINK_INITIAL_RECOVERY_KEY), is(isShrink));
+            IndexRoutingTable targetRoutingTable = clusterState.routingTable().index(targetIndex);
+            assertThat(targetRoutingTable.size(), equalTo(targetNumShards));
+            for (int i = 0; i < targetNumShards; i++) {
+                ShardRouting shardRouting = targetRoutingTable.shard(i).primaryShard();
+                assertThat(shardRouting.toString(), shardRouting.state(), equalTo(INITIALIZING));
+            }
+        }
+        // Start random non-empty subsets of the target's initializing shards until none is left, checking settings before each round.
+        while (true) {
+            IndexRoutingTable targetIndexRoutingTable = clusterState.routingTable().index(targetIndex);
+            List<ShardRouting> initializing = targetIndexRoutingTable.shardsWithState(INITIALIZING);
+            if (initializing.isEmpty()) {
+                break;
+            }
+            // Settings may only be gone once all primaries are active; the source name also stays when a policy is set.
+            IndexMetadata targetIndexMetadata = clusterState.metadata().index(targetIndex);
+            assertThat(
+                IndexMetadata.INDEX_RESIZE_SOURCE_NAME.exists(targetIndexMetadata.getSettings()),
+                is(hasLifecyclePolicy || (targetIndexRoutingTable.allPrimaryShardsActive() == false))
+            );
+            assertThat(
+                IndexMetadata.INDEX_RESIZE_SOURCE_UUID.exists(targetIndexMetadata.getSettings()),
+                is(targetIndexRoutingTable.allPrimaryShardsActive() == false)
+            );
+            assertThat(
+                targetIndexMetadata.getSettings().hasValue(IndexMetadata.INDEX_SHRINK_INITIAL_RECOVERY_KEY),
+                is(targetIndexRoutingTable.allPrimaryShardsActive() ? false : isShrink)
+            );
+
+            clusterState = startShardsAndReroute(allocationService, clusterState, randomNonEmptySubsetOf(initializing));
+        }
+        // Final state: all primaries STARTED and resize settings removed (source name kept only with a lifecycle policy).
+        {
+            IndexMetadata targetIndexMetadata = clusterState.metadata().index(targetIndex);
+            assertThat(IndexMetadata.INDEX_RESIZE_SOURCE_NAME.exists(targetIndexMetadata.getSettings()), is(hasLifecyclePolicy));
+            assertThat(IndexMetadata.INDEX_RESIZE_SOURCE_UUID.exists(targetIndexMetadata.getSettings()), is(false));
+            assertThat(targetIndexMetadata.getSettings().hasValue(IndexMetadata.INDEX_SHRINK_INITIAL_RECOVERY_KEY), is(false));
+            IndexRoutingTable targetRoutingTable = clusterState.routingTable().index(targetIndex);
+            assertThat(targetRoutingTable.size(), equalTo(targetNumShards));
+            for (int i = 0; i < targetNumShards; i++) {
+                ShardRouting shardRouting = targetRoutingTable.shard(i).primaryShard();
+                assertThat(shardRouting.toString(), shardRouting.state(), equalTo(STARTED));
+            }
+        }
+    }
+}