1
0
Эх сурвалжийг харах

InternalSnapshotsInfoService should also removed failed snapshot shard size infos (#63492)

Relates #61906
Tanguy Leroux 5 жил өмнө
parent
commit
b8e5d7c0fb

+ 3 - 2
server/src/main/java/org/elasticsearch/snapshots/InternalSnapshotsInfoService.java

@@ -149,7 +149,7 @@ public class InternalSnapshotsInfoService implements ClusterStateListener, Snaps
                     }
                 }
                 // Clean up keys from knownSnapshotShardSizes that are no longer needed for recoveries
-                cleanUpKnownSnapshotShardSizes(onGoingSnapshotRecoveries);
+                cleanUpSnapshotShardSizes(onGoingSnapshotRecoveries);
             }
 
             final int nbFetchers = Math.min(unknownShards, maxConcurrentFetches);
@@ -263,7 +263,7 @@ public class InternalSnapshotsInfoService implements ClusterStateListener, Snaps
         }
     }
 
-    private void cleanUpKnownSnapshotShardSizes(Set<SnapshotShard> requiredSnapshotShards) {
+    private void cleanUpSnapshotShardSizes(Set<SnapshotShard> requiredSnapshotShards) {
         assert Thread.holdsLock(mutex);
         ImmutableOpenMap.Builder<SnapshotShard, Long> newSnapshotShardSizes = null;
         for (ObjectCursor<SnapshotShard> shard : knownSnapshotShardSizes.keys()) {
@@ -277,6 +277,7 @@ public class InternalSnapshotsInfoService implements ClusterStateListener, Snaps
         if (newSnapshotShardSizes != null) {
             knownSnapshotShardSizes = newSnapshotShardSizes.build();
         }
+        failedSnapshotShards.retainAll(requiredSnapshotShards);
     }
 
     private boolean invariant() {

+ 88 - 22
server/src/test/java/org/elasticsearch/snapshots/InternalSnapshotsInfoServiceTests.java

@@ -19,25 +19,29 @@
 
 package org.elasticsearch.snapshots;
 
+import com.carrotsearch.hppc.IntHashSet;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ESAllocationTestCase;
+import org.elasticsearch.cluster.RestoreInProgress;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.AllocationId;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
-import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
-import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterApplier;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
@@ -68,6 +72,8 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_CREATION_
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
+import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
+import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING;
 import static org.elasticsearch.snapshots.InternalSnapshotsInfoService.INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.equalTo;
@@ -275,6 +281,54 @@ public class InternalSnapshotsInfoServiceTests extends ESTestCase {
         });
     }
 
+    public void testCleanUpSnapshotShardSizes() throws Exception {
+        final Repository mockRepository = new FilterRepository(mock(Repository.class)) {
+            @Override
+            public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
+                if (randomBoolean()) {
+                    throw new SnapshotException(new Snapshot("_repo", snapshotId), "simulated");
+                } else {
+                    return IndexShardSnapshotStatus.newDone(0L, 0L, 0, 0, 0L, randomNonNegativeLong(), null);
+                }
+            }
+        };
+        when(repositoriesService.repository("_repo")).thenReturn(mockRepository);
+
+        final InternalSnapshotsInfoService snapshotsInfoService =
+            new InternalSnapshotsInfoService(Settings.EMPTY, clusterService, () -> repositoriesService, () -> rerouteService);
+
+        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        final int nbShards = randomIntBetween(1, 10);
+
+        applyClusterState("new snapshot restore for index " + indexName,
+            clusterState -> addUnassignedShards(clusterState, indexName, nbShards));
+
+        // waiting for snapshot shard size fetches to be executed, as we want to verify that they are cleaned up
+        assertBusy(() -> assertThat(
+            snapshotsInfoService.numberOfFailedSnapshotShardSizes() + snapshotsInfoService.numberOfKnownSnapshotShardSizes(),
+            equalTo(nbShards)));
+
+        if (randomBoolean()) {
+            // simulate initialization and start of the shards
+            final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.builder()
+                .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), nbShards)
+                .put(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), nbShards)
+                .build(), snapshotsInfoService);
+            applyClusterState("starting shards for " + indexName, clusterState ->
+                    ESAllocationTestCase.startInitializingShardsAndReroute(allocationService, clusterState, indexName));
+            assertTrue(clusterService.state().routingTable().shardsWithState(ShardRoutingState.UNASSIGNED).isEmpty());
+
+        } else {
+            // simulate deletion of the index
+            applyClusterState("delete index " + indexName, clusterState -> deleteIndex(clusterState, indexName));
+            assertFalse(clusterService.state().metadata().hasIndex(indexName));
+        }
+
+        assertThat(snapshotsInfoService.numberOfKnownSnapshotShardSizes(), equalTo(0));
+        assertThat(snapshotsInfoService.numberOfUnknownSnapshotShardSizes(), equalTo(0));
+        assertThat(snapshotsInfoService.numberOfFailedSnapshotShardSizes(), equalTo(0));
+    }
+
     private void applyClusterState(final String reason, final Function<ClusterState, ClusterState> applier) {
         PlainActionFuture.get(future -> clusterService.getClusterApplierService().onNewClusterState(reason,
             () -> applier.apply(clusterService.state()),
@@ -309,14 +363,19 @@ public class InternalSnapshotsInfoServiceTests extends ESTestCase {
     private ClusterState addUnassignedShards(final ClusterState currentState, String indexName, int numberOfShards) {
         assertThat(currentState.metadata().hasIndex(indexName), is(false));
 
+        final IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
+            .settings(Settings.builder()
+                .put(SETTING_VERSION_CREATED, Version.CURRENT)
+                .put(SETTING_NUMBER_OF_SHARDS, numberOfShards)
+                .put(SETTING_NUMBER_OF_REPLICAS, 0)
+                .put(SETTING_CREATION_DATE, System.currentTimeMillis()));
+
+        for (int i = 0; i < numberOfShards; i++) {
+            indexMetadataBuilder.putInSyncAllocationIds(i, Collections.singleton(AllocationId.newInitializing().getId()));
+        }
+
         final Metadata.Builder metadata = Metadata.builder(currentState.metadata())
-            .put(IndexMetadata.builder(indexName)
-                .settings(Settings.builder()
-                    .put(SETTING_VERSION_CREATED, Version.CURRENT)
-                    .put(SETTING_NUMBER_OF_SHARDS, numberOfShards)
-                    .put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1))
-                    .put(SETTING_CREATION_DATE, System.currentTimeMillis()))
-                .build(), true)
+            .put(indexMetadataBuilder.build(), true)
             .generateClusterUuidIfNeeded();
 
         final RecoverySource.SnapshotRecoverySource recoverySource = new RecoverySource.SnapshotRecoverySource(
@@ -326,24 +385,24 @@ public class InternalSnapshotsInfoServiceTests extends ESTestCase {
             new IndexId(indexName, UUIDs.randomBase64UUID(random()))
         );
 
-        final Index index = metadata.get(indexName).getIndex();
-        final IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index);
-        for (int primary = 0; primary < numberOfShards; primary++) {
-            final ShardId shardId = new ShardId(index, primary);
+        final IndexMetadata indexMetadata = metadata.get(indexName);
+        final Index index = indexMetadata.getIndex();
 
-            final IndexShardRoutingTable.Builder indexShards = new IndexShardRoutingTable.Builder(shardId);
-            indexShards.addShard(TestShardRouting.newShardRouting(shardId, null, true, ShardRoutingState.UNASSIGNED, recoverySource));
-            for (int replica = 0; replica < metadata.get(indexName).getNumberOfReplicas(); replica++) {
-                indexShards.addShard(TestShardRouting.newShardRouting(shardId, null, false, ShardRoutingState.UNASSIGNED,
-                    RecoverySource.PeerRecoverySource.INSTANCE));
-            }
-            indexRoutingTable.addIndexShard(indexShards.build());
+        final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
+        routingTable.add(IndexRoutingTable.builder(index).initializeAsNewRestore(indexMetadata, recoverySource, new IntHashSet()).build());
+
+        final RestoreInProgress.Builder restores =
+            new RestoreInProgress.Builder(currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY));
+        final ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> shards = ImmutableOpenMap.builder();
+        for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
+            shards.put( new ShardId(index, i), new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId()));
         }
 
-        final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
-        routingTable.add(indexRoutingTable.build());
+        restores.add(new RestoreInProgress.Entry(recoverySource.restoreUUID(), recoverySource.snapshot(), RestoreInProgress.State.INIT,
+            Collections.singletonList(indexName), shards.build()));
 
         return ClusterState.builder(currentState)
+            .putCustom(RestoreInProgress.TYPE, restores.build())
             .routingTable(routingTable.build())
             .metadata(metadata)
             .build();
@@ -360,4 +419,11 @@ public class InternalSnapshotsInfoServiceTests extends ESTestCase {
                 .masterNodeId(node.getId()))
             .build();
     }
+
+    private ClusterState deleteIndex(final ClusterState currentState, final String indexName) {
+        return ClusterState.builder(currentState)
+            .metadata(Metadata.builder(currentState.metadata()).remove(indexName))
+            .routingTable(RoutingTable.builder(currentState.routingTable()).remove(indexName).build())
+            .build();
+    }
 }