瀏覽代碼

Fix Two Snapshot Clone State Machine Bugs (#65042)

There are two separate but closely related bug fixes in this PR:
1. When two snapshot clones would initialize concurrently we could get into a state
where one is in front of the other in the queue snapshots array but its shard states
are in fact queued behind the other snapshot or clone. Tightly linked to this,
snapshot cloning would not account for snapshot deletes when queueing shard snapshots
which could lead to races where both delete and clone are running concurrently for a shard.
2. As a result of fixing the first issue and writing a test for it, it also became obvious
that a finished delete was not properly accounted for when it comes to starting snapshot
clones that could now queue behind a delete.
Armin Braun 4 年之前
父節點
當前提交
12d412ebad

+ 42 - 0
server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.snapshots;
 
+import com.carrotsearch.hppc.cursors.ObjectCursor;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexStatus;
@@ -313,6 +314,10 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
 
         final int extraClones = randomIntBetween(1, 5);
         final List<ActionFuture<AcknowledgedResponse>> extraCloneFutures = new ArrayList<>(extraClones);
+        final boolean slowInitClones = extraClones > 1 && randomBoolean();
+        if (slowInitClones) {
+            blockMasterOnReadIndexMeta(repoName);
+        }
         for (int i = 0; i < extraClones; i++) {
             extraCloneFutures.add(startClone(repoName, sourceSnapshot, "target-snapshot-" + i, indexBlocked));
         }
@@ -571,6 +576,43 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
         assertEquals(getSnapshot(repoName, cloneName).state(), SnapshotState.SUCCESS);
     }
 
+    public void testStartCloneDuringRunningDelete() throws Exception {
+        final String masterName = internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
+        internalCluster().startDataOnlyNode();
+        final String repoName = "test-repo";
+        createRepository(repoName, "mock");
+
+        final String indexName = "test-idx";
+        createIndexWithContent(indexName);
+
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repoName, sourceSnapshot);
+
+        final List<String> snapshotNames = createNSnapshots(repoName, randomIntBetween(1, 5));
+        blockMasterOnWriteIndexFile(repoName);
+        final ActionFuture<AcknowledgedResponse> deleteFuture = startDeleteSnapshot(repoName, randomFrom(snapshotNames));
+        waitForBlock(masterName, repoName);
+        awaitNDeletionsInProgress(1);
+
+        final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, "target-snapshot", indexName);
+        logger.info("--> waiting for snapshot clone to be fully initialized");
+        awaitClusterState(state -> {
+            for (SnapshotsInProgress.Entry entry : state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
+                if (entry.clones().isEmpty() == false) {
+                    assertEquals(sourceSnapshot, entry.source().getName());
+                    for (ObjectCursor<SnapshotsInProgress.ShardSnapshotStatus> value : entry.clones().values()) {
+                        assertSame(value.value, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED);
+                    }
+                    return true;
+                }
+            }
+            return  false;
+        });
+        unblockNode(repoName, masterName);
+        assertAcked(deleteFuture.get());
+        assertAcked(cloneFuture.get());
+    }
+
     private ActionFuture<AcknowledgedResponse> startCloneFromDataNode(String repoName, String sourceSnapshot, String targetSnapshot,
                                                                       String... indices) {
         return startClone(dataNodeClient(), repoName, sourceSnapshot, targetSnapshot, indices);

+ 0 - 6
server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java

@@ -1313,12 +1313,6 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
         Files.write(indexNBlob, randomByteArrayOfLength(1), StandardOpenOption.TRUNCATE_EXISTING);
     }
 
-    private void awaitNDeletionsInProgress(int count) throws Exception {
-        logger.info("--> wait for [{}] deletions to show up in the cluster state", count);
-        awaitClusterState(state ->
-                state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).getEntries().size() == count);
-    }
-
     private static List<SnapshotInfo> currentSnapshots(String repoName) {
         return client().admin().cluster().prepareGetSnapshots(repoName).setSnapshots(GetSnapshotsRequest.CURRENT_SNAPSHOT)
                 .get().getSnapshots(repoName);

+ 47 - 5
server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

@@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState.Custom;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -471,6 +472,25 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
                 }
             }
             builder.endArray();
+            if (isClone()) {
+                builder.field(SOURCE, source);
+                builder.startArray(CLONES);
+                {
+                    for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> shardEntry : clones) {
+                        RepositoryShardId shardId = shardEntry.key;
+                        ShardSnapshotStatus status = shardEntry.value;
+                        builder.startObject();
+                        {
+                            builder.field(INDEX, shardId.index());
+                            builder.field(SHARD, shardId.shardId());
+                            builder.field(STATE, status.state());
+                            builder.field(NODE, status.nodeId());
+                        }
+                        builder.endObject();
+                    }
+                }
+                builder.endArray();
+            }
             builder.array(DATA_STREAMS, dataStreams.toArray(new String[0]));
             builder.endObject();
             return builder;
@@ -692,13 +712,18 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
     private final List<Entry> entries;
 
     private static boolean assertConsistentEntries(List<Entry> entries) {
-        final Map<String, Set<ShardId>> assignedShardsByRepo = new HashMap<>();
+        final Map<String, Set<Tuple<String, Integer>>> assignedShardsByRepo = new HashMap<>();
+        final Map<String, Set<Tuple<String, Integer>>> queuedShardsByRepo = new HashMap<>();
         for (Entry entry : entries) {
             for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : entry.shards()) {
-                if (shard.value.isActive()) {
-                    assert assignedShardsByRepo.computeIfAbsent(entry.repository(), k -> new HashSet<>()).add(shard.key) :
-                            "Found duplicate shard assignments in " + entries;
-                }
+                final ShardId sid = shard.key;
+                assert assertShardStateConsistent(entries, assignedShardsByRepo, queuedShardsByRepo, entry, sid.getIndexName(), sid.id(),
+                        shard.value);
+            }
+            for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> shard : entry.clones()) {
+                final RepositoryShardId sid = shard.key;
+                assert assertShardStateConsistent(entries, assignedShardsByRepo, queuedShardsByRepo, entry, sid.indexName(), sid.shardId(),
+                        shard.value);
             }
         }
         for (String repoName : assignedShardsByRepo.keySet()) {
@@ -708,6 +733,21 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
         return true;
     }
 
+    private static boolean assertShardStateConsistent(List<Entry> entries, Map<String, Set<Tuple<String, Integer>>> assignedShardsByRepo,
+                                                      Map<String, Set<Tuple<String, Integer>>> queuedShardsByRepo, Entry entry,
+                                                      String indexName, int shardId, ShardSnapshotStatus shardSnapshotStatus) {
+        if (shardSnapshotStatus.isActive()) {
+            Tuple<String, Integer> plainShardId = Tuple.tuple(indexName, shardId);
+            assert assignedShardsByRepo.computeIfAbsent(entry.repository(), k -> new HashSet<>())
+                    .add(plainShardId) : "Found duplicate shard assignments in " + entries;
+            assert queuedShardsByRepo.getOrDefault(entry.repository(), Collections.emptySet()).contains(plainShardId) == false
+                    : "Found active shard assignments after queued shard assignments in " + entries;
+        } else if (shardSnapshotStatus.state() == ShardState.QUEUED) {
+            queuedShardsByRepo.computeIfAbsent(entry.repository(), k -> new HashSet<>()).add(Tuple.tuple(indexName, shardId));
+        }
+        return true;
+    }
+
     public static SnapshotsInProgress of(List<Entry> entries) {
         if (entries.isEmpty()) {
             return EMPTY;
@@ -766,6 +806,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
     private static final String STATE = "state";
     private static final String INDICES = "indices";
     private static final String DATA_STREAMS = "data_streams";
+    private static final String SOURCE = "source";
+    private static final String CLONES = "clones";
     private static final String START_TIME_MILLIS = "start_time_millis";
     private static final String START_TIME = "start_time";
     private static final String REPOSITORY_STATE_ID = "repository_state_id";

+ 115 - 36
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -476,17 +476,32 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 final String repoName = cloneEntry.repository();
                 final ShardGenerations shardGenerations = repoData.shardGenerations();
                 for (int i = 0; i < updatedEntries.size(); i++) {
-                    if (cloneEntry.snapshot().equals(updatedEntries.get(i).snapshot())) {
+                    final SnapshotsInProgress.Entry entry = updatedEntries.get(i);
+                    if (cloneEntry.repository().equals(entry.repository()) == false) {
+                        // different repo => just continue without modification
+                        continue;
+                    }
+                    if (cloneEntry.snapshot().getSnapshotId().equals(entry.snapshot().getSnapshotId())) {
                         final ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> clonesBuilder =
                                 ImmutableOpenMap.builder();
-                        final InFlightShardSnapshotStates inFlightShardStates =
-                            InFlightShardSnapshotStates.forRepo(repoName, snapshotsInProgress.entries());
+                        final boolean readyToExecute = currentState.custom(
+                                SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).getEntries().stream()
+                                .noneMatch(e -> e.repository().equals(repoName) && e.state() == SnapshotDeletionsInProgress.State.STARTED);
+                        final InFlightShardSnapshotStates inFlightShardStates;
+                        if (readyToExecute) {
+                            inFlightShardStates = InFlightShardSnapshotStates.forRepo(repoName, snapshotsInProgress.entries());
+                        } else {
+                            // no need to compute these, we'll mark all shards as queued anyway because we wait for the delete
+                            inFlightShardStates = null;
+                        }
+                        boolean queuedShards = false;
                         for (Tuple<IndexId, Integer> count : counts) {
                             for (int shardId = 0; shardId < count.v2(); shardId++) {
                                 final RepositoryShardId repoShardId = new RepositoryShardId(count.v1(), shardId);
                                 final String indexName = repoShardId.indexName();
-                                if (inFlightShardStates.isActive(indexName, shardId)) {
+                                if (readyToExecute == false || inFlightShardStates.isActive(indexName, shardId)) {
                                     clonesBuilder.put(repoShardId, ShardSnapshotStatus.UNASSIGNED_QUEUED);
+                                    queuedShards = true;
                                 } else {
                                     clonesBuilder.put(repoShardId, new ShardSnapshotStatus(localNodeId,
                                         inFlightShardStates.generationForShard(repoShardId.index(), shardId, shardGenerations)));
@@ -494,7 +509,17 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                             }
                         }
                         updatedEntry = cloneEntry.withClones(clonesBuilder.build());
-                        updatedEntries.set(i, updatedEntry);
+                        if (queuedShards) {
+                            // We queued up some shards based on the in-flight operations found in all snapshots for the current
+                            // repository, so in order to make sure we don't set a shard to QUEUED before (as in before it in the
+                            // `updatedEntries` list) one that is actively executing we just put it to the back of the list as if we had
+                            // just created the entry
+                            // TODO: If we could eventually drop the snapshot clone init phase we don't need this any longer
+                            updatedEntries.remove(i);
+                            updatedEntries.add(updatedEntry);
+                        } else {
+                            updatedEntries.set(i, updatedEntry);
+                        }
                         changed = true;
                         break;
                     }
@@ -797,11 +822,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         final Set<String> reposSeen = new HashSet<>();
         for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
             if (reposSeen.add(entry.repository())) {
-                for (ObjectCursor<ShardSnapshotStatus> value : entry.shards().values()) {
+                for (ObjectCursor<ShardSnapshotStatus> value : (entry.isClone() ? entry.clones() : entry.shards()).values()) {
                     if (value.value.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED)) {
                         assert reposWithRunningDelete.contains(entry.repository())
                                 : "Found shard snapshot waiting to be assigned in [" + entry +
                                 "] but it is not blocked by any running delete";
+                    } else if (value.value.isActive()) {
+                        assert reposWithRunningDelete.contains(entry.repository()) == false
+                                : "Found shard snapshot actively executing in [" + entry +
+                                "] when it should be blocked by a running delete";
                     }
                 }
             }
@@ -1894,6 +1923,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     endSnapshot(entry, newState.metadata(), repositoryData);
                 }
             }
+            // TODO: be more efficient here, we could collect newly ready shard clones as we compute them and then directly start them
+            //       instead of looping over all possible clones to execute
+            startExecutableClones(newState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY), null);
         }
 
         /**
@@ -1929,50 +1961,88 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 
             // Keep track of shardIds that we started snapshots for as a result of removing this delete so we don't assign
             // them to multiple snapshots by accident
-            final Set<ShardId> reassignedShardIds = new HashSet<>();
+            final Map<String, Set<Integer>> reassignedShardIds = new HashMap<>();
 
             boolean changed = false;
 
+            final String localNodeId = currentState.nodes().getLocalNodeId();
             final String repoName = deleteEntry.repository();
             // Computing the new assignments can be quite costly, only do it once below if actually needed
             ImmutableOpenMap<ShardId, ShardSnapshotStatus> shardAssignments = null;
+            InFlightShardSnapshotStates inFlightShardStates = null;
             for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
                 if (entry.repository().equals(repoName)) {
                     if (entry.state().completed() == false) {
-                        // Collect waiting shards that in entry that we can assign now that we are done with the deletion
-                        final List<ShardId> canBeUpdated = new ArrayList<>();
-                        for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> value : entry.shards()) {
-                            if (value.value.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED)
-                                    && reassignedShardIds.contains(value.key) == false) {
-                                canBeUpdated.add(value.key);
+                        // TODO: dry up redundant computation and code between clone and non-clone case, in particular reuse
+                        //  `inFlightShardStates` across both clone and standard snapshot code
+                        if (entry.isClone()) {
+                            // Collect waiting shards from that entry that we can assign now that we are done with the deletion
+                            final List<RepositoryShardId> canBeUpdated = new ArrayList<>();
+                            for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> value : entry.clones()) {
+                                if (value.value.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED)
+                                        && alreadyReassigned(value.key.indexName(), value.key.shardId(), reassignedShardIds) == false) {
+                                    canBeUpdated.add(value.key);
+                                }
+                            }
+                            // TODO: the below logic is very similar to that in #startCloning and both could be dried up against each other
+                            //       also the code for standard snapshots could make use of this breakout as well
+                            if (canBeUpdated.isEmpty() || updatedDeletions.getEntries().stream().anyMatch(
+                                    e -> e.repository().equals(repoName) && e.state() == SnapshotDeletionsInProgress.State.STARTED)) {
+                                // No shards can be updated in this snapshot so we just add it as is again
+                                snapshotEntries.add(entry);
+                            } else {
+                                if (inFlightShardStates == null) {
+                                    inFlightShardStates = InFlightShardSnapshotStates.forRepo(repoName, snapshotsInProgress.entries());
+                                }
+                                final ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> updatedAssignmentsBuilder =
+                                        ImmutableOpenMap.builder(entry.clones());
+                                for (RepositoryShardId shardId : canBeUpdated) {
+                                    if (inFlightShardStates.isActive(shardId.indexName(), shardId.shardId()) == false) {
+                                        markShardReassigned(shardId.indexName(), shardId.shardId(), reassignedShardIds);
+                                        updatedAssignmentsBuilder.put(shardId,
+                                                new ShardSnapshotStatus(localNodeId,
+                                                        inFlightShardStates.generationForShard(
+                                                                shardId.index(), shardId.shardId(), repositoryData.shardGenerations())));
+                                    }
+                                }
+                                snapshotEntries.add(entry.withClones(updatedAssignmentsBuilder.build()));
+                                changed = true;
                             }
-                        }
-                        if (canBeUpdated.isEmpty()) {
-                            // No shards can be updated in this snapshot so we just add it as is again
-                            snapshotEntries.add(entry);
                         } else {
-                            if (shardAssignments == null) {
-                                shardAssignments = shards(snapshotsInProgress,
-                                        updatedDeletions, currentState.metadata(), currentState.routingTable(), entry.indices(),
-                                        entry.version().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION), repositoryData, repoName);
+                            // Collect waiting shards that in entry that we can assign now that we are done with the deletion
+                            final List<ShardId> canBeUpdated = new ArrayList<>();
+                            for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> value : entry.shards()) {
+                                if (value.value.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED)
+                                        && alreadyReassigned(value.key.getIndexName(), value.key.getId(), reassignedShardIds) == false) {
+                                    canBeUpdated.add(value.key);
+                                }
                             }
-                            final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> updatedAssignmentsBuilder =
-                                    ImmutableOpenMap.builder(entry.shards());
-                            for (ShardId shardId : canBeUpdated) {
-                                final ShardSnapshotStatus updated = shardAssignments.get(shardId);
-                                if (updated == null) {
-                                    // We don't have a new assignment for this shard because its index was concurrently deleted
-                                    assert currentState.routingTable().hasIndex(shardId.getIndex()) == false :
-                                            "Missing assignment for [" + shardId + "]";
-                                    updatedAssignmentsBuilder.put(shardId, ShardSnapshotStatus.MISSING);
-                                } else {
-                                    final boolean added = reassignedShardIds.add(shardId);
-                                    assert added;
-                                    updatedAssignmentsBuilder.put(shardId, updated);
+                            if (canBeUpdated.isEmpty()) {
+                                // No shards can be updated in this snapshot so we just add it as is again
+                                snapshotEntries.add(entry);
+                            } else {
+                                if (shardAssignments == null) {
+                                    shardAssignments = shards(snapshotsInProgress,
+                                            updatedDeletions, currentState.metadata(), currentState.routingTable(), entry.indices(),
+                                            entry.version().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION), repositoryData, repoName);
+                                }
+                                final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> updatedAssignmentsBuilder =
+                                        ImmutableOpenMap.builder(entry.shards());
+                                for (ShardId shardId : canBeUpdated) {
+                                    final ShardSnapshotStatus updated = shardAssignments.get(shardId);
+                                    if (updated == null) {
+                                        // We don't have a new assignment for this shard because its index was concurrently deleted
+                                        assert currentState.routingTable().hasIndex(shardId.getIndex()) == false :
+                                                "Missing assignment for [" + shardId + "]";
+                                        updatedAssignmentsBuilder.put(shardId, ShardSnapshotStatus.MISSING);
+                                    } else {
+                                        markShardReassigned(shardId.getIndexName(), shardId.id(), reassignedShardIds);
+                                        updatedAssignmentsBuilder.put(shardId, updated);
+                                    }
                                 }
+                                snapshotEntries.add(entry.withStartedShards(updatedAssignmentsBuilder.build()));
+                                changed = true;
                             }
-                            snapshotEntries.add(entry.withStartedShards(updatedAssignmentsBuilder.build()));
-                            changed = true;
                         }
                     } else {
                         // Entry is already completed so we will finalize it now that the delete doesn't block us after
@@ -1987,6 +2057,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             }
             return changed ? SnapshotsInProgress.of(snapshotEntries) : null;
         }
+
+        private void markShardReassigned(String indexName, int shardId, Map<String, Set<Integer>> reassignments) {
+            final boolean added = reassignments.computeIfAbsent(indexName, k -> new HashSet<>()).add(shardId);
+            assert added : "should only ever reassign each shard once but assigned [" + indexName + "][" + shardId + "] multiple times";
+        }
+
+        private boolean alreadyReassigned(String indexName, int shardId, Map<String, Set<Integer>> reassignments) {
+            return reassignments.getOrDefault(indexName, Collections.emptySet()).contains(shardId);
+        }
     }
 
     /**

+ 6 - 0
test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

@@ -421,6 +421,12 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
                 SnapshotsService.OLD_SNAPSHOT_FORMAT, Function.identity(), f));
     }
 
+    protected void awaitNDeletionsInProgress(int count) throws Exception {
+        logger.info("--> wait for [{}] deletions to show up in the cluster state", count);
+        awaitClusterState(state ->
+                state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).getEntries().size() == count);
+    }
+
     protected void awaitNoMoreRunningOperations() throws Exception {
         awaitNoMoreRunningOperations(internalCluster().getMasterName());
     }