Browse Source

Fix Clone Snapshot State Machine Bug (#67874)

This fixes a leftover from #65042: if a clone
was initially put in the cluster state before another clone
but was then initialized only after the other clone had already finished its shard clones,
a broken snapshot state would be reached in which the first clone
would be queued before the almost-finished clone.

closes #67871
Armin Braun 4 years ago
parent
commit
43095a310d

+ 33 - 0
server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java

@@ -613,6 +613,39 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
         assertAcked(cloneFuture.get());
     }
 
+    public void testManyConcurrentClonesStartOutOfOrder() throws Exception {
+        // large snapshot pool to allow for concurrently finishing clone while another clone is blocked on trying to load SnapshotInfo
+        final String masterName = internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
+        internalCluster().startDataOnlyNode();
+        final String repoName = "test-repo";
+        createRepository(repoName, "mock");
+        final String testIndex = "test-idx";
+        createIndexWithContent(testIndex);
+
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repoName, sourceSnapshot);
+        assertAcked(admin().indices().prepareDelete(testIndex).get());
+
+        final MockRepository repo = getRepositoryOnMaster(repoName);
+        repo.setBlockOnceOnReadSnapshotInfoIfAlreadyBlocked();
+        repo.setBlockOnWriteIndexFile();
+
+        final ActionFuture<AcknowledgedResponse> clone1 = startClone(repoName, sourceSnapshot, "target-snapshot-1", testIndex);
+        // wait for this snapshot to show up in the cluster state
+        awaitNumberOfSnapshotsInProgress(1);
+        waitForBlock(masterName, repoName);
+
+        final ActionFuture<AcknowledgedResponse> clone2 = startClone(repoName, sourceSnapshot, "target-snapshot-2", testIndex);
+
+        awaitNumberOfSnapshotsInProgress(2);
+        awaitClusterState(state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
+                .entries().stream().anyMatch(entry -> entry.state().completed()));
+        repo.unblock();
+
+        assertAcked(clone1.get());
+        assertAcked(clone2.get());
+    }
+
     private ActionFuture<AcknowledgedResponse> startCloneFromDataNode(String repoName, String sourceSnapshot, String targetSnapshot,
                                                                       String... indices) {
         return startClone(dataNodeClient(), repoName, sourceSnapshot, targetSnapshot, indices);

+ 5 - 13
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -493,14 +493,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                             // no need to compute these, we'll mark all shards as queued anyway because we wait for the delete
                             inFlightShardStates = null;
                         }
-                        boolean queuedShards = false;
                         for (Tuple<IndexId, Integer> count : counts) {
                             for (int shardId = 0; shardId < count.v2(); shardId++) {
                                 final RepositoryShardId repoShardId = new RepositoryShardId(count.v1(), shardId);
                                 final String indexName = repoShardId.indexName();
                                 if (readyToExecute == false || inFlightShardStates.isActive(indexName, shardId)) {
                                     clonesBuilder.put(repoShardId, ShardSnapshotStatus.UNASSIGNED_QUEUED);
-                                    queuedShards = true;
                                 } else {
                                     clonesBuilder.put(repoShardId, new ShardSnapshotStatus(localNodeId,
                                         inFlightShardStates.generationForShard(repoShardId.index(), shardId, shardGenerations)));
@@ -508,17 +506,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                             }
                         }
                         updatedEntry = cloneEntry.withClones(clonesBuilder.build());
-                        if (queuedShards) {
-                            // We queued up some shards based on the in-flight operations found in all snapshots for the current
-                            // repository, so in order to make sure we don't set a shard to QUEUED before (as in before it in the
-                            // `updatedEntries` list) one that is actively executing we just put it to the back of the list as if we had
-                            // just created the entry
-                            // TODO: If we could eventually drop the snapshot clone init phase we don't need this any longer
-                            updatedEntries.remove(i);
-                            updatedEntries.add(updatedEntry);
-                        } else {
-                            updatedEntries.set(i, updatedEntry);
-                        }
+                        // Move the now ready to execute clone operation to the back of the snapshot operations order because its
+                        // shard snapshot state was based on all previous existing operations in progress
+                        // TODO: If we could eventually drop the snapshot clone init phase we don't need this any longer
+                        updatedEntries.remove(i);
+                        updatedEntries.add(updatedEntry);
                         changed = true;
                         break;
                     }

+ 18 - 1
test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java

@@ -61,6 +61,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -132,6 +133,8 @@ public class MockRepository extends FsRepository {
 
     private volatile boolean blockOnReadIndexMeta;
 
+    private final AtomicBoolean blockOnceOnReadSnapshotInfo = new AtomicBoolean(false);
+
     /**
      * Writes to the blob {@code index.latest} at the repository root will fail with an {@link IOException} if {@code true}.
      */
@@ -206,6 +209,7 @@ public class MockRepository extends FsRepository {
         blockOnDeleteIndexN = false;
         blockOnWriteShardLevelMeta = false;
         blockOnReadIndexMeta = false;
+        blockOnceOnReadSnapshotInfo.set(false);
         this.notifyAll();
     }
 
@@ -247,6 +251,16 @@ public class MockRepository extends FsRepository {
         this.failReadsAfterUnblock = failReadsAfterUnblock;
     }
 
+    /**
+     * Enable blocking a single read of {@link org.elasticsearch.snapshots.SnapshotInfo} in case the repo is already blocked on another
+     * file. This allows testing very specific timing issues where a read of {@code SnapshotInfo} is much slower than another concurrent
+     * repository operation. See {@link #blockExecution()} for the exact mechanics of why we need a secondary block defined here.
+     * TODO: clean this up to not require a second block set
+     */
+    public void setBlockOnceOnReadSnapshotInfoIfAlreadyBlocked() {
+        blockOnceOnReadSnapshotInfo.set(true);
+    }
+
     public boolean blocked() {
         return blocked;
     }
@@ -396,7 +410,10 @@ public class MockRepository extends FsRepository {
 
             @Override
             public InputStream readBlob(String name) throws IOException {
-                if (blockOnReadIndexMeta && name.startsWith(BlobStoreRepository.METADATA_PREFIX) &&  path().equals(basePath()) == false) {
+                if (blockOnReadIndexMeta && name.startsWith(BlobStoreRepository.METADATA_PREFIX) && path().equals(basePath()) == false) {
+                    blockExecutionAndMaybeWait(name);
+                } else if (path().equals(basePath()) && name.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)
+                        && blockOnceOnReadSnapshotInfo.compareAndSet(true, false)) {
                     blockExecutionAndMaybeWait(name);
                 } else {
                     maybeReadErrorAfterBlock(name);