1
0
Эх сурвалжийг харах

Fix Potential Repository Corruption during Master Failover (#82912)

Solves the problem described in #82911. 
What corrupts the repository is the sequence of:

1. Start delete that removes all snapshots for an index "A"
2. Queue snapshot for index "A" after that delete. This snapshot will have an `IndexId` for "A" that is the same as used by the snapshot(s) in the above delete. Using this is broken because we assume we never re-use the uuid for the index like that and a stale master may run listing and deleting on a uuid if it ever goes out of scope.
3. Deleting master goes stale and later does stuff to the folder after the snapshot from 2. is done with that shard.

Ideally, we'd never bind the snapshot in the second step to that `IndexId` in the first place since we already technically know that the delete will bring us into a situation where we need a "fresh" `IndexId`. Unfortunately, that delete can fail on IO issues with the repo, in which case we need to use the existing `IndexId` for the snapshot to not corrupt the repo by having two `IndexId` for the same index name. So what I did here is to check the `IndexId`s against the current `RepositoryData` again when starting snapshots because deletes were removed from the cluster state and re-initialize to fresh ones those that are not in the repository data to make sure we never conflict with a stale master doing deletes.
We can technically make this nicer by using some placeholder for index uuids in queued up snapshots or so, but that will require a change to the state machine and BwC around that and it doesn't really buy us much in terms of computation since it's such a rare thing to run into IMO. We need this change to make sure the next rolling upgrade from an older master to a newer master with this fix is safe regardless => I figured this is good enough for now and fixes the test that reliably reproduces this just fine.

closes #82911
Armin Braun 3 жил өмнө
parent
commit
4ad9070b98

+ 61 - 0
server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java

@@ -1062,6 +1062,57 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
         awaitNoMoreRunningOperations();
     }
 
+    public void testMasterFailoverDuringStaleIndicesCleanup() throws Exception {
+        internalCluster().startMasterOnlyNodes(3);
+        final String dataNode = internalCluster().startDataOnlyNode();
+        final String repoName = "test-repo";
+        createRepository(repoName, "mock");
+        createFullSnapshot(repoName, "empty-snapshot");
+        // use a few more shards to make master take a little longer to clean up the stale index and simulate more concurrency between
+        // snapshot create and delete below
+        createIndexWithContent("index-test", indexSettingsNoReplicas(randomIntBetween(6, 10)).build());
+        final NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.DISCONNECT);
+        internalCluster().setDisruptionScheme(networkDisruption);
+
+        final List<String> fullSnapshotsToDelete = createNSnapshots(repoName, randomIntBetween(1, 5));
+        final String masterName = internalCluster().getMasterName();
+        blockMasterOnAnyDataFile(repoName);
+        final ActionFuture<AcknowledgedResponse> deleteAllSnapshotsWithIndex = startDeleteSnapshots(
+            repoName,
+            fullSnapshotsToDelete,
+            masterName
+        );
+
+        final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshotFromDataNode(repoName, "new-full-snapshot");
+        waitForBlock(masterName, repoName);
+        awaitNDeletionsInProgress(1);
+        awaitNumberOfSnapshotsInProgress(1);
+        networkDisruption.startDisrupting();
+        ensureStableCluster(3, dataNode);
+        // wait for the snapshot to finish while the isolated master is stuck on deleting a data blob
+        try {
+            snapshotFuture.get();
+        } catch (Exception e) {
+            // ignore exceptions here, the snapshot will work out fine in all cases but the API might throw because of the master
+            // fail-over during the snapshot
+            // TODO: remove this leniency once we fix the API to handle master failover cleaner
+        }
+        awaitNoMoreRunningOperations(dataNode);
+
+        // now unblock the stale master and have it continue deleting blobs from the repository
+        unblockNode(repoName, masterName);
+
+        networkDisruption.stopDisrupting();
+        ensureStableCluster(4);
+        try {
+            deleteAllSnapshotsWithIndex.get();
+        } catch (Exception ignored) {
+            // ignored as we had a failover in here and will get all kinds of errors as a result, just making sure the future completes in
+            // all cases for now
+            // TODO: remove this leniency once we fix the API to handle master failover cleaner
+        }
+    }
+
     public void testStatusMultipleSnapshotsMultipleRepos() throws Exception {
         internalCluster().startMasterOnlyNode();
         // We're blocking a some of the snapshot threads when we block the first repo below so we have to make sure we have enough threads
@@ -1975,6 +2026,16 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
             .execute();
     }
 
+    private ActionFuture<CreateSnapshotResponse> startFullSnapshotFromDataNode(String repoName, String snapshotName) {
+        logger.info("--> creating full snapshot [{}] to repo [{}] from data node client", snapshotName, repoName);
+        return internalCluster().dataNodeClient()
+            .admin()
+            .cluster()
+            .prepareCreateSnapshot(repoName, snapshotName)
+            .setWaitForCompletion(true)
+            .execute();
+    }
+
     private ActionFuture<CreateSnapshotResponse> startFullSnapshotFromMasterClient(String repoName, String snapshotName) {
         logger.info("--> creating full snapshot [{}] to repo [{}] from master client", snapshotName, repoName);
         return internalCluster().masterClient()

+ 46 - 0
server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

@@ -889,6 +889,52 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             );
         }
 
+        /**
+         * Reassigns all {@link IndexId} in a snapshot that can be found as keys in the given {@code updates} to the {@link IndexId} value
+         * that they map to.
+         * This method is used in an edge case of removing a {@link SnapshotDeletionsInProgress.Entry} from the cluster state at the
+         * end of a delete. If the delete removed the last use of a certain {@link IndexId} from the repository then we do not want to
+         * reuse that {@link IndexId} because the implementation of {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository}
+         * assumes that a given {@link IndexId} will never be reused if it went from referenced to unreferenced in the
+         * {@link org.elasticsearch.repositories.RepositoryData} in a delete.
+         *
+         * @param updates map of existing {@link IndexId} to updated {@link IndexId}
+         * @return a new instance with updated index ids or this instance if unchanged
+         */
+        public Entry withUpdatedIndexIds(Map<IndexId, IndexId> updates) {
+            assert isClone() == false : "only snapshots can be reassigned to updated IndexId values";
+            Map<String, IndexId> updatedIndices = null;
+            for (IndexId existingIndexId : indices.values()) {
+                final IndexId updatedIndexId = updates.get(existingIndexId);
+                if (updatedIndexId != null) {
+                    if (updatedIndices == null) {
+                        updatedIndices = new HashMap<>(indices);
+                    }
+                    updatedIndices.put(updatedIndexId.getName(), updatedIndexId);
+                }
+            }
+            if (updatedIndices != null) {
+                return new Entry(
+                    snapshot,
+                    includeGlobalState,
+                    partial,
+                    state,
+                    updatedIndices,
+                    dataStreams,
+                    featureStates,
+                    startTime,
+                    repositoryStateId,
+                    shards,
+                    failure,
+                    userMetadata,
+                    version,
+                    source,
+                    ImmutableOpenMap.of()
+                );
+            }
+            return this;
+        }
+
         public Entry withClones(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> updatedClones) {
             if (updatedClones.equals(shardStatusByRepoShardId)) {
                 return this;

+ 18 - 2
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -2628,6 +2628,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             final String localNodeId = currentState.nodes().getLocalNodeId();
             final String repoName = deleteEntry.repository();
             InFlightShardSnapshotStates inFlightShardStates = null;
+            // Keep track of IndexId values that may have gone unreferenced due to the delete entry just executed.
+            // See org.elasticsearch.cluster.SnapshotsInProgress.Entry#withUpdatedIndexIds for details.
+            final Set<IndexId> newIndexIdsToRefresh = new HashSet<>();
             for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repoName)) {
                 if (entry.state().completed() == false) {
                     // TODO: dry up redundant computation and code between clone and non-clone case, in particular reuse
@@ -2675,9 +2678,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                         // Collect waiting shards that in entry that we can assign now that we are done with the deletion
                         final List<RepositoryShardId> canBeUpdated = new ArrayList<>();
                         for (Map.Entry<RepositoryShardId, ShardSnapshotStatus> value : entry.shardsByRepoShardId().entrySet()) {
+                            final RepositoryShardId repositoryShardId = value.getKey();
                             if (value.getValue().equals(ShardSnapshotStatus.UNASSIGNED_QUEUED)
-                                && reassignedShardIds.contains(value.getKey()) == false) {
-                                canBeUpdated.add(value.getKey());
+                                && reassignedShardIds.contains(repositoryShardId) == false) {
+                                canBeUpdated.add(repositoryShardId);
+                                if (repositoryData.hasIndex(repositoryShardId.indexName()) == false) {
+                                    newIndexIdsToRefresh.add(repositoryShardId.index());
+                                }
                             }
                         }
                         if (canBeUpdated.isEmpty()) {
@@ -2725,6 +2732,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     snapshotEntries.add(entry);
                 }
             }
+            if (changed && newIndexIdsToRefresh.isEmpty() == false) {
+                final Map<IndexId, IndexId> updatedIndexIds = Maps.newMapWithExpectedSize(newIndexIdsToRefresh.size());
+                for (IndexId indexIdToRefresh : newIndexIdsToRefresh) {
+                    updatedIndexIds.put(indexIdToRefresh, new IndexId(indexIdToRefresh.getName(), UUIDs.randomBase64UUID()));
+                }
+                for (int i = 0; i < snapshotEntries.size(); i++) {
+                    snapshotEntries.set(i, snapshotEntries.get(i).withUpdatedIndexIds(updatedIndexIds));
+                }
+            }
             return changed ? snapshotsInProgress.withUpdatedEntriesForRepo(repoName, snapshotEntries) : null;
         }
 

+ 9 - 0
test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

@@ -228,6 +228,10 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
         AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repositoryName).setBlockAndFailOnWriteSnapFiles();
     }
 
+    public static void blockMasterOnAnyDataFile(final String repositoryName) {
+        AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repositoryName).blockOnDataFiles();
+    }
+
     public static void blockMasterOnShardLevelSnapshotFile(final String repositoryName, String indexId) {
         AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repositoryName).setBlockOnShardLevelSnapFiles(indexId);
     }
@@ -636,6 +640,11 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
         return clusterAdmin().prepareDeleteSnapshot(repoName, snapshotName).execute();
     }
 
+    protected ActionFuture<AcknowledgedResponse> startDeleteSnapshots(String repoName, List<String> snapshotNames, String viaNode) {
+        logger.info("--> deleting snapshots {} from repo [{}]", snapshotNames, repoName);
+        return client(viaNode).admin().cluster().prepareDeleteSnapshot(repoName, snapshotNames.toArray(Strings.EMPTY_ARRAY)).execute();
+    }
+
     protected static void updateClusterState(final Function<ClusterState, ClusterState> updater) throws Exception {
         final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
         final ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);