Przeglądaj źródła

Fix Snapshot Out of Order Finalization Repo Corruption (#75362)

* Fix up shard generations in `SnapshotsInProgress` during snapshot finalization (don't do it earlier because it's a really heavy computation and we have a ton of places where it would have to run).
* Adjust finalization queue to be able to work with changing snapshot entries after they've been enqueued for finalisation
* Still one remaining bug left after this (see TODO about leaking generations) that I don't feel confident in fixing for `7.13.4` due to the complexity of a fix and how minor the blob leak is (+ it's cleaned up just fine during snapshot deletes)

Closes #75336
Armin Braun 4 lat temu
rodzic
commit
3bd2672813

+ 163 - 0
server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java

@@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
 import org.elasticsearch.discovery.AbstractDisruptionTestCase;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.ShardGenerations;
@@ -1446,6 +1447,168 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
         awaitNoMoreRunningOperations();
     }
 
+    public void testOutOfOrderFinalization() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
+        final String index1 = "index-1";
+        final String index2 = "index-2";
+        createIndexWithContent(index1, dataNodes.get(0), dataNodes.get(1));
+        createIndexWithContent(index2, dataNodes.get(1), dataNodes.get(0));
+
+        final String repository = "test-repo";
+        createRepository(repository, "mock");
+
+        blockNodeWithIndex(repository, index2);
+
+        final ActionFuture<CreateSnapshotResponse> snapshot1 = clusterAdmin().prepareCreateSnapshot(repository, "snapshot-1")
+            .setIndices(index1, index2)
+            .setWaitForCompletion(true)
+            .execute();
+        awaitNumberOfSnapshotsInProgress(1);
+        final ActionFuture<CreateSnapshotResponse> snapshot2 = clusterAdmin().prepareCreateSnapshot(repository, "snapshot-2")
+            .setIndices(index1)
+            .setWaitForCompletion(true)
+            .execute();
+        assertSuccessful(snapshot2);
+        unblockAllDataNodes(repository);
+        final SnapshotInfo sn1 = assertSuccessful(snapshot1);
+
+        assertAcked(startDeleteSnapshot(repository, sn1.snapshot().getSnapshotId().getName()).get());
+
+        assertThat(
+            clusterAdmin().prepareSnapshotStatus().setSnapshots("snapshot-2").setRepository(repository).get().getSnapshots(),
+            hasSize(1)
+        );
+    }
+
+    public void testOutOfOrderAndConcurrentFinalization() throws Exception {
+        final String master = internalCluster().startMasterOnlyNode();
+        final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
+        final String index1 = "index-1";
+        final String index2 = "index-2";
+        createIndexWithContent(index1, dataNodes.get(0), dataNodes.get(1));
+        createIndexWithContent(index2, dataNodes.get(1), dataNodes.get(0));
+
+        final String repository = "test-repo";
+        createRepository(repository, "mock");
+
+        blockNodeWithIndex(repository, index2);
+
+        final ActionFuture<CreateSnapshotResponse> snapshot1 = clusterAdmin().prepareCreateSnapshot(repository, "snapshot-1")
+            .setIndices(index1, index2)
+            .setWaitForCompletion(true)
+            .execute();
+        awaitNumberOfSnapshotsInProgress(1);
+
+        blockMasterOnWriteIndexFile(repository);
+        final ActionFuture<CreateSnapshotResponse> snapshot2 = clusterAdmin().prepareCreateSnapshot(repository, "snapshot-2")
+            .setIndices(index1)
+            .setWaitForCompletion(true)
+            .execute();
+
+        awaitClusterState(state -> {
+            final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
+            return snapshotsInProgress.entries().size() == 2 && snapshotsInProgress.entries().get(1).state().completed();
+        });
+
+        unblockAllDataNodes(repository);
+        awaitClusterState(state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().get(0).state().completed());
+
+        unblockNode(repository, master);
+        assertSuccessful(snapshot2);
+
+        final SnapshotInfo sn1 = assertSuccessful(snapshot1);
+        assertAcked(startDeleteSnapshot(repository, sn1.snapshot().getSnapshotId().getName()).get());
+
+        assertThat(
+            clusterAdmin().prepareSnapshotStatus().setSnapshots("snapshot-2").setRepository(repository).get().getSnapshots(),
+            hasSize(1)
+        );
+    }
+
+    public void testOutOfOrderFinalizationWithConcurrentClone() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
+        final String index1 = "index-1";
+        final String index2 = "index-2";
+        createIndexWithContent(index1, dataNodes.get(0), dataNodes.get(1));
+        createIndexWithContent(index2, dataNodes.get(1), dataNodes.get(0));
+
+        final String repository = "test-repo";
+        createRepository(repository, "mock");
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repository, sourceSnapshot);
+        indexDoc(index2, "doc_id", "foo", "bar");
+
+        blockNodeWithIndex(repository, index2);
+
+        final String sn1 = "snapshot-1";
+        final ActionFuture<CreateSnapshotResponse> snapshot1 = clusterAdmin().prepareCreateSnapshot(repository, sn1)
+            .setIndices(index1, index2)
+            .setWaitForCompletion(true)
+            .execute();
+        awaitNumberOfSnapshotsInProgress(1);
+
+        final String targetSnapshot = "target-snapshot";
+        final ActionFuture<AcknowledgedResponse> clone = clusterAdmin().prepareCloneSnapshot(repository, sourceSnapshot, targetSnapshot)
+            .setIndices(index1)
+            .execute();
+        assertAcked(clone.get());
+
+        unblockAllDataNodes(repository);
+        assertSuccessful(snapshot1);
+
+        logger.info("--> deleting snapshots [{},{}] from repo [{}]", sn1, sourceSnapshot, repository);
+        assertAcked(clusterAdmin().prepareDeleteSnapshot(repository).setSnapshots(sn1, sourceSnapshot).get());
+
+        assertThat(
+            clusterAdmin().prepareSnapshotStatus().setSnapshots(targetSnapshot).setRepository(repository).get().getSnapshots(),
+            hasSize(1)
+        );
+    }
+
+    public void testOutOfOrderCloneFinalization() throws Exception {
+        final String master = internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
+        internalCluster().startDataOnlyNode();
+        final String index1 = "index-1";
+        final String index2 = "index-2";
+        createIndexWithContent(index1);
+        createIndexWithContent(index2);
+
+        final String repository = "test-repo";
+        createRepository(repository, "mock");
+
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repository, sourceSnapshot);
+
+        final IndexId index1Id = getRepositoryData(repository).resolveIndexId(index1);
+        blockMasterOnShardLevelSnapshotFile(repository, index1Id.getId());
+
+        final String cloneTarget = "target-snapshot";
+        final ActionFuture<AcknowledgedResponse> cloneSnapshot = clusterAdmin().prepareCloneSnapshot(
+            repository,
+            sourceSnapshot,
+            cloneTarget
+        ).setIndices(index1, index2).execute();
+        awaitNumberOfSnapshotsInProgress(1);
+        waitForBlock(master, repository);
+
+        final ActionFuture<CreateSnapshotResponse> snapshot2 = clusterAdmin().prepareCreateSnapshot(repository, "snapshot-2")
+            .setIndices(index2)
+            .setWaitForCompletion(true)
+            .execute();
+        assertSuccessful(snapshot2);
+
+        unblockNode(repository, master);
+        assertAcked(cloneSnapshot.get());
+        assertAcked(startDeleteSnapshot(repository, cloneTarget).get());
+
+        assertThat(
+            clusterAdmin().prepareSnapshotStatus().setSnapshots("snapshot-2").setRepository(repository).get().getSnapshots(),
+            hasSize(1)
+        );
+    }
+
     private static void assertSnapshotStatusCountOnRepo(String otherBlockedRepoName, int count) {
         final SnapshotsStatusResponse snapshotsStatusResponse = client().admin()
             .cluster()

+ 7 - 0
server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

@@ -434,6 +434,13 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             return reason;
         }
 
+        public ShardSnapshotStatus withUpdatedGeneration(String newGeneration) {
+            assert state == ShardState.SUCCESS : "can't move generation in state " + state;
+            return new ShardSnapshotStatus(nodeId, state, reason, newGeneration,
+                    shardSnapshotResult == null ? null :
+                            new ShardSnapshotResult(newGeneration, shardSnapshotResult.getSize(), shardSnapshotResult.getSegmentCount()));
+        }
+
         @Nullable
         public ShardSnapshotResult shardSnapshotResult() {
             assert state == ShardState.SUCCESS : "result is unavailable in state " + state;

+ 200 - 31
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -1347,19 +1347,20 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      * @param entry snapshot
      */
     private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nullable RepositoryData repositoryData) {
+        final Snapshot snapshot = entry.snapshot();
         if (entry.isClone() && entry.state() == State.FAILED) {
             logger.debug("Removing failed snapshot clone [{}] from cluster state", entry);
-            removeFailedSnapshotFromClusterState(entry.snapshot(), new SnapshotException(entry.snapshot(), entry.failure()), null);
+            removeFailedSnapshotFromClusterState(snapshot, new SnapshotException(snapshot, entry.failure()), null);
             return;
         }
-        final boolean newFinalization = endingSnapshots.add(entry.snapshot());
-        final String repoName = entry.repository();
+        final boolean newFinalization = endingSnapshots.add(snapshot);
+        final String repoName = snapshot.getRepository();
         if (tryEnterRepoLoop(repoName)) {
             if (repositoryData == null) {
                 repositoriesService.repository(repoName).getRepositoryData(new ActionListener<>() {
                     @Override
                     public void onResponse(RepositoryData repositoryData) {
-                        finalizeSnapshotEntry(entry, metadata, repositoryData);
+                        finalizeSnapshotEntry(snapshot, metadata, repositoryData);
                     }
 
                     @Override
@@ -1371,11 +1372,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     }
                 });
             } else {
-                finalizeSnapshotEntry(entry, metadata, repositoryData);
+                finalizeSnapshotEntry(snapshot, metadata, repositoryData);
             }
         } else {
             if (newFinalization) {
-                repositoryOperations.addFinalization(entry, metadata);
+                repositoryOperations.addFinalization(snapshot, metadata);
             }
         }
     }
@@ -1401,11 +1402,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         assert removed;
     }
 
-    private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata metadata, RepositoryData repositoryData) {
-        assert currentlyFinalizing.contains(entry.repository());
+    private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, RepositoryData repositoryData) {
+        assert currentlyFinalizing.contains(snapshot.getRepository());
         try {
+            SnapshotsInProgress.Entry entry = clusterService.state()
+                .custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
+                .snapshot(snapshot);
             final String failure = entry.failure();
-            final Snapshot snapshot = entry.snapshot();
             logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
             final ShardGenerations shardGenerations = buildGenerations(entry, metadata);
             final List<String> finalIndices = shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList());
@@ -1515,17 +1518,17 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     metaForSnapshot,
                     snapshotInfo,
                     entry.version(),
-                    state -> stateWithoutSnapshot(state, snapshot),
+                    state -> stateWithoutSuccessfulSnapshot(state, snapshot),
                     ActionListener.wrap(newRepoData -> {
                         completeListenersIgnoringException(endAndGetListenersToResolve(snapshot), Tuple.tuple(newRepoData, snapshotInfo));
                         logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
                         runNextQueuedOperation(newRepoData, repository, true);
-                    }, e -> handleFinalizationFailure(e, entry, repositoryData))
+                    }, e -> handleFinalizationFailure(e, snapshot, repositoryData))
                 );
-            }, e -> handleFinalizationFailure(e, entry, repositoryData));
+            }, e -> handleFinalizationFailure(e, snapshot, repositoryData));
         } catch (Exception e) {
             assert false : new AssertionError(e);
-            handleFinalizationFailure(e, entry, repositoryData);
+            handleFinalizationFailure(e, snapshot, repositoryData);
         }
     }
 
@@ -1574,11 +1577,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      * operations if there are any.
      *
      * @param e              exception encountered
-     * @param entry          snapshot entry that failed to finalize
+     * @param snapshot       snapshot that failed to finalize
      * @param repositoryData current repository data for the snapshot's repository
      */
-    private void handleFinalizationFailure(Exception e, SnapshotsInProgress.Entry entry, RepositoryData repositoryData) {
-        Snapshot snapshot = entry.snapshot();
+    private void handleFinalizationFailure(Exception e, Snapshot snapshot, RepositoryData repositoryData) {
         if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
             // Failure due to not being master any more, don't try to remove snapshot from cluster state the next master
             // will try ending this snapshot again
@@ -1604,7 +1606,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      */
     private void runNextQueuedOperation(RepositoryData repositoryData, String repository, boolean attemptDelete) {
         assert currentlyFinalizing.contains(repository);
-        final Tuple<SnapshotsInProgress.Entry, Metadata> nextFinalization = repositoryOperations.pollFinalization(repository);
+        final Tuple<Snapshot, Metadata> nextFinalization = repositoryOperations.pollFinalization(repository);
         if (nextFinalization == null) {
             if (attemptDelete) {
                 runReadyDeletions(repositoryData, repository);
@@ -1711,13 +1713,180 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
     }
 
     /**
-     * Computes the cluster state resulting from removing a given snapshot create operation from the given state.
+     * Computes the cluster state resulting from removing a given snapshot create operation that was finalized in the repository from the
+     * given state. This method will update the shard generations of snapshots that the given snapshot depended on so that finalizing them
+     * will not cause rolling back to an outdated shard generation.
      *
      * @param state    current cluster state
      * @param snapshot snapshot for which to remove the snapshot operation
      * @return updated cluster state
      */
-    private static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) {
+    private static ClusterState stateWithoutSuccessfulSnapshot(ClusterState state, Snapshot snapshot) {
+        // TODO: updating snapshots here leaks their outdated generation files, we should add logic to clean those up and enhance
+        // BlobStoreTestUtil to catch this leak
+        SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
+        ClusterState result = state;
+        int indexOfEntry = -1;
+        final List<SnapshotsInProgress.Entry> entryList = snapshots.entries();
+        for (int i = 0; i < entryList.size(); i++) {
+            SnapshotsInProgress.Entry entry = entryList.get(i);
+            if (entry.snapshot().equals(snapshot)) {
+                indexOfEntry = i;
+                break;
+            }
+        }
+        if (indexOfEntry >= 0) {
+            final List<SnapshotsInProgress.Entry> entries = new ArrayList<>(entryList.size() - 1);
+            final SnapshotsInProgress.Entry removedEntry = entryList.get(indexOfEntry);
+            for (int i = 0; i < indexOfEntry; i++) {
+                final SnapshotsInProgress.Entry previousEntry = entryList.get(i);
+                if (previousEntry.repository().equals(removedEntry.repository())) {
+                    if (removedEntry.isClone()) {
+                        if (previousEntry.isClone()) {
+                            ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> updatedShardAssignments = null;
+                            for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> finishedShardEntry : removedEntry.clones()) {
+                                final ShardSnapshotStatus shardState = finishedShardEntry.value;
+                                if (shardState.state() == ShardState.SUCCESS) {
+                                    updatedShardAssignments = maybeAddUpdatedAssignment(
+                                        updatedShardAssignments,
+                                        shardState,
+                                        finishedShardEntry.key,
+                                        previousEntry.clones()
+                                    );
+                                }
+                            }
+                            addCloneEntry(entries, previousEntry, updatedShardAssignments);
+                        } else {
+                            ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> updatedShardAssignments = null;
+                            for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> finishedShardEntry : removedEntry.clones()) {
+                                final ShardSnapshotStatus shardState = finishedShardEntry.value;
+                                if (shardState.state() != ShardState.SUCCESS) {
+                                    continue;
+                                }
+                                final RepositoryShardId repoShardId = finishedShardEntry.key;
+                                final IndexMetadata indexMeta = state.metadata().index(repoShardId.indexName());
+                                if (indexMeta == null) {
+                                    // The index name that finished cloning does not exist in the cluster state so it isn't relevant
+                                    // to the running snapshot
+                                    continue;
+                                }
+                                updatedShardAssignments = maybeAddUpdatedAssignment(
+                                    updatedShardAssignments,
+                                    shardState,
+                                    new ShardId(indexMeta.getIndex(), repoShardId.shardId()),
+                                    previousEntry.shards()
+                                );
+                            }
+                            addSnapshotEntry(entries, previousEntry, updatedShardAssignments);
+                        }
+                    } else {
+                        if (previousEntry.isClone()) {
+                            ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> updatedShardAssignments = null;
+                            for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> finishedShardEntry : removedEntry.shards()) {
+                                final ShardSnapshotStatus shardState = finishedShardEntry.value;
+                                if (shardState.state() != ShardState.SUCCESS) {
+                                    continue;
+                                }
+                                final ShardId shardId = finishedShardEntry.key;
+                                final IndexId indexId = removedEntry.indices().get(shardId.getIndexName());
+                                if (indexId == null) {
+                                    continue;
+                                }
+                                updatedShardAssignments = maybeAddUpdatedAssignment(
+                                    updatedShardAssignments,
+                                    shardState,
+                                    new RepositoryShardId(indexId, shardId.getId()),
+                                    previousEntry.clones()
+                                );
+                            }
+                            addCloneEntry(entries, previousEntry, updatedShardAssignments);
+                        } else {
+                            ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> updatedShardAssignments = null;
+                            for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> finishedShardEntry : removedEntry.shards()) {
+                                final ShardSnapshotStatus shardState = finishedShardEntry.value;
+                                if (shardState.state() == ShardState.SUCCESS) {
+                                    updatedShardAssignments = maybeAddUpdatedAssignment(
+                                        updatedShardAssignments,
+                                        shardState,
+                                        finishedShardEntry.key,
+                                        previousEntry.shards()
+                                    );
+                                }
+                            }
+                            addSnapshotEntry(entries, previousEntry, updatedShardAssignments);
+                        }
+                    }
+                } else {
+                    entries.add(previousEntry);
+                }
+            }
+            for (int i = indexOfEntry + 1; i < entryList.size(); i++) {
+                entries.add(entryList.get(i));
+            }
+            result = ClusterState.builder(state).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build();
+        }
+        return readyDeletions(result).v1();
+    }
+
+    private static void addSnapshotEntry(
+        List<SnapshotsInProgress.Entry> entries,
+        SnapshotsInProgress.Entry entryToUpdate,
+        @Nullable ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> updatedShardAssignments
+    ) {
+        if (updatedShardAssignments == null) {
+            entries.add(entryToUpdate);
+        } else {
+            final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> updatedStatus = ImmutableOpenMap.builder(entryToUpdate.shards());
+            updatedStatus.putAll(updatedShardAssignments.build());
+            entries.add(entryToUpdate.withShardStates(updatedStatus.build()));
+        }
+    }
+
+    private static void addCloneEntry(
+        List<SnapshotsInProgress.Entry> entries,
+        SnapshotsInProgress.Entry entryToUpdate,
+        @Nullable ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> updatedShardAssignments
+    ) {
+        if (updatedShardAssignments == null) {
+            entries.add(entryToUpdate);
+        } else {
+            final ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> updatedStatus = ImmutableOpenMap.builder(
+                entryToUpdate.clones()
+            );
+            updatedStatus.putAll(updatedShardAssignments.build());
+            entries.add(entryToUpdate.withClones(updatedStatus.build()));
+        }
+    }
+
+    @Nullable
+    private static <T> ImmutableOpenMap.Builder<T, ShardSnapshotStatus> maybeAddUpdatedAssignment(
+        @Nullable ImmutableOpenMap.Builder<T, ShardSnapshotStatus> updatedShardAssignments,
+        ShardSnapshotStatus finishedShardState,
+        T shardId,
+        ImmutableOpenMap<T, ShardSnapshotStatus> statesToUpdate
+    ) {
+        final String newGeneration = finishedShardState.generation();
+        final ShardSnapshotStatus stateToUpdate = statesToUpdate.get(shardId);
+        if (stateToUpdate != null
+            && stateToUpdate.state() == ShardState.SUCCESS
+            && Objects.equals(newGeneration, stateToUpdate.generation()) == false) {
+            if (updatedShardAssignments == null) {
+                updatedShardAssignments = ImmutableOpenMap.builder();
+            }
+            updatedShardAssignments.put(shardId, stateToUpdate.withUpdatedGeneration(newGeneration));
+        }
+        return updatedShardAssignments;
+    }
+
+    /**
+     * Computes the cluster state resulting from removing a given snapshot create operation from the given state after it has failed at
+     * any point before being finalized in the repository.
+     *
+     * @param state    current cluster state
+     * @param snapshot snapshot for which to remove the snapshot operation
+     * @return updated cluster state
+     */
+    private static ClusterState stateWithoutFailedSnapshot(ClusterState state, Snapshot snapshot) {
         SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
         ClusterState result = state;
         boolean changed = false;
@@ -1751,7 +1920,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 
             @Override
             public ClusterState execute(ClusterState currentState) {
-                final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot);
+                final ClusterState updatedState = stateWithoutFailedSnapshot(currentState, snapshot);
                 // now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them
                 return updateWithSnapshots(
                     updatedState,
@@ -3324,9 +3493,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 failure
             );
             synchronized (currentlyFinalizing) {
-                Tuple<SnapshotsInProgress.Entry, Metadata> finalization;
+                Tuple<Snapshot, Metadata> finalization;
                 while ((finalization = repositoryOperations.pollFinalization(repository)) != null) {
-                    assert snapshotsToFail.contains(finalization.v1().snapshot())
+                    assert snapshotsToFail.contains(finalization.v1())
                         : "[" + finalization.v1() + "] not found in snapshots to fail " + snapshotsToFail;
                 }
                 leaveRepoLoop(repository);
@@ -3344,10 +3513,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
     private static final class OngoingRepositoryOperations {
 
         /**
-         * Map of repository name to a deque of {@link SnapshotsInProgress.Entry} that need to be finalized for the repository and the
+         * Map of repository name to a deque of {@link Snapshot} that need to be finalized for the repository and the
          * {@link Metadata to use when finalizing}.
          */
-        private final Map<String, Deque<SnapshotsInProgress.Entry>> snapshotsToFinalize = new HashMap<>();
+        private final Map<String, Deque<Snapshot>> snapshotsToFinalize = new HashMap<>();
 
         /**
          * Set of delete operations currently being executed against the repository. The values in this set are the delete UUIDs returned
@@ -3359,16 +3528,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         private Metadata latestKnownMetaData;
 
         @Nullable
-        synchronized Tuple<SnapshotsInProgress.Entry, Metadata> pollFinalization(String repository) {
+        synchronized Tuple<Snapshot, Metadata> pollFinalization(String repository) {
             assertConsistent();
-            final SnapshotsInProgress.Entry nextEntry;
-            final Deque<SnapshotsInProgress.Entry> queued = snapshotsToFinalize.get(repository);
+            final Snapshot nextEntry;
+            final Deque<Snapshot> queued = snapshotsToFinalize.get(repository);
             if (queued == null) {
                 return null;
             }
             nextEntry = queued.pollFirst();
             assert nextEntry != null;
-            final Tuple<SnapshotsInProgress.Entry, Metadata> res = Tuple.tuple(nextEntry, latestKnownMetaData);
+            final Tuple<Snapshot, Metadata> res = Tuple.tuple(nextEntry, latestKnownMetaData);
             if (queued.isEmpty()) {
                 snapshotsToFinalize.remove(repository);
             }
@@ -3387,8 +3556,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             runningDeletions.remove(deleteUUID);
         }
 
-        synchronized void addFinalization(SnapshotsInProgress.Entry entry, Metadata metadata) {
-            snapshotsToFinalize.computeIfAbsent(entry.repository(), k -> new LinkedList<>()).add(entry);
+        synchronized void addFinalization(Snapshot snapshot, Metadata metadata) {
+            snapshotsToFinalize.computeIfAbsent(snapshot.getRepository(), k -> new LinkedList<>()).add(snapshot);
             this.latestKnownMetaData = metadata;
             assertConsistent();
         }
@@ -3410,7 +3579,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         synchronized boolean assertNotQueued(Snapshot snapshot) {
             assert snapshotsToFinalize.getOrDefault(snapshot.getRepository(), new LinkedList<>())
                 .stream()
-                .noneMatch(entry -> entry.snapshot().equals(snapshot)) : "Snapshot [" + snapshot + "] is still in finalization queue";
+                .noneMatch(entry -> entry.equals(snapshot)) : "Snapshot [" + snapshot + "] is still in finalization queue";
             return true;
         }
 

+ 4 - 0
test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

@@ -224,6 +224,10 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
         AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repositoryName).setBlockAndFailOnWriteSnapFiles();
     }
 
+    public static void blockMasterOnShardLevelSnapshotFile(final String repositoryName, String indexId) {
+        AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repositoryName).setBlockAndOnWriteShardLevelSnapFiles(indexId);
+    }
+
     @SuppressWarnings("unchecked")
     public static <T extends Repository> T getRepositoryOnMaster(String repositoryName) {
         return ((T) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repositoryName));

+ 10 - 1
test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java

@@ -129,6 +129,7 @@ public class MockRepository extends FsRepository {
 
     /** Allows blocking on writing the snapshot file at the end of snapshot creation to simulate a died master node */
     private volatile boolean blockAndFailOnWriteSnapFile;
+    private volatile String blockedIndexId;
 
     private volatile boolean blockOnWriteShardLevelMeta;
 
@@ -213,6 +214,7 @@ public class MockRepository extends FsRepository {
         blockAndFailOnWriteIndexFile = false;
         blockOnWriteIndexFile = false;
         blockAndFailOnWriteSnapFile = false;
+        blockedIndexId = null;
         blockOnDeleteIndexN = false;
         blockOnWriteShardLevelMeta = false;
         blockOnReadIndexMeta = false;
@@ -232,6 +234,10 @@ public class MockRepository extends FsRepository {
         blockAndFailOnWriteSnapFile = true;
     }
 
+    public void setBlockAndOnWriteShardLevelSnapFiles(String indexId) {
+        blockedIndexId = indexId;
+    }
+
     public void setBlockAndFailOnWriteIndexFile() {
         assert blockOnWriteIndexFile == false : "Either fail or wait after blocking on index-N not both";
         blockAndFailOnWriteIndexFile = true;
@@ -281,7 +287,8 @@ public class MockRepository extends FsRepository {
         boolean wasBlocked = false;
         try {
             while (blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile ||
-                blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta) {
+                blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta ||
+                blockedIndexId != null) {
                 blocked = true;
                 this.wait();
                 wasBlocked = true;
@@ -370,6 +377,8 @@ public class MockRepository extends FsRepository {
                         blockExecutionAndMaybeWait(blobName);
                     } else if (blobName.startsWith("snap-") && blockAndFailOnWriteSnapFile) {
                         blockExecutionAndFail(blobName);
+                    } else if (blockedIndexId != null && path().parts().contains(blockedIndexId) && blobName.startsWith("snap-")) {
+                        blockExecutionAndMaybeWait(blobName);
                     }
                 }
             }

+ 4 - 0
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -18,6 +18,7 @@ import org.apache.lucene.search.TotalHits;
 import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
@@ -1224,6 +1225,9 @@ public abstract class ESIntegTestCase extends ESTestCase {
         return client().prepareIndex(index).setId(id).setSource(source).execute().actionGet();
     }
 
+    protected final ActionFuture<IndexResponse> startIndex(String index, String id, BytesReference source, XContentType type) {
+        return client().prepareIndex(index).setId(id).setSource(source, type).execute();
+    }
     /**
      * Syntactic sugar for:
      * <pre>