
Clone Snapshot API (#61839)

Adds a clone snapshot API that clones part or all of an existing snapshot into a new snapshot within the same repository.
Armin Braun, 5 years ago
Commit f7f239d39a

+ 52 - 0
docs/reference/snapshot-restore/apis/clone-snapshot-api.asciidoc

@@ -0,0 +1,52 @@
+[[clone-snapshot-api]]
+=== Clone snapshot API
+++++
+<titleabbrev>Clone snapshot</titleabbrev>
+++++
+
+Clones part or all of a snapshot into a new snapshot.
+
+[source,console]
+----
+PUT /_snapshot/my_repository/source_snapshot/_clone/target_snapshot
+{
+  "indices": "index_a,index_b"
+}
+----
+// TEST[skip:TODO]
+
+[[clone-snapshot-api-request]]
+==== {api-request-title}
+
+`PUT /_snapshot/<repository>/<source_snapshot>/_clone/<target_snapshot>`
+
+[[clone-snapshot-api-desc]]
+==== {api-description-title}
+
+The clone snapshot API allows creating a copy of all or part of an existing snapshot
+within the same repository.
+
+[[clone-snapshot-api-params]]
+==== {api-path-parms-title}
+
+`<repository>`::
+(Required, string)
+Name of the snapshot repository that both the source and target snapshots belong to.
+
+`<source_snapshot>`::
+(Required, string)
+Name of the existing snapshot to clone from.
+
+`<target_snapshot>`::
+(Required, string)
+Name of the new snapshot to create from the cloned indices.
+
+[[clone-snapshot-api-query-params]]
+==== {api-query-parms-title}
+
+`master_timeout`::
+(Optional, <<time-units, time units>>) Specifies the period of time to wait for
+a connection to the master node. If no response is received before the timeout
+expires, the request fails and returns an error. Defaults to `30s`.
+
+`timeout`::
+(Optional, <<time-units, time units>>) Specifies the period of time to wait for
+a response. If no response is received before the timeout expires, the request
+fails and returns an error. Defaults to `30s`.
+
+[[clone-snapshot-api-request-body]]
+==== {api-request-body-title}
+
+`indices`::
+(Required, string)
+A comma-separated list of indices to include in the target snapshot.
+<<multi-index,Multi-index syntax>> is supported.

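The API is also reachable from the Java cluster admin client through the `prepareCloneSnapshot` builder exercised in `CloneSnapshotIT` below. A minimal sketch of calling it, assuming an already connected `Client`; the repository, snapshot, and index names are placeholders, and `setMasterNodeTimeout` is the usual master-node request builder setter corresponding to the `master_timeout` parameter documented above:

[source,java]
----
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;

public class CloneSnapshotClientExample {

    // Clones two indices from an existing snapshot into a new snapshot in the same
    // repository and waits for the master to acknowledge the cluster state update.
    static void cloneSnapshot(Client client) {
        AcknowledgedResponse response = client.admin().cluster()
                .prepareCloneSnapshot("my_repository", "source_snapshot", "target_snapshot")
                .setIndices("index_a", "index_b") // same semantics as the "indices" body parameter
                .setMasterNodeTimeout(TimeValue.timeValueSeconds(30))
                .get();
        assert response.isAcknowledged();
    }
}
----
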
+ 1 - 0
docs/reference/snapshot-restore/index.asciidoc

@@ -107,6 +107,7 @@ understand the time requirements before proceeding.
 --
 
 include::register-repository.asciidoc[]
+include::apis/clone-snapshot-api.asciidoc[]
 include::take-snapshot.asciidoc[]
 include::restore-snapshot.asciidoc[]
 include::monitor-snapshot-restore.asciidoc[]

+ 2 - 0
docs/reference/snapshot-restore/take-snapshot.asciidoc

@@ -124,3 +124,5 @@ PUT /_snapshot/my_backup/<snapshot-{now/d}>
 PUT /_snapshot/my_backup/%3Csnapshot-%7Bnow%2Fd%7D%3E
 -----------------------------------
 // TEST[continued]
+
+NOTE: You can also create snapshots that are copies of part of an existing snapshot using the <<clone-snapshot-api,clone snapshot API>>.

+ 404 - 6
server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java

@@ -18,6 +18,26 @@
  */
 package org.elasticsearch.snapshots;
 
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexStatus;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.repositories.RepositoryData;
+import org.elasticsearch.snapshots.mockstore.MockRepository;
+import org.elasticsearch.test.ESIntegTestCase;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.containsString;
+
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.common.UUIDs;
@@ -26,16 +46,11 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
 import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots;
 import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
 import org.elasticsearch.repositories.IndexId;
-import org.elasticsearch.repositories.RepositoriesService;
-import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryShardId;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
-import org.elasticsearch.test.ESIntegTestCase;
 
 import java.nio.file.Path;
-import java.util.List;
 
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 
@@ -53,7 +68,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
         if (useBwCFormat) {
             initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT);
             // Re-create repo to clear repository data cache
-            assertAcked(client().admin().cluster().prepareDeleteRepository(repoName).get());
+            assertAcked(clusterAdmin().prepareDeleteRepository(repoName).get());
             createRepository(repoName, "fs", repoPath);
         }
 
@@ -107,6 +122,389 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
         assertEquals(newShardGeneration, newShardGeneration2);
     }
 
+    public void testCloneSnapshotIndex() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        internalCluster().startDataOnlyNode();
+        final String repoName = "repo-name";
+        createRepository(repoName, "fs");
+
+        final String indexName = "index-1";
+        createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repoName, sourceSnapshot);
+
+        indexRandomDocs(indexName, randomIntBetween(20, 100));
+        if (randomBoolean()) {
+            assertAcked(admin().indices().prepareDelete(indexName));
+        }
+        final String targetSnapshot = "target-snapshot";
+        assertAcked(startClone(repoName, sourceSnapshot, targetSnapshot, indexName).get());
+
+        final List<SnapshotStatus> status = clusterAdmin().prepareSnapshotStatus(repoName)
+                .setSnapshots(sourceSnapshot, targetSnapshot).get().getSnapshots();
+        assertThat(status, hasSize(2));
+        final SnapshotIndexStatus status1 = status.get(0).getIndices().get(indexName);
+        final SnapshotIndexStatus status2 = status.get(1).getIndices().get(indexName);
+        assertEquals(status1.getStats().getTotalFileCount(), status2.getStats().getTotalFileCount());
+        assertEquals(status1.getStats().getTotalSize(), status2.getStats().getTotalSize());
+    }
+
+    public void testClonePreventsSnapshotDelete() throws Exception {
+        final String masterName = internalCluster().startMasterOnlyNode();
+        internalCluster().startDataOnlyNode();
+        final String repoName = "repo-name";
+        createRepository(repoName, "mock");
+
+        final String indexName = "index-1";
+        createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repoName, sourceSnapshot);
+
+        indexRandomDocs(indexName, randomIntBetween(20, 100));
+
+        final String targetSnapshot = "target-snapshot";
+        blockNodeOnAnyFiles(repoName, masterName);
+        final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName);
+        waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
+        assertFalse(cloneFuture.isDone());
+
+        ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class,
+                () -> startDeleteSnapshot(repoName, sourceSnapshot).actionGet());
+        assertThat(ex.getMessage(), containsString("cannot delete snapshot while it is being cloned"));
+
+        unblockNode(repoName, masterName);
+        assertAcked(cloneFuture.get());
+        final List<SnapshotStatus> status = clusterAdmin().prepareSnapshotStatus(repoName)
+                .setSnapshots(sourceSnapshot, targetSnapshot).get().getSnapshots();
+        assertThat(status, hasSize(2));
+        final SnapshotIndexStatus status1 = status.get(0).getIndices().get(indexName);
+        final SnapshotIndexStatus status2 = status.get(1).getIndices().get(indexName);
+        assertEquals(status1.getStats().getTotalFileCount(), status2.getStats().getTotalFileCount());
+        assertEquals(status1.getStats().getTotalSize(), status2.getStats().getTotalSize());
+    }
+
+    public void testConcurrentCloneAndSnapshot() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        final String dataNode = internalCluster().startDataOnlyNode();
+        final String repoName = "repo-name";
+        createRepository(repoName, "mock");
+
+        final String indexName = "index-1";
+        createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repoName, sourceSnapshot);
+
+        indexRandomDocs(indexName, randomIntBetween(20, 100));
+
+        final String targetSnapshot = "target-snapshot";
+        final ActionFuture<CreateSnapshotResponse> snapshot2Future =
+                startFullSnapshotBlockedOnDataNode("snapshot-2", repoName, dataNode);
+        waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
+        final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName);
+        awaitNumberOfSnapshotsInProgress(2);
+        unblockNode(repoName, dataNode);
+        assertAcked(cloneFuture.get());
+        assertSuccessful(snapshot2Future);
+    }
+
+    public void testLongRunningCloneAllowsConcurrentSnapshot() throws Exception {
+        // large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations
+        final String masterNode = internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
+        internalCluster().startDataOnlyNode();
+        final String repoName = "test-repo";
+        createRepository(repoName, "mock");
+        final String indexSlow = "index-slow";
+        createIndexWithContent(indexSlow);
+
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repoName, sourceSnapshot);
+
+        final String targetSnapshot = "target-snapshot";
+        blockMasterOnShardClone(repoName);
+        final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow);
+        waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
+
+        final String indexFast = "index-fast";
+        createIndexWithRandomDocs(indexFast, randomIntBetween(20, 100));
+
+        assertSuccessful(clusterAdmin().prepareCreateSnapshot(repoName, "fast-snapshot")
+                .setIndices(indexFast).setWaitForCompletion(true).execute());
+
+        assertThat(cloneFuture.isDone(), is(false));
+        unblockNode(repoName, masterNode);
+
+        assertAcked(cloneFuture.get());
+    }
+
+    public void testLongRunningSnapshotAllowsConcurrentClone() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        final String dataNode = internalCluster().startDataOnlyNode();
+        final String repoName = "test-repo";
+        createRepository(repoName, "mock");
+        final String indexSlow = "index-slow";
+        createIndexWithContent(indexSlow);
+
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repoName, sourceSnapshot);
+
+        final String indexFast = "index-fast";
+        createIndexWithRandomDocs(indexFast, randomIntBetween(20, 100));
+
+        blockDataNode(repoName, dataNode);
+        final ActionFuture<CreateSnapshotResponse> snapshotFuture = clusterAdmin()
+                .prepareCreateSnapshot(repoName, "fast-snapshot").setIndices(indexFast).setWaitForCompletion(true).execute();
+        waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
+
+        final String targetSnapshot = "target-snapshot";
+        assertAcked(startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow).get());
+
+        assertThat(snapshotFuture.isDone(), is(false));
+        unblockNode(repoName, dataNode);
+
+        assertSuccessful(snapshotFuture);
+    }
+
+    public void testDeletePreventsClone() throws Exception {
+        final String masterName = internalCluster().startMasterOnlyNode();
+        internalCluster().startDataOnlyNode();
+        final String repoName = "repo-name";
+        createRepository(repoName, "mock");
+
+        final String indexName = "index-1";
+        createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repoName, sourceSnapshot);
+
+        indexRandomDocs(indexName, randomIntBetween(20, 100));
+
+        final String targetSnapshot = "target-snapshot";
+        blockNodeOnAnyFiles(repoName, masterName);
+        final ActionFuture<AcknowledgedResponse> deleteFuture = startDeleteSnapshot(repoName, sourceSnapshot);
+        waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
+        assertFalse(deleteFuture.isDone());
+
+        ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class, () ->
+                startClone(repoName, sourceSnapshot, targetSnapshot, indexName).actionGet());
+        assertThat(ex.getMessage(), containsString("cannot clone from snapshot that is being deleted"));
+
+        unblockNode(repoName, masterName);
+        assertAcked(deleteFuture.get());
+    }
+
+    public void testBackToBackClonesForIndexNotInCluster() throws Exception {
+        // large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations
+        final String masterNode = internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
+        internalCluster().startDataOnlyNode();
+        final String repoName = "test-repo";
+        createRepository(repoName, "mock");
+        final String indexBlocked = "index-blocked";
+        createIndexWithContent(indexBlocked);
+
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repoName, sourceSnapshot);
+
+        assertAcked(admin().indices().prepareDelete(indexBlocked).get());
+
+        final String targetSnapshot1 = "target-snapshot";
+        blockMasterOnShardClone(repoName);
+        final ActionFuture<AcknowledgedResponse> cloneFuture1 = startClone(repoName, sourceSnapshot, targetSnapshot1, indexBlocked);
+        waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
+        assertThat(cloneFuture1.isDone(), is(false));
+
+        final int extraClones = randomIntBetween(1, 5);
+        final List<ActionFuture<AcknowledgedResponse>> extraCloneFutures = new ArrayList<>(extraClones);
+        for (int i = 0; i < extraClones; i++) {
+            extraCloneFutures.add(startClone(repoName, sourceSnapshot, "target-snapshot-" + i, indexBlocked));
+        }
+        awaitNumberOfSnapshotsInProgress(1 + extraClones);
+        for (ActionFuture<AcknowledgedResponse> extraCloneFuture : extraCloneFutures) {
+            assertFalse(extraCloneFuture.isDone());
+        }
+
+        final int extraSnapshots = randomIntBetween(0, 5);
+        if (extraSnapshots > 0) {
+            createIndexWithContent(indexBlocked);
+        }
+
+        final List<ActionFuture<CreateSnapshotResponse>> extraSnapshotFutures = new ArrayList<>(extraSnapshots);
+        for (int i = 0; i < extraSnapshots; i++) {
+            extraSnapshotFutures.add(startFullSnapshot(repoName, "extra-snap-" + i));
+        }
+
+        awaitNumberOfSnapshotsInProgress(1 + extraClones + extraSnapshots);
+        for (ActionFuture<CreateSnapshotResponse> extraSnapshotFuture : extraSnapshotFutures) {
+            assertFalse(extraSnapshotFuture.isDone());
+        }
+
+        unblockNode(repoName, masterNode);
+        assertAcked(cloneFuture1.get());
+
+        for (ActionFuture<AcknowledgedResponse> extraCloneFuture : extraCloneFutures) {
+            assertAcked(extraCloneFuture.get());
+        }
+        for (ActionFuture<CreateSnapshotResponse> extraSnapshotFuture : extraSnapshotFutures) {
+            assertSuccessful(extraSnapshotFuture);
+        }
+    }
+
+    public void testMasterFailoverDuringCloneStep1() throws Exception {
+        internalCluster().startMasterOnlyNodes(3);
+        internalCluster().startDataOnlyNode();
+        final String repoName = "test-repo";
+        createRepository(repoName, "mock");
+        final String testIndex = "index-test";
+        createIndexWithContent(testIndex);
+
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repoName, sourceSnapshot);
+
+        blockMasterOnReadIndexMeta(repoName);
+        final ActionFuture<AcknowledgedResponse> cloneFuture =
+                startCloneFromDataNode(repoName, sourceSnapshot, "target-snapshot", testIndex);
+        awaitNumberOfSnapshotsInProgress(1);
+        final String masterNode = internalCluster().getMasterName();
+        waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
+        internalCluster().restartNode(masterNode);
+        boolean cloneSucceeded = false;
+        try {
+            cloneFuture.actionGet(TimeValue.timeValueSeconds(30L));
+            cloneSucceeded = true;
+        } catch (SnapshotException sne) {
+            // ignored, most of the time we will throw here but we could randomly run into a situation where the data node retries the
+            // snapshot on disconnect slowly enough for it to work out
+        }
+
+        awaitNoMoreRunningOperations(internalCluster().getMasterName());
+
+        assertAllSnapshotsSuccessful(getRepositoryData(repoName), cloneSucceeded ? 2 : 1);
+    }
+
+    public void testFailsOnCloneMissingIndices() {
+        internalCluster().startMasterOnlyNode();
+        internalCluster().startDataOnlyNode();
+        final String repoName = "repo-name";
+        final Path repoPath = randomRepoPath();
+        if (randomBoolean()) {
+            createIndexWithContent("test-idx");
+        }
+        createRepository(repoName, "fs", repoPath);
+
+        final String snapshotName = "snapshot";
+        createFullSnapshot(repoName, snapshotName);
+        expectThrows(IndexNotFoundException.class,
+                () -> startClone(repoName, snapshotName, "target-snapshot", "does-not-exist").actionGet());
+    }
+
+    public void testMasterFailoverDuringCloneStep2() throws Exception {
+        // large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations
+        internalCluster().startMasterOnlyNodes(3, LARGE_SNAPSHOT_POOL_SETTINGS);
+        internalCluster().startDataOnlyNode();
+        final String repoName = "test-repo";
+        createRepository(repoName, "mock");
+        final String testIndex = "index-test";
+        createIndexWithContent(testIndex);
+
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repoName, sourceSnapshot);
+
+        final String targetSnapshot = "target-snapshot";
+        blockMasterOnShardClone(repoName);
+        final ActionFuture<AcknowledgedResponse> cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex);
+        awaitNumberOfSnapshotsInProgress(1);
+        final String masterNode = internalCluster().getMasterName();
+        waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
+        internalCluster().restartNode(masterNode);
+        expectThrows(SnapshotException.class, cloneFuture::actionGet);
+        awaitNoMoreRunningOperations(internalCluster().getMasterName());
+
+        assertAllSnapshotsSuccessful(getRepositoryData(repoName), 2);
+    }
+
+    public void testExceptionDuringShardClone() throws Exception {
+        // large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations
+        internalCluster().startMasterOnlyNodes(3, LARGE_SNAPSHOT_POOL_SETTINGS);
+        internalCluster().startDataOnlyNode();
+        final String repoName = "test-repo";
+        createRepository(repoName, "mock");
+        final String testIndex = "index-test";
+        createIndexWithContent(testIndex);
+
+        final String sourceSnapshot = "source-snapshot";
+        createFullSnapshot(repoName, sourceSnapshot);
+
+        final String targetSnapshot = "target-snapshot";
+        blockMasterFromFinalizingSnapshotOnSnapFile(repoName);
+        final ActionFuture<AcknowledgedResponse> cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex);
+        awaitNumberOfSnapshotsInProgress(1);
+        final String masterNode = internalCluster().getMasterName();
+        waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
+        unblockNode(repoName, masterNode);
+        expectThrows(SnapshotException.class, cloneFuture::actionGet);
+        awaitNoMoreRunningOperations(internalCluster().getMasterName());
+        assertAllSnapshotsSuccessful(getRepositoryData(repoName), 1);
+        assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
+    }
+
+    public void testDoesNotStartOnBrokenSourceSnapshot() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        final String dataNode = internalCluster().startDataOnlyNode();
+        final String repoName = "test-repo";
+        createRepository(repoName, "mock");
+        final String testIndex = "index-test";
+        createIndexWithContent(testIndex);
+
+        final String sourceSnapshot = "source-snapshot";
+        blockDataNode(repoName, dataNode);
+        final Client masterClient = internalCluster().masterClient();
+        final ActionFuture<CreateSnapshotResponse> sourceSnapshotFuture = masterClient.admin().cluster()
+                .prepareCreateSnapshot(repoName, sourceSnapshot).setWaitForCompletion(true).execute();
+        awaitNumberOfSnapshotsInProgress(1);
+        waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
+        internalCluster().restartNode(dataNode);
+        assertThat(sourceSnapshotFuture.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL));
+
+        final SnapshotException sne = expectThrows(SnapshotException.class, () -> startClone(masterClient, repoName, sourceSnapshot,
+                "target-snapshot", testIndex).actionGet(TimeValue.timeValueSeconds(30L)));
+        assertThat(sne.getMessage(), containsString("Can't clone index [" + getRepositoryData(repoName).resolveIndexId(testIndex) +
+        "] because its snapshot was not successful."));
+    }
+
+    private ActionFuture<AcknowledgedResponse> startCloneFromDataNode(String repoName, String sourceSnapshot, String targetSnapshot,
+                                                                      String... indices) {
+        return startClone(dataNodeClient(), repoName, sourceSnapshot, targetSnapshot, indices);
+    }
+
+    private ActionFuture<AcknowledgedResponse> startClone(String repoName, String sourceSnapshot, String targetSnapshot,
+                                                          String... indices) {
+        return startClone(client(), repoName, sourceSnapshot, targetSnapshot, indices);
+    }
+
+    private static ActionFuture<AcknowledgedResponse> startClone(Client client, String repoName, String sourceSnapshot,
+                                                                 String targetSnapshot, String... indices) {
+        return client.admin().cluster().prepareCloneSnapshot(repoName, sourceSnapshot, targetSnapshot).setIndices(indices).execute();
+    }
+
+    private void blockMasterOnReadIndexMeta(String repoName) {
+        ((MockRepository)internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName))
+                .setBlockOnReadIndexMeta();
+    }
+
+    private void blockMasterOnShardClone(String repoName) {
+        ((MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName))
+                .setBlockOnWriteShardLevelMeta();
+    }
+
+    /**
+     * Assert that given {@link RepositoryData} contains exactly the given number of snapshots and all of them are successful.
+     */
+    private static void assertAllSnapshotsSuccessful(RepositoryData repositoryData, int successfulSnapshotCount) {
+        final Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
+        assertThat(snapshotIds, hasSize(successfulSnapshotCount));
+        for (SnapshotId snapshotId : snapshotIds) {
+            assertThat(repositoryData.getSnapshotState(snapshotId), is(SnapshotState.SUCCESS));
+        }
+    }
+
     private static BlobStoreIndexShardSnapshots readShardGeneration(BlobStoreRepository repository, RepositoryShardId repositoryShardId,
                                                                     String generation) {
         return PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.supply(f,

+ 0 - 5
server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java

@@ -1293,11 +1293,6 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
                 .setWaitForCompletion(true).execute();
     }
 
-    // Large snapshot pool settings to set up nodes for tests involving multiple repositories that need to have enough
-    // threads so that blocking some threads on one repository doesn't block other repositories from doing work
-    private static final Settings LARGE_SNAPSHOT_POOL_SETTINGS = Settings.builder()
-        .put("thread_pool.snapshot.core", 5).put("thread_pool.snapshot.max", 5).build();
-
     private void createIndexWithContent(String indexName, String nodeInclude, String nodeExclude) {
         createIndexWithContent(indexName, indexSettingsNoReplicas(1)
                 .put("index.routing.allocation.include._name", nodeInclude)

+ 1 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java

@@ -72,6 +72,6 @@ public final class TransportCloneSnapshotAction extends TransportMasterNodeActio
     @Override
     protected void masterOperation(Task task, final CloneSnapshotRequest request, ClusterState state,
                                    final ActionListener<AcknowledgedResponse> listener) {
-        throw new UnsupportedOperationException("not implemented yet");
+        snapshotsService.cloneSnapshot(request, ActionListener.map(listener, v -> new AcknowledgedResponse(true)));
     }
 }

+ 123 - 10
server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

@@ -35,8 +35,11 @@ import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.RepositoryShardId;
 import org.elasticsearch.repositories.RepositoryOperation;
 import org.elasticsearch.snapshots.Snapshot;
+import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.snapshots.SnapshotsService;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -96,11 +99,32 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
                 indices, dataStreams, startTime, repositoryStateId, shards, null, userMetadata, version);
     }
 
+    /**
+     * Creates the initial snapshot clone entry
+     *
+     * @param snapshot snapshot to clone into
+     * @param source   snapshot to clone from
+     * @param indices  indices to clone
+     * @param startTime start time
+     * @param repositoryStateId repository state id that this clone is based on
+     * @param version repository metadata version to write
+     * @return snapshot clone entry
+     */
+    public static Entry startClone(Snapshot snapshot, SnapshotId source, List<IndexId> indices, long startTime,
+                                   long repositoryStateId, Version version) {
+        return new SnapshotsInProgress.Entry(snapshot, true, false, State.STARTED, indices, Collections.emptyList(),
+                startTime, repositoryStateId, ImmutableOpenMap.of(), null, Collections.emptyMap(), version, source,
+                ImmutableOpenMap.of());
+    }
+
     public static class Entry implements Writeable, ToXContent, RepositoryOperation {
         private final State state;
         private final Snapshot snapshot;
         private final boolean includeGlobalState;
         private final boolean partial;
+        /**
+         * Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation.
+         */
         private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
         private final List<IndexId> indices;
         private final List<String> dataStreams;
@@ -108,6 +132,19 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
         private final long repositoryStateId;
         // see #useShardGenerations
         private final Version version;
+
+        /**
+         * Source snapshot if this is a clone operation or {@code null} if this is a snapshot.
+         */
+        @Nullable
+        private final SnapshotId source;
+
+        /**
+         * Map of {@link RepositoryShardId} to {@link ShardSnapshotStatus} tracking the state of each shard clone operation in this entry
+         * the same way {@link #shards} tracks the status of each shard snapshot operation in non-clone entries.
+         */
+        private final ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones;
+
         @Nullable private final Map<String, Object> userMetadata;
         @Nullable private final String failure;
 
@@ -116,6 +153,15 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
                      List<String> dataStreams, long startTime, long repositoryStateId,
                      ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure, Map<String, Object> userMetadata,
                      Version version) {
+            this(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards, failure,
+                    userMetadata, version, null, ImmutableOpenMap.of());
+        }
+
+        private Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
+                     List<String> dataStreams, long startTime, long repositoryStateId,
+                     ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure, Map<String, Object> userMetadata,
+                     Version version, @Nullable SnapshotId source,
+                     @Nullable ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones) {
             this.state = state;
             this.snapshot = snapshot;
             this.includeGlobalState = includeGlobalState;
@@ -124,11 +170,18 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             this.dataStreams = dataStreams;
             this.startTime = startTime;
             this.shards = shards;
-            assert assertShardsConsistent(state, indices, shards);
             this.repositoryStateId = repositoryStateId;
             this.failure = failure;
             this.userMetadata = userMetadata;
             this.version = version;
+            this.source = source;
+            if (source == null) {
+                assert clones == null || clones.isEmpty() : "Provided [" + clones + "] but no source";
+                this.clones = ImmutableOpenMap.of();
+            } else {
+                this.clones = clones;
+            }
+            assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones);
         }
 
         private Entry(StreamInput in) throws IOException {
@@ -144,21 +197,41 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             userMetadata = in.readMap();
             version = Version.readVersion(in);
             dataStreams = in.readStringList();
+            if (in.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) {
+                source = in.readOptionalWriteable(SnapshotId::new);
+                clones = in.readImmutableMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom);
+            } else {
+                source = null;
+                clones = ImmutableOpenMap.of();
+            }
         }
 
-        private static boolean assertShardsConsistent(State state, List<IndexId> indices,
-                                                      ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
+        private static boolean assertShardsConsistent(SnapshotId source, State state, List<IndexId> indices,
+                                                      ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
+                                                      ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones) {
             if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) {
                 return true;
             }
             final Set<String> indexNames = indices.stream().map(IndexId::getName).collect(Collectors.toSet());
             final Set<String> indexNamesInShards = new HashSet<>();
-            shards.keysIt().forEachRemaining(s -> indexNamesInShards.add(s.getIndexName()));
-            assert indexNames.equals(indexNamesInShards)
+            shards.iterator().forEachRemaining(s -> {
+                indexNamesInShards.add(s.key.getIndexName());
+                assert source == null || s.value.nodeId == null :
+                        "Shard snapshot must not be assigned to data node when copying from snapshot [" + source + "]";
+            });
+            assert source == null || indexNames.isEmpty() == false : "No empty snapshot clones allowed";
+            assert source != null || indexNames.equals(indexNamesInShards)
                 : "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]";
-            final boolean shardsCompleted = completed(shards.values());
-            assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false)
-                : "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards;
+            final boolean shardsCompleted = completed(shards.values()) && completed(clones.values());
+            // Check state consistency for normal snapshots and started clone operations
+            if (source == null || clones.isEmpty() == false) {
+                assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false)
+                        : "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards;
+            }
+            if (source != null && state.completed()) {
+                assert hasFailures(clones) == false || state == State.FAILED
+                        : "Failed shard clones in [" + clones + "] but state was [" + state + "]";
+            }
             return true;
         }
 
@@ -166,7 +239,17 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             assert newRepoGen > repositoryStateId : "Updated repository generation [" + newRepoGen
                     + "] must be higher than current generation [" + repositoryStateId + "]";
             return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, newRepoGen, shards, failure,
-                    userMetadata, version);
+                    userMetadata, version, source, clones);
+        }
+
+        public Entry withClones(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> updatedClones) {
+            if (updatedClones.equals(clones)) {
+                return this;
+            }
+            return new Entry(snapshot, includeGlobalState, partial,
+                    completed(updatedClones.values()) ? (hasFailures(updatedClones) ? State.FAILED : State.SUCCESS) :
+                            state, indices, dataStreams, startTime, repositoryStateId, shards, failure, userMetadata, version, source,
+                    updatedClones);
         }
 
         /**
@@ -203,7 +286,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
 
         public Entry fail(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, State state, String failure) {
             return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards,
-                    failure, userMetadata, version);
+                    failure, userMetadata, version, source, clones);
         }
 
         /**
@@ -291,6 +374,19 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             return version;
         }
 
+        @Nullable
+        public SnapshotId source() {
+            return source;
+        }
+
+        public boolean isClone() {
+            return source != null;
+        }
+
+        public ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones() {
+            return clones;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
@@ -307,6 +403,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             if (state != entry.state) return false;
             if (repositoryStateId != entry.repositoryStateId) return false;
             if (version.equals(entry.version) == false) return false;
+            if (Objects.equals(source, ((Entry) o).source) == false) return false;
+            if (clones.equals(((Entry) o).clones) == false) return false;
 
             return true;
         }
@@ -322,6 +420,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             result = 31 * result + Long.hashCode(startTime);
             result = 31 * result + Long.hashCode(repositoryStateId);
             result = 31 * result + version.hashCode();
+            result = 31 * result + (source == null ? 0 : source.hashCode());
+            result = 31 * result + clones.hashCode();
             return result;
         }
 
@@ -383,6 +483,10 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             out.writeMap(userMetadata);
             Version.writeVersion(version, out);
             out.writeStringCollection(dataStreams);
+            if (out.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) {
+                out.writeOptionalWriteable(source);
+                out.writeMap(clones);
+            }
         }
 
         @Override
@@ -406,6 +510,15 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
         return true;
     }
 
+    private static boolean hasFailures(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones) {
+        for (ObjectCursor<ShardSnapshotStatus> value : clones.values()) {
+            if (value.value.state().failed()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public static class ShardSnapshotStatus implements Writeable {
 
         /**

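The new `clones` map above plays the same role for clone operations that `shards` plays for regular snapshots, but it is keyed by `RepositoryShardId` because the cloned indices only need to exist in the repository, not in the cluster. A minimal sketch of how clone logic could queue every shard of one index and fold the result back into the entry via `withClones`; the `cloneEntry`, `indexId`, and `shardCount` inputs are assumed to come from the surrounding `SnapshotsService` code, and the real `startCloning` step assigns non-busy shards to the master node right away, queueing only shards that are still busy:

[source,java]
----
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoryShardId;

class CloneEntrySketch {

    // Marks every shard of the given index as queued for cloning. withClones() recomputes the
    // entry state: once all clone statuses are completed it flips to SUCCESS or FAILED.
    static SnapshotsInProgress.Entry queueShardClones(SnapshotsInProgress.Entry cloneEntry,
                                                      IndexId indexId, int shardCount) {
        ImmutableOpenMap.Builder<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus> clones =
                ImmutableOpenMap.builder();
        for (int shardId = 0; shardId < shardCount; shardId++) {
            clones.put(new RepositoryShardId(indexId, shardId),
                    SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED);
        }
        return cloneEntry.withClones(clones.build());
    }
}
----
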
+ 1 - 1
server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

@@ -915,7 +915,7 @@ public class RestoreService implements ClusterStateApplier {
         }
     }
 
-    private static boolean failed(SnapshotInfo snapshot, String index) {
+    public static boolean failed(SnapshotInfo snapshot, String index) {
         for (SnapshotShardFailure failure : snapshot.shardFailures()) {
             if (index.equals(failure.index())) {
                 return true;

+ 4 - 0
server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

@@ -197,6 +197,10 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
         final String localNodeId = clusterService.localNode().getId();
         for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
             final State entryState = entry.state();
+            if (entry.isClone()) {
+                // This is a snapshot clone, it will be executed on the current master
+                continue;
+            }
             if (entryState == State.STARTED) {
                 Map<ShardId, IndexShardSnapshotStatus> startedShards = null;
                 final Snapshot snapshot = entry.snapshot();

+ 636 - 100
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -27,9 +27,13 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
+import org.elasticsearch.action.StepListener;
+import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
@@ -43,6 +47,7 @@ import org.elasticsearch.cluster.RepositoryCleanupInProgress;
 import org.elasticsearch.cluster.RestoreInProgress;
 import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
 import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.repositories.RepositoryShardId;
 import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
 import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
 import org.elasticsearch.cluster.SnapshotsInProgress.State;
@@ -104,6 +109,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -120,6 +127,8 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
  */
 public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier {
 
+    public static final Version CLONE_SNAPSHOT_VERSION = Version.V_8_0_0;
+
     public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = Version.V_7_6_0;
 
     public static final Version INDEX_GEN_IN_REPO_DATA_VERSION = Version.V_7_9_0;
@@ -152,6 +161,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
     // Set of snapshots that are currently being ended by this node
     private final Set<Snapshot> endingSnapshots = Collections.synchronizedSet(new HashSet<>());
 
+    // Set of currently initializing clone operations
+    private final Set<Snapshot> initializingClones = Collections.synchronizedSet(new HashSet<>());
+
     private final UpdateSnapshotStatusAction updateSnapshotStatusHandler;
 
     private final TransportService transportService;
@@ -228,29 +240,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 
             @Override
             public ClusterState execute(ClusterState currentState) {
-                // check if the snapshot name already exists in the repository
-                if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
-                    throw new InvalidSnapshotNameException(
-                            repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
-                }
+                ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
                 final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
                 final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots.entries();
-                if (runningSnapshots.stream().anyMatch(s -> {
-                    final Snapshot running = s.snapshot();
-                    return running.getRepository().equals(repositoryName) && running.getSnapshotId().getName().equals(snapshotName);
-                })) {
-                    throw new InvalidSnapshotNameException(
-                            repository.getMetadata().name(), snapshotName, "snapshot with the same name is already in-progress");
-                }
+                ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName);
                 validate(repositoryName, snapshotName, currentState);
                 final SnapshotDeletionsInProgress deletionsInProgress =
                         currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
-                final RepositoryCleanupInProgress repositoryCleanupInProgress =
-                        currentState.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY);
-                if (repositoryCleanupInProgress.hasCleanupInProgress()) {
-                    throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
-                        "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
-                }
+                ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
                 ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress);
                 // Store newSnapshot here to be processed in clusterStateProcessed
                 List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request));
@@ -261,9 +258,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
 
                 final List<IndexId> indexIds = repositoryData.resolveNewIndices(
-                        indices, runningSnapshots.stream().filter(entry -> entry.repository().equals(repositoryName))
-                                .flatMap(entry -> entry.indices().stream()).distinct()
-                                .collect(Collectors.toMap(IndexId::getName, Function.identity())));
+                        indices, getInFlightIndexIds(runningSnapshots, repositoryName));
                 final Version version = minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null);
                 ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(snapshots, deletionsInProgress, currentState.metadata(),
                     currentState.routingTable(), indexIds, useShardGenerations(version), repositoryData, repositoryName);
@@ -313,6 +308,276 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         }, "create_snapshot [" + snapshotName + ']', listener::onFailure);
     }
 
+    private static void ensureSnapshotNameNotRunning(List<SnapshotsInProgress.Entry> runningSnapshots, String repositoryName,
+                                                     String snapshotName) {
+        if (runningSnapshots.stream().anyMatch(s -> {
+            final Snapshot running = s.snapshot();
+            return running.getRepository().equals(repositoryName) && running.getSnapshotId().getName().equals(snapshotName);
+        })) {
+            throw new InvalidSnapshotNameException(repositoryName, snapshotName, "snapshot with the same name is already in-progress");
+        }
+    }
+
+    private static Map<String, IndexId> getInFlightIndexIds(List<SnapshotsInProgress.Entry> runningSnapshots, String repositoryName) {
+        return runningSnapshots.stream().filter(entry -> entry.repository().equals(repositoryName))
+                .flatMap(entry -> entry.indices().stream()).distinct()
+                .collect(Collectors.toMap(IndexId::getName, Function.identity()));
+    }
+
+    public void cloneSnapshot(CloneSnapshotRequest request, ActionListener<Void> listener) {
+        final String repositoryName = request.repository();
+        Repository repository = repositoriesService.repository(repositoryName);
+        if (repository.isReadOnly()) {
+            listener.onFailure(new RepositoryException(repositoryName, "cannot create snapshot in a readonly repository"));
+            return;
+        }
+        final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.target());
+        validate(repositoryName, snapshotName);
+        final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID());
+        final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
+        initializingClones.add(snapshot);
+        repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask() {
+
+            private SnapshotsInProgress.Entry newEntry;
+
+            @Override
+            public ClusterState execute(ClusterState currentState) {
+                ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
+                ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
+                final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
+                final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots.entries();
+                ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName);
+                validate(repositoryName, snapshotName, currentState);
+
+                final SnapshotId sourceSnapshotId = repositoryData.getSnapshotIds()
+                        .stream()
+                        .filter(src -> src.getName().equals(request.source()))
+                        .findAny()
+                        .orElseThrow(() -> new SnapshotMissingException(repositoryName, request.source()));
+                final SnapshotDeletionsInProgress deletionsInProgress =
+                        currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
+                if (deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(sourceSnapshotId))) {
+                    throw new ConcurrentSnapshotExecutionException(repositoryName, sourceSnapshotId.getName(),
+                            "cannot clone from snapshot that is being deleted");
+                }
+                ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress);
+                final List<String> indicesForSnapshot = new ArrayList<>();
+                for (IndexId indexId : repositoryData.getIndices().values()) {
+                    if (repositoryData.getSnapshots(indexId).contains(sourceSnapshotId)) {
+                        indicesForSnapshot.add(indexId.getName());
+                    }
+                }
+                final List<String> matchingIndices =
+                        SnapshotUtils.filterIndices(indicesForSnapshot, request.indices(), request.indicesOptions());
+                if (matchingIndices.isEmpty()) {
+                    throw new SnapshotException(new Snapshot(repositoryName, sourceSnapshotId),
+                            "No indices in the source snapshot [" + sourceSnapshotId + "] matched requested pattern ["
+                                    + Strings.arrayToCommaDelimitedString(request.indices()) + "]");
+                }
+                newEntry = SnapshotsInProgress.startClone(
+                        snapshot, sourceSnapshotId,
+                        repositoryData.resolveIndices(matchingIndices),
+                        threadPool.absoluteTimeInMillis(), repositoryData.getGenId(),
+                        minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null));
+                final List<SnapshotsInProgress.Entry> newEntries = new ArrayList<>(runningSnapshots);
+                newEntries.add(newEntry);
+                return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
+                        SnapshotsInProgress.of(List.copyOf(newEntries))).build();
+            }
+
+            @Override
+            public void onFailure(String source, Exception e) {
+                initializingClones.remove(snapshot);
+                logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to clone snapshot", repositoryName, snapshotName), e);
+                listener.onFailure(e);
+            }
+
+            @Override
+            public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
+                logger.info("snapshot clone [{}] started", snapshot);
+                addListener(snapshot, ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure));
+                startCloning(repository, newEntry);
+            }
+
+            @Override
+            public TimeValue timeout() {
+                initializingClones.remove(snapshot);
+                return request.masterNodeTimeout();
+            }
+        }, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure);
+    }
+
+    private static void ensureNoCleanupInProgress(ClusterState currentState, String repositoryName, String snapshotName) {
+        final RepositoryCleanupInProgress repositoryCleanupInProgress =
+                currentState.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY);
+        if (repositoryCleanupInProgress.hasCleanupInProgress()) {
+            throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
+                    "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
+        }
+    }
+
+    private static void ensureSnapshotNameAvailableInRepo(RepositoryData repositoryData, String snapshotName, Repository repository) {
+        // check if the snapshot name already exists in the repository
+        if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
+            throw new InvalidSnapshotNameException(
+                    repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
+        }
+    }
+
+    /**
+     * Determine the number of shards in each index of a clone operation and update the cluster state accordingly.
+     *
+     * @param repository     repository to run operation on
+     * @param cloneEntry     clone operation in the cluster state
+     */
+    private void startCloning(Repository repository, SnapshotsInProgress.Entry cloneEntry) {
+        final List<IndexId> indices = cloneEntry.indices();
+        final SnapshotId sourceSnapshot = cloneEntry.source();
+        final Snapshot targetSnapshot = cloneEntry.snapshot();
+
+        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
+        // Exception handler for IO exceptions with loading index and repo metadata
+        final Consumer<Exception> onFailure = e -> {
+            initializingClones.remove(targetSnapshot);
+            logger.info(() -> new ParameterizedMessage("Failed to start snapshot clone [{}]", cloneEntry), e);
+            removeFailedSnapshotFromClusterState(targetSnapshot, e, null);
+        };
+
+        // 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone
+        // TODO: we could skip this step for snapshots with state SUCCESS
+        final StepListener<SnapshotInfo> snapshotInfoListener = new StepListener<>();
+        executor.execute(ActionRunnable.supply(snapshotInfoListener, () -> repository.getSnapshotInfo(sourceSnapshot)));
+
+        final StepListener<Collection<Tuple<IndexId, Integer>>> allShardCountsListener = new StepListener<>();
+        final GroupedActionListener<Tuple<IndexId, Integer>> shardCountListener =
+                new GroupedActionListener<>(allShardCountsListener, indices.size());
+        snapshotInfoListener.whenComplete(snapshotInfo -> {
+            for (IndexId indexId : indices) {
+                if (RestoreService.failed(snapshotInfo, indexId.getName())) {
+                    throw new SnapshotException(targetSnapshot, "Can't clone index [" + indexId +
+                            "] because its snapshot was not successful.");
+                }
+            }
+            // 2. step, load the number of shards we have in each index to be cloned from the index metadata.
+            repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
+                for (IndexId index : indices) {
+                    executor.execute(ActionRunnable.supply(shardCountListener, () -> {
+                        final IndexMetadata metadata = repository.getSnapshotIndexMetaData(repositoryData, sourceSnapshot, index);
+                        return Tuple.tuple(index, metadata.getNumberOfShards());
+                    }));
+                }
+            }, onFailure));
+        }, onFailure);
+
+        // 3. step, we have all the shard counts, now update the cluster state to have clone jobs in the snapshot entry
+        allShardCountsListener.whenComplete(counts -> repository.executeConsistentStateUpdate(repoData -> new ClusterStateUpdateTask() {
+
+            private SnapshotsInProgress.Entry updatedEntry;
+
+            @Override
+            public ClusterState execute(ClusterState currentState) {
+                final SnapshotsInProgress snapshotsInProgress =
+                        currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
+                final List<SnapshotsInProgress.Entry> updatedEntries = new ArrayList<>(snapshotsInProgress.entries());
+                boolean changed = false;
+                final String localNodeId = currentState.nodes().getLocalNodeId();
+                final String repoName = cloneEntry.repository();
+                final Map<String, IndexId> indexIds = getInFlightIndexIds(updatedEntries, repoName);
+                final ShardGenerations shardGenerations = repoData.shardGenerations();
+                for (int i = 0; i < updatedEntries.size(); i++) {
+                    if (cloneEntry.equals(updatedEntries.get(i))) {
+                        final ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> clonesBuilder =
+                                ImmutableOpenMap.builder();
+                        // TODO: could be optimized by just dealing with repo shard id directly
+                        final Set<RepositoryShardId> busyShardsInRepo =
+                                busyShardsForRepo(repoName, snapshotsInProgress, currentState.metadata())
+                                        .stream()
+                                        .map(shardId -> new RepositoryShardId(indexIds.get(shardId.getIndexName()), shardId.getId()))
+                                        .collect(Collectors.toSet());
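+                        // Shards that are already busy with another operation are queued; all other shard clones are initialized
+                        // right away and assigned to the current master node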
+                        for (Tuple<IndexId, Integer> count : counts) {
+                            for (int shardId = 0; shardId < count.v2(); shardId++) {
+                                final RepositoryShardId repoShardId = new RepositoryShardId(count.v1(), shardId);
+                                if (busyShardsInRepo.contains(repoShardId)) {
+                                    clonesBuilder.put(repoShardId, ShardSnapshotStatus.UNASSIGNED_QUEUED);
+                                } else {
+                                    clonesBuilder.put(repoShardId,
+                                            new ShardSnapshotStatus(localNodeId, shardGenerations.getShardGen(count.v1(), shardId)));
+                                }
+                            }
+                        }
+                        updatedEntry = cloneEntry.withClones(clonesBuilder.build());
+                        updatedEntries.set(i, updatedEntry);
+                        changed = true;
+                        break;
+                    }
+                }
+                return updateWithSnapshots(currentState, changed ? SnapshotsInProgress.of(updatedEntries) : null, null);
+            }
+
+            @Override
+            public void onFailure(String source, Exception e) {
+                initializingClones.remove(targetSnapshot);
+                logger.info(() -> new ParameterizedMessage("Failed to start snapshot clone [{}]", cloneEntry), e);
+                failAllListenersOnMasterFailOver(e);
+            }
+
+            @Override
+            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                initializingClones.remove(targetSnapshot);
+                if (updatedEntry != null) {
+                    final Snapshot target = updatedEntry.snapshot();
+                    final SnapshotId sourceSnapshot = updatedEntry.source();
+                    for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> indexClone : updatedEntry.clones()) {
+                        final ShardSnapshotStatus shardStatusBefore = indexClone.value;
+                        if (shardStatusBefore.state() != ShardState.INIT) {
+                            continue;
+                        }
+                        final RepositoryShardId repoShardId = indexClone.key;
+                        runReadyClone(target, sourceSnapshot, shardStatusBefore, repoShardId, repository);
+                    }
+                } else {
+                    // Extremely unlikely corner case of master failing over between starting the clone and
+                    // starting shard clones.
+                    logger.warn("Did not find expected entry [{}] in the cluster state", cloneEntry);
+                }
+            }
+        }, "start snapshot clone", onFailure), onFailure);
+    }
+
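+    // Shard clones currently executing on this (master) node, used to avoid starting the same shard clone twice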
+    private final Set<RepositoryShardId> currentlyCloning = Collections.synchronizedSet(new HashSet<>());
+
+    private void runReadyClone(Snapshot target, SnapshotId sourceSnapshot, ShardSnapshotStatus shardStatusBefore,
+                               RepositoryShardId repoShardId, Repository repository) {
+        final SnapshotId targetSnapshot = target.getSnapshotId();
+        final String localNodeId = clusterService.localNode().getId();
+        if (currentlyCloning.add(repoShardId)) {
+            repository.cloneShardSnapshot(sourceSnapshot, targetSnapshot, repoShardId, shardStatusBefore.generation(), ActionListener.wrap(
+                    generation -> innerUpdateSnapshotState(
+                            new ShardSnapshotUpdate(target, repoShardId,
+                                    new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation)),
+                            ActionListener.runBefore(
+                                    ActionListener.wrap(
+                                            v -> logger.trace("Marked [{}] as successfully cloned from [{}] to [{}]", repoShardId,
+                                                    sourceSnapshot, targetSnapshot),
+                                            e -> {
+                                                logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId);
+                                                failAllListenersOnMasterFailOver(e);
+                                            }
+                                    ), () -> currentlyCloning.remove(repoShardId))
+                    ), e -> innerUpdateSnapshotState(
+                            new ShardSnapshotUpdate(target, repoShardId,
+                                    new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null)),
+                            ActionListener.runBefore(ActionListener.wrap(
+                                    v -> logger.trace("Marked [{}] as failed clone from [{}] to [{}]", repoShardId,
+                                            sourceSnapshot, targetSnapshot),
+                                    ex -> {
+                                        logger.warn("Cluster state update after failed shard clone [{}] failed", repoShardId);
+                                        failAllListenersOnMasterFailOver(ex);
+                                    }
+                            ), () -> currentlyCloning.remove(repoShardId)))));
+        }
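+        // if a clone for this shard is already running on this node we do nothing here; any remaining INIT clones are picked up
+        // again by startExecutableClones on subsequent cluster state updates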
+    }
+
     private void ensureBelowConcurrencyLimit(String repository, String name, SnapshotsInProgress snapshotsInProgress,
                                              SnapshotDeletionsInProgress deletionsInProgress) {
         final int inProgressOperations = snapshotsInProgress.entries().size() + deletionsInProgress.getEntries().size();
@@ -369,17 +634,24 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         ShardGenerations.Builder builder = ShardGenerations.builder();
         final Map<String, IndexId> indexLookup = new HashMap<>();
         snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
-        snapshot.shards().forEach(c -> {
-            if (metadata.index(c.key.getIndex()) == null) {
-                assert snapshot.partial() :
-                    "Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
-                return;
-            }
-            final IndexId indexId = indexLookup.get(c.key.getIndexName());
-            if (indexId != null) {
-                builder.put(indexId, c.key.id(), c.value.generation());
-            }
-        });
+        if (snapshot.isClone()) {
+            snapshot.clones().forEach(c -> {
+                final IndexId indexId = indexLookup.get(c.key.indexName());
+                builder.put(indexId, c.key.shardId(), c.value.generation());
+            });
+        } else {
+            snapshot.shards().forEach(c -> {
+                if (metadata.index(c.key.getIndex()) == null) {
+                    assert snapshot.partial() :
+                            "Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
+                    return;
+                }
+                final IndexId indexId = indexLookup.get(c.key.getIndexName());
+                if (indexId != null) {
+                    builder.put(indexId, c.key.id(), c.value.generation());
+                }
+            });
+        }
         return builder.build();
     }
 
@@ -594,17 +866,27 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 
                 for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
                     if (statesToUpdate.contains(snapshot.state())) {
-                        ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = processWaitingShardsAndRemovedNodes(snapshot.shards(),
-                                routingTable, nodes, knownFailures.computeIfAbsent(snapshot.repository(), k -> new HashMap<>()));
-                        if (shards != null) {
-                            final SnapshotsInProgress.Entry updatedSnapshot = snapshot.withShardStates(shards);
-                            changed = true;
-                            if (updatedSnapshot.state().completed()) {
-                                finishedSnapshots.add(updatedSnapshot);
+                        // Currently initializing clone
+                        if (snapshot.isClone() && snapshot.clones().isEmpty()) {
+                            if (initializingClones.contains(snapshot.snapshot())) {
+                                updatedSnapshotEntries.add(snapshot);
+                            } else {
+                            logger.debug("removing not yet started clone operation [{}]", snapshot);
+                                changed = true;
                             }
-                            updatedSnapshotEntries.add(updatedSnapshot);
                         } else {
-                            updatedSnapshotEntries.add(snapshot);
+                            ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = processWaitingShardsAndRemovedNodes(snapshot.shards(),
+                                    routingTable, nodes, knownFailures.computeIfAbsent(snapshot.repository(), k -> new HashMap<>()));
+                            if (shards != null) {
+                                final SnapshotsInProgress.Entry updatedSnapshot = snapshot.withShardStates(shards);
+                                changed = true;
+                                if (updatedSnapshot.state().completed()) {
+                                    finishedSnapshots.add(updatedSnapshot);
+                                }
+                                updatedSnapshotEntries.add(updatedSnapshot);
+                            } else {
+                                updatedSnapshotEntries.add(snapshot);
+                            }
                         }
                     } else if (snapshot.repositoryStateId() == RepositoryData.UNKNOWN_REPO_GEN) {
                         // BwC path, older versions could create entries with unknown repo GEN in INIT or ABORTED state that did not yet
@@ -656,6 +938,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                         }
                     }
                 }
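+                // run any shard clones that became ready to execute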
+                startExecutableClones(newState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY), null);
                 // run newly ready deletes
                 for (SnapshotDeletionsInProgress.Entry entry : deletionsToExecute) {
                     if (tryEnterRepoLoop(entry.repository())) {
@@ -791,6 +1074,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      * @param entry snapshot
      */
     private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nullable RepositoryData repositoryData) {
+        if (entry.isClone() && entry.state() == State.FAILED) {
+            logger.debug("Removing failed snapshot clone [{}] from cluster state", entry);
+            removeFailedSnapshotFromClusterState(entry.snapshot(), new SnapshotException(entry.snapshot(), entry.failure()), null);
+            return;
+        }
         final boolean newFinalization = endingSnapshots.add(entry.snapshot());
         final String repoName = entry.repository();
         if (tryEnterRepoLoop(repoName)) {
@@ -865,10 +1153,24 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 entry.startTime(), failure, threadPool.absoluteTimeInMillis(),
                 entry.partial() ? shardGenerations.totalShards() : entry.shards().size(), shardFailures,
                 entry.includeGlobalState(), entry.userMetadata());
-            repositoriesService.repository(snapshot.getRepository()).finalizeSnapshot(
+            final StepListener<Metadata> metadataListener = new StepListener<>();
+            final Repository repo = repositoriesService.repository(snapshot.getRepository());
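+            // for clones, global and index metadata must be loaded from the source snapshot in the repository because the cloned
+            // indices need not exist in the current cluster state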
+            if (entry.isClone()) {
+                threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(
+                        ActionRunnable.supply(metadataListener, () -> {
+                            final Metadata.Builder metaBuilder = Metadata.builder(repo.getSnapshotGlobalMetadata(entry.source()));
+                            for (IndexId index : entry.indices()) {
+                                metaBuilder.put(repo.getSnapshotIndexMetaData(repositoryData, entry.source(), index), false);
+                            }
+                            return metaBuilder.build();
+                        }));
+            } else {
+                metadataListener.onResponse(metadata);
+            }
+            metadataListener.whenComplete(meta -> repo.finalizeSnapshot(
                     shardGenerations,
-            repositoryData.getGenId(),
-                    metadataForSnapshot(entry, metadata),
+                    repositoryData.getGenId(),
+                    metadataForSnapshot(entry, meta),
                     snapshotInfo,
                     entry.version(),
                     state -> stateWithoutSnapshot(state, snapshot),
@@ -878,7 +1180,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                             snapshotCompletionListeners.remove(snapshot), Tuple.tuple(newRepoData, snapshotInfo));
                         logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
                         runNextQueuedOperation(newRepoData, repository, true);
-                    }, e -> handleFinalizationFailure(e, entry, repositoryData)));
+                    }, e -> handleFinalizationFailure(e, entry, repositoryData))),
+                    e -> handleFinalizationFailure(e, entry, repositoryData));
         } catch (Exception e) {
             assert false : new AssertionError(e);
             handleFinalizationFailure(e, entry, repositoryData);
@@ -1046,10 +1349,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      * used when the snapshot fails for some reason. During normal operation the snapshot repository will remove the
      * {@link SnapshotsInProgress.Entry} from the cluster state once it's done finalizing the snapshot.
      *
-     * @param snapshot snapshot that failed
-     * @param failure  exception that failed the snapshot
+     * @param snapshot       snapshot that failed
+     * @param failure        exception that failed the snapshot
+     * @param repositoryData repository data if the next finalization operation on the repository should be attempted or {@code null} if
+     *                       no further actions should be executed
      */
-    private void removeFailedSnapshotFromClusterState(Snapshot snapshot, Exception failure, RepositoryData repositoryData) {
+    private void removeFailedSnapshotFromClusterState(Snapshot snapshot, Exception failure, @Nullable RepositoryData repositoryData) {
         assert failure != null : "Failure must be supplied";
         clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {
 
@@ -1080,7 +1385,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             @Override
             public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                 failSnapshotCompletionListeners(snapshot, failure);
-                runNextQueuedOperation(repositoryData, snapshot.getRepository(), true);
+                if (repositoryData != null) {
+                    runNextQueuedOperation(repositoryData, snapshot.getRepository(), true);
+                }
             }
         });
     }
@@ -1159,6 +1466,17 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 if (snapshotIds.isEmpty()) {
                     return currentState;
                 }
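+                // forbid deleting snapshots that are currently used as the source of a clone operation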
+                final Set<SnapshotId> activeCloneSources = snapshots.entries()
+                        .stream()
+                        .filter(SnapshotsInProgress.Entry::isClone)
+                        .map(SnapshotsInProgress.Entry::source)
+                        .collect(Collectors.toSet());
+                for (SnapshotId snapshotId : snapshotIds) {
+                    if (activeCloneSources.contains(snapshotId)) {
+                        throw new ConcurrentSnapshotExecutionException(new Snapshot(repoName, snapshotId),
+                                "cannot delete snapshot while it is being cloned");
+                    }
+                }
                 final SnapshotDeletionsInProgress deletionsInProgress =
                         currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
                 final RepositoryCleanupInProgress repositoryCleanupInProgress =
@@ -1749,7 +2067,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             RepositoryData repositoryData, String repoName) {
         ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
         final ShardGenerations shardGenerations = repositoryData.shardGenerations();
-        final Set<ShardId> inProgressShards = busyShardsForRepo(repoName, snapshotsInProgress);
+        final Set<ShardId> inProgressShards = busyShardsForRepo(repoName, snapshotsInProgress, metadata);
         final boolean readyToExecute = deletionsInProgress == null || deletionsInProgress.getEntries().stream()
             .noneMatch(entry -> entry.repository().equals(repoName) && entry.state() == SnapshotDeletionsInProgress.State.STARTED);
         for (IndexId index : indices) {
@@ -1809,16 +2127,32 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      * @param snapshots    snapshots in progress
      * @return shard ids that currently have an actively executing shard snapshot on a data node
      */
-    private static Set<ShardId> busyShardsForRepo(String repoName, @Nullable SnapshotsInProgress snapshots) {
+    private static Set<ShardId> busyShardsForRepo(String repoName, @Nullable SnapshotsInProgress snapshots, Metadata metadata) {
         final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots == null ? List.of() : snapshots.entries();
         final Set<ShardId> inProgressShards = new HashSet<>();
         for (SnapshotsInProgress.Entry runningSnapshot : runningSnapshots) {
             if (runningSnapshot.repository().equals(repoName) == false) {
                 continue;
             }
-            for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : runningSnapshot.shards()) {
-                if (shard.value.isActive()) {
-                    inProgressShards.add(shard.key);
+            if (runningSnapshot.isClone()) {
+                for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> clone : runningSnapshot.clones()) {
+                    final ShardSnapshotStatus shardState = clone.value;
+                    if (shardState.isActive()) {
+                        IndexMetadata indexMeta = metadata.index(clone.key.indexName());
+                        final Index index;
+                        if (indexMeta == null) {
+                            index = new Index(clone.key.indexName(), IndexMetadata.INDEX_UUID_NA_VALUE);
+                        } else {
+                            index = indexMeta.getIndex();
+                        }
+                        inProgressShards.add(new ShardId(index, clone.key.shardId()));
+                    }
+                }
+            } else {
+                for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : runningSnapshot.shards()) {
+                    if (shard.value.isActive()) {
+                        inProgressShards.add(shard.key);
+                    }
                 }
             }
         }
@@ -1911,97 +2245,282 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         return true;
     }
 
-    private static final ClusterStateTaskExecutor<ShardSnapshotUpdate> SHARD_STATE_EXECUTOR = (currentState, tasks) -> {
+    /**
+     * Executor that applies {@link ShardSnapshotUpdate}s to the current cluster state. The algorithm works as follows:
+     * Every shard snapshot or clone state update can result in multiple snapshot entries being updated. To determine whether a shard
+     * update has an effect, we use an outer loop over all currently executing snapshot operations, iterating over them in the order
+     * they were started, and an inner loop over the list of shard update tasks.
+     *
+     * If the inner loop finds that a shard update task applies to a given snapshot entry, i.e. to a shard-snapshot or shard-clone
+     * operation in it, then the state of that entry is updated accordingly. If that update is a noop, the task is removed from the
+     * iteration because it was already applied before and likely only arrived on the master node again due to upstream retries.
+     * If the update is not a noop, the shard it applies to becomes available for re-assignment to another snapshot or clone operation
+     * that is waiting for it. We therefore record the fact that a task was executed by adding it to a collection of executed tasks.
+     * If a subsequent iteration of the outer loop finds that a snapshot entry was waiting for a shard that one of the executed tasks
+     * applied to, then the shard snapshot (or clone) operation is started for that entry and the task is removed from the collection
+     * of tasks still to be applied, since it cannot have any further effect.
+     *
+     * Package private to allow for tests.
+     */
+    static final ClusterStateTaskExecutor<ShardSnapshotUpdate> SHARD_STATE_EXECUTOR = (currentState, tasks) -> {
         int changedCount = 0;
         int startedCount = 0;
         final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
+        final String localNodeId = currentState.nodes().getLocalNodeId();
         // Tasks to check for updates for running snapshots.
         final List<ShardSnapshotUpdate> unconsumedTasks = new ArrayList<>(tasks);
         // Tasks that were used to complete an existing in-progress shard snapshot
         final Set<ShardSnapshotUpdate> executedTasks = new HashSet<>();
+        // Outer loop over all snapshot entries in the order they were created in
         for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
             if (entry.state().completed()) {
+                // completed snapshots do not require any updates so we just add them to the new list and keep going
                 entries.add(entry);
                 continue;
             }
             ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = null;
+            ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> clones = null;
+            Map<String, IndexId> indicesLookup = null;
+            // inner loop over all the shard updates that are potentially applicable to the current snapshot entry
             for (Iterator<ShardSnapshotUpdate> iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) {
                 final ShardSnapshotUpdate updateSnapshotState = iterator.next();
                 final Snapshot updatedSnapshot = updateSnapshotState.snapshot;
                 final String updatedRepository = updatedSnapshot.getRepository();
                 if (entry.repository().equals(updatedRepository) == false) {
+                    // the update applies to a different repository so it is irrelevant here
                     continue;
                 }
-                final ShardId finishedShardId = updateSnapshotState.shardId;
-                if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
-                    final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
-                    if (existing == null) {
-                        logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
-                                updateSnapshotState, entry);
-                        assert false : "This should never happen, data nodes should only send updates for expected shards";
-                        continue;
-                    }
-                    if (existing.state().completed()) {
-                        // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
-                        iterator.remove();
-                        continue;
-                    }
-                    logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
-                            finishedShardId, updateSnapshotState.updatedState.state());
-                    if (shards == null) {
-                        shards = ImmutableOpenMap.builder(entry.shards());
-                    }
-                    shards.put(finishedShardId, updateSnapshotState.updatedState);
-                    executedTasks.add(updateSnapshotState);
-                    changedCount++;
-                } else if (executedTasks.contains(updateSnapshotState)) {
-                    // tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
-                    final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
-                    if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
-                        continue;
+                if (updateSnapshotState.isClone()) {
+                    // The update applied to a shard clone operation
+                    final RepositoryShardId finishedShardId = updateSnapshotState.repoShardId;
+                    if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
+                        assert entry.isClone() : "Non-clone snapshot [" + entry + "] received update for clone ["
+                                + updateSnapshotState + "]";
+                        final ShardSnapshotStatus existing = entry.clones().get(finishedShardId);
+                        if (existing == null) {
+                            logger.warn("Received clone shard snapshot status update [{}] but this shard is not tracked in [{}]",
+                                    updateSnapshotState, entry);
+                            assert false : "This should never happen, master will not submit a state update for a non-existing clone";
+                            continue;
+                        }
+                        if (existing.state().completed()) {
+                            // No point in doing noop updates that might happen if a data node resends shard status after a disconnect.
+                            iterator.remove();
+                            continue;
+                        }
+                        logger.trace("[{}] Updating shard clone [{}] with status [{}]", updatedSnapshot,
+                                finishedShardId, updateSnapshotState.updatedState.state());
+                        if (clones == null) {
+                            clones = ImmutableOpenMap.builder(entry.clones());
+                        }
+                        changedCount++;
+                        clones.put(finishedShardId, updateSnapshotState.updatedState);
+                        executedTasks.add(updateSnapshotState);
+                    } else if (executedTasks.contains(updateSnapshotState)) {
+                        // the update was already executed on the clone operation it applied to, now we check if it may be possible to
+                        // start a shard snapshot or clone operation on the current entry
+                        if (entry.isClone()) {
+                            // current entry is a clone operation
+                            final ShardSnapshotStatus existingStatus = entry.clones().get(finishedShardId);
+                            if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
+                                continue;
+                            }
+                            if (clones == null) {
+                                clones = ImmutableOpenMap.builder(entry.clones());
+                            }
+                            final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
+                            logger.trace("Starting clone [{}] on [{}] with generation [{}]", finishedShardId,
+                                    finishedStatus.nodeId(), finishedStatus.generation());
+                            assert finishedStatus.nodeId().equals(localNodeId) : "Clone updated with node id [" + finishedStatus.nodeId() +
+                                    "] but local node id is [" + localNodeId + "]";
+                            clones.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
+                            iterator.remove();
+                        } else {
+                            // current entry is a snapshot operation so we must translate the repository shard id to a routing shard id
+                            final IndexMetadata indexMeta = currentState.metadata().index(finishedShardId.indexName());
+                            if (indexMeta == null) {
+                                // The index name that finished cloning does not exist in the cluster state so it isn't relevant to a
+                                // normal snapshot
+                                continue;
+                            }
+                            final ShardId finishedRoutingShardId = new ShardId(indexMeta.getIndex(), finishedShardId.shardId());
+                            final ShardSnapshotStatus existingStatus = entry.shards().get(finishedRoutingShardId);
+                            if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
+                                continue;
+                            }
+                            if (shards == null) {
+                                shards = ImmutableOpenMap.builder(entry.shards());
+                            }
+                            final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
+                            logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
+                                    finishedStatus.nodeId(), finishedStatus.generation());
+                            // A clone was updated, so we must determine the correct data node to assign because the follow-up
+                            // operation is an actual shard snapshot
+                            final ShardSnapshotStatus shardSnapshotStatus = startShardSnapshotAfterClone(currentState,
+                                    updateSnapshotState.updatedState.generation(), finishedRoutingShardId);
+                            shards.put(finishedRoutingShardId, shardSnapshotStatus);
+                            if (shardSnapshotStatus.isActive()) {
+                                // only remove the update from the list of tasks that might hold a reusable shard if we actually
+                                // started a snapshot and didn't just fail
+                                iterator.remove();
+                            }
+                        }
                     }
-                    if (shards == null) {
-                        shards = ImmutableOpenMap.builder(entry.shards());
+                } else {
+                    // a (non-clone) shard snapshot operation was updated
+                    final ShardId finishedShardId = updateSnapshotState.shardId;
+                    if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
+                        final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
+                        if (existing == null) {
+                            logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
+                                    updateSnapshotState, entry);
+                            assert false : "This should never happen, data nodes should only send updates for expected shards";
+                            continue;
+                        }
+                        if (existing.state().completed()) {
+                            // No point in doing noop updates that might happen if a data node resends shard status after a disconnect.
+                            iterator.remove();
+                            continue;
+                        }
+                        logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
+                                finishedShardId, updateSnapshotState.updatedState.state());
+                        if (shards == null) {
+                            shards = ImmutableOpenMap.builder(entry.shards());
+                        }
+                        shards.put(finishedShardId, updateSnapshotState.updatedState);
+                        executedTasks.add(updateSnapshotState);
+                        changedCount++;
+                    } else if (executedTasks.contains(updateSnapshotState)) {
+                        // We applied the update for a shard snapshot state to its snapshot entry, now check if we can start
+                        // either a clone or another snapshot for the shard that became available
+                        if (entry.isClone()) {
+                            // Since we updated a normal snapshot we need to translate its shard ids to repository shard ids which requires
+                            // a lookup for the index ids
+                            if (indicesLookup == null) {
+                                indicesLookup = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()));
+                            }
+                            // shard snapshot was completed, we check if we can start a clone operation for the same repo shard
+                            final IndexId indexId = indicesLookup.get(finishedShardId.getIndexName());
+                            // If the lookup finds the index id then at least the entry is concerned with the index id just updated
+                            // so we check on a shard level
+                            if (indexId != null) {
+                                final RepositoryShardId repoShardId = new RepositoryShardId(indexId, finishedShardId.getId());
+                                final ShardSnapshotStatus existingStatus = entry.clones().get(repoShardId);
+                                if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
+                                    continue;
+                                }
+                                if (clones == null) {
+                                    clones = ImmutableOpenMap.builder(entry.clones());
+                                }
+                                final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
+                                logger.trace("Starting clone [{}] on [{}] with generation [{}]", finishedShardId,
+                                        finishedStatus.nodeId(), finishedStatus.generation());
+                                clones.put(repoShardId, new ShardSnapshotStatus(localNodeId, finishedStatus.generation()));
+                                iterator.remove();
+                                startedCount++;
+                            }
+                        } else {
+                            // shard snapshot was completed, we check if we can start another snapshot
+                            final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
+                            if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
+                                continue;
+                            }
+                            if (shards == null) {
+                                shards = ImmutableOpenMap.builder(entry.shards());
+                            }
+                            final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
+                            logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
+                                    finishedStatus.nodeId(), finishedStatus.generation());
+                            shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
+                            iterator.remove();
+                        }
                     }
-                    final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
-                    logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
-                            finishedStatus.nodeId(), finishedStatus.generation());
-                    shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
-                    iterator.remove();
-                    startedCount++;
                 }
             }
 
-            if (shards == null) {
-                entries.add(entry);
+            final SnapshotsInProgress.Entry updatedEntry;
+            if (shards != null) {
+                assert clones == null : "Should not have updated clones when updating shard snapshots but saw " + clones +
+                        " as well as " + shards;
+                updatedEntry = entry.withShardStates(shards.build());
+            } else if (clones != null) {
+                updatedEntry = entry.withClones(clones.build());
             } else {
-                entries.add(entry.withShardStates(shards.build()));
+                updatedEntry = entry;
             }
+            entries.add(updatedEntry);
         }
         if (changedCount > 0) {
             logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
                     "[{}] shard snapshots", changedCount, startedCount);
-            return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks).build(
-                    ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build());
+            return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks)
+                    .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
+                            SnapshotsInProgress.of(entries)).build());
         }
         return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks).build(currentState);
     };
 
+    /**
+     * Creates a {@link ShardSnapshotStatus} entry for a snapshot after the shard has become available for snapshotting as a result
+     * of a snapshot clone completing.
+     *
+     * @param currentState            current cluster state
+     * @param shardGeneration         shard generation of the shard in the repository
+     * @param shardId                 shard id of the shard that just finished cloning
+     * @return shard snapshot status
+     */
+    private static ShardSnapshotStatus startShardSnapshotAfterClone(ClusterState currentState, String shardGeneration, ShardId shardId) {
+        final ShardRouting primary = currentState.routingTable().index(shardId.getIndex()).shard(shardId.id()).primaryShard();
+        final ShardSnapshotStatus shardSnapshotStatus;
+        if (primary == null || !primary.assignedToNode()) {
+            shardSnapshotStatus = new ShardSnapshotStatus(
+                    null, ShardState.MISSING, "primary shard is not allocated", shardGeneration);
+        } else if (primary.relocating() || primary.initializing()) {
+            shardSnapshotStatus =
+                    new ShardSnapshotStatus(primary.currentNodeId(), ShardState.WAITING, shardGeneration);
+        } else if (primary.started() == false) {
+            shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), ShardState.MISSING,
+                    "primary shard hasn't been started yet", shardGeneration);
+        } else {
+            shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), shardGeneration);
+        }
+        return shardSnapshotStatus;
+    }
+
     /**
      * An update to the snapshot state of a shard.
+     *
+     * Package private for testing
      */
-    private static final class ShardSnapshotUpdate {
+    static final class ShardSnapshotUpdate {
 
         private final Snapshot snapshot;
 
         private final ShardId shardId;
 
+        private final RepositoryShardId repoShardId;
+
         private final ShardSnapshotStatus updatedState;
 
-        private ShardSnapshotUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) {
+        ShardSnapshotUpdate(Snapshot snapshot, RepositoryShardId repositoryShardId, ShardSnapshotStatus updatedState) {
+            this.snapshot = snapshot;
+            this.shardId = null;
+            this.updatedState = updatedState;
+            this.repoShardId = repositoryShardId;
+        }
+
+        ShardSnapshotUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) {
             this.snapshot = snapshot;
             this.shardId = shardId;
             this.updatedState = updatedState;
+            repoShardId = null;
+        }
+
+        public boolean isClone() {
+            return repoShardId != null;
         }
 
         @Override
@@ -2013,13 +2532,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 return false;
             }
             final ShardSnapshotUpdate that = (ShardSnapshotUpdate) other;
-            return this.snapshot.equals(that.snapshot) && this.shardId.equals(that.shardId) && this.updatedState == that.updatedState;
+            return this.snapshot.equals(that.snapshot) && Objects.equals(this.shardId, that.shardId)
+                    && Objects.equals(this.repoShardId, that.repoShardId) && this.updatedState == that.updatedState;
         }
 
 
         @Override
         public int hashCode() {
-            return Objects.hash(snapshot, shardId, updatedState);
+            return Objects.hash(snapshot, shardId, updatedState, repoShardId);
         }
     }
 
@@ -2048,19 +2568,35 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                         } finally {
                             // Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent
                             // state update we check if its state is completed and end it if it is.
+                            final SnapshotsInProgress snapshotsInProgress =
+                                    newState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
                             if (endingSnapshots.contains(update.snapshot) == false) {
-                                final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE);
                                 final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(update.snapshot);
                                 // If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo
                                 if (updatedEntry != null && updatedEntry.state().completed()) {
                                     endSnapshot(updatedEntry, newState.metadata(), null);
                                 }
                             }
+                            startExecutableClones(snapshotsInProgress, update.snapshot.getRepository());
                         }
                     }
                 });
     }
 
+    private void startExecutableClones(SnapshotsInProgress snapshotsInProgress, @Nullable String repoName) {
+        for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
+            if (entry.isClone() && entry.state() == State.STARTED && (repoName == null || entry.repository().equals(repoName))) {
+                // this is a clone, see if new work is ready
+                for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> clone : entry.clones()) {
+                    if (clone.value.state() == ShardState.INIT) {
+                        runReadyClone(entry.snapshot(), entry.source(), clone.value, clone.key,
+                                repositoriesService.repository(entry.repository()));
+                    }
+                }
+            }
+        }
+    }
+
     private class UpdateSnapshotStatusAction
             extends TransportMasterNodeAction<UpdateIndexShardSnapshotStatusRequest, UpdateIndexShardSnapshotStatusResponse> {
         UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService,

+ 3 - 0
server/src/main/java/org/elasticsearch/snapshots/package-info.java

@@ -99,6 +99,9 @@
  * update to remove the deletion's entry in {@code SnapshotDeletionsInProgress} which concludes the process of deleting a snapshot.</li>
  * </ol>
  *
+ * <h2>Cloning a Snapshot</h2>
+ * TODO: write up the steps in a snapshot clone properly
+ *
  * <h2>Concurrent Snapshot Operations</h2>
  *
  * Snapshot create and delete operations may be started concurrently. Operations targeting different repositories run independently of

+ 427 - 0
server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java

@@ -0,0 +1,427 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.snapshots;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.RepositoryShardId;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
+import static org.hamcrest.Matchers.is;
+
+public class SnapshotsServiceTests extends ESTestCase {
+
+    public void testNoopShardStateUpdates() throws Exception {
+        final String repoName = "test-repo";
+        final Snapshot snapshot = snapshot(repoName, "snapshot-1");
+        final SnapshotsInProgress.Entry snapshotNoShards = snapshotEntry(snapshot, Collections.emptyList(), ImmutableOpenMap.of());
+
+        final String indexName1 = "index-1";
+        final ShardId shardId1 = new ShardId(index(indexName1), 0);
+        {
+            final ClusterState state = stateWithSnapshots(snapshotNoShards);
+            final SnapshotsService.ShardSnapshotUpdate shardCompletion =
+                    new SnapshotsService.ShardSnapshotUpdate(snapshot, shardId1, successfulShardStatus(uuid()));
+            assertIsNoop(state, shardCompletion);
+        }
+        {
+            final ClusterState state = stateWithSnapshots(
+                    snapshotEntry(
+                            snapshot, Collections.singletonList(indexId(indexName1)), shardsMap(shardId1, initShardStatus(uuid()))));
+            final SnapshotsService.ShardSnapshotUpdate shardCompletion = new SnapshotsService.ShardSnapshotUpdate(
+                    snapshot("other-repo", snapshot.getSnapshotId().getName()), shardId1, successfulShardStatus(uuid()));
+            assertIsNoop(state, shardCompletion);
+        }
+    }
+
+    public void testUpdateSnapshotToSuccess() throws Exception {
+        final String repoName = "test-repo";
+        final Snapshot sn1 = snapshot(repoName, "snapshot-1");
+        final String indexName1 = "index-1";
+        final String dataNodeId = uuid();
+        final IndexId indexId1 = indexId(indexName1);
+        final ShardId shardId1 = new ShardId(index(indexName1), 0);
+        final SnapshotsInProgress.Entry snapshotSingleShard =
+                snapshotEntry(sn1, Collections.singletonList(indexId1), shardsMap(shardId1, initShardStatus(dataNodeId)));
+
+        assertThat(snapshotSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+        final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(sn1, shardId1, dataNodeId);
+        final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(snapshotSingleShard), completeShard);
+        final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+        final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
+        assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.SUCCESS));
+        assertIsNoop(updatedClusterState, completeShard);
+    }
+
+    public void testUpdateSnapshotMultipleShards() throws Exception {
+        final String repoName = "test-repo";
+        final Snapshot sn1 = snapshot(repoName, "snapshot-1");
+        final String indexName1 = "index-1";
+        final String dataNodeId = uuid();
+        final IndexId indexId1 = indexId(indexName1);
+        final Index routingIndex1 = index(indexName1);
+        final ShardId shardId1 = new ShardId(routingIndex1, 0);
+        final ShardId shardId2 = new ShardId(routingIndex1, 1);
+        final SnapshotsInProgress.ShardSnapshotStatus shardInitStatus = initShardStatus(dataNodeId);
+        final SnapshotsInProgress.Entry snapshotMultipleShards = snapshotEntry(sn1, Collections.singletonList(indexId1),
+                ImmutableOpenMap.builder(shardsMap(shardId1, shardInitStatus)).fPut(shardId2, shardInitStatus).build());
+
+        assertThat(snapshotMultipleShards.state(), is(SnapshotsInProgress.State.STARTED));
+
+        final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(sn1, shardId1, dataNodeId);
+        final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(snapshotMultipleShards), completeShard);
+        final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+        final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
+        assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.STARTED));
+        assertIsNoop(updatedClusterState, completeShard);
+    }
+
+    public void testUpdateCloneToSuccess() throws Exception {
+        final String repoName = "test-repo";
+        final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
+        final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
+        final String indexName1 = "index-1";
+        final String dataNodeId = uuid();
+        final IndexId indexId1 = indexId(indexName1);
+        final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
+        final SnapshotsInProgress.Entry cloneSingleShard =
+                cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(), clonesMap(shardId1, initShardStatus(dataNodeId)));
+
+        assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+        final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(targetSnapshot, shardId1, dataNodeId);
+        final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(cloneSingleShard), completeShard);
+        final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+        final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
+        assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.SUCCESS));
+        assertIsNoop(updatedClusterState, completeShard);
+    }
+
+    public void testUpdateCloneMultipleShards() throws Exception {
+        final String repoName = "test-repo";
+        final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
+        final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
+        final String indexName1 = "index-1";
+        final String dataNodeId = uuid();
+        final IndexId indexId1 = indexId(indexName1);
+        final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
+        final RepositoryShardId shardId2 = new RepositoryShardId(indexId1, 1);
+        final SnapshotsInProgress.ShardSnapshotStatus shardInitStatus = initShardStatus(dataNodeId);
+        final SnapshotsInProgress.Entry cloneMultipleShards = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
+                ImmutableOpenMap.builder(clonesMap(shardId1, shardInitStatus)).fPut(shardId2, shardInitStatus).build());
+
+        assertThat(cloneMultipleShards.state(), is(SnapshotsInProgress.State.STARTED));
+
+        final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(targetSnapshot, shardId1, dataNodeId);
+        final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(cloneMultipleShards), completeShard);
+        final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+        final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
+        assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.STARTED));
+        assertIsNoop(updatedClusterState, completeShard);
+    }
+
+    public void testCompletedCloneStartsSnapshot() throws Exception {
+        final String repoName = "test-repo";
+        final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
+        final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
+        final String indexName1 = "index-1";
+        final String dataNodeId = uuid();
+        final IndexId indexId1 = indexId(indexName1);
+        final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
+        final SnapshotsInProgress.ShardSnapshotStatus shardInitStatus = initShardStatus(dataNodeId);
+        final SnapshotsInProgress.Entry cloneSingleShard = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
+                clonesMap(shardId1, shardInitStatus));
+
+        final ClusterState stateWithIndex = stateWithUnassignedIndices(indexName1);
+        final Snapshot plainSnapshot = snapshot(repoName, "test-snapshot");
+        final ShardId routingShardId1 = new ShardId(stateWithIndex.metadata().index(indexName1).getIndex(), 0);
+        final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(plainSnapshot, Collections.singletonList(indexId1),
+                shardsMap(routingShardId1, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
+
+        assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+        // Case 1: the shard that just finished cloning is unassigned -> shard snapshot should go to MISSING state
+        final ClusterState stateWithUnassignedRoutingShard = stateWithSnapshots(stateWithIndex, cloneSingleShard, snapshotSingleShard);
+        final SnapshotsService.ShardSnapshotUpdate completeShardClone = successUpdate(targetSnapshot, shardId1, uuid());
+        {
+            final ClusterState updatedClusterState = applyUpdates(stateWithUnassignedRoutingShard, completeShardClone);
+            final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+            final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
+            assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
+            final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+            assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.SUCCESS));
+            assertThat(startedSnapshot.shards().get(routingShardId1).state(), is(SnapshotsInProgress.ShardState.MISSING));
+            assertIsNoop(updatedClusterState, completeShardClone);
+        }
+
+        // Case 2: the shard that just finished cloning is assigned correctly -> shard snapshot should go to INIT state
+        final ClusterState stateWithAssignedRoutingShard =
+                ClusterState.builder(stateWithUnassignedRoutingShard).routingTable(
+                        RoutingTable.builder(stateWithUnassignedRoutingShard.routingTable()).add(
+                                IndexRoutingTable.builder(routingShardId1.getIndex()).addIndexShard(
+                                        new IndexShardRoutingTable.Builder(routingShardId1).addShard(
+                                                TestShardRouting.newShardRouting(
+                                                        routingShardId1, dataNodeId, true, ShardRoutingState.STARTED)
+                                        ).build())).build()).build();
+        {
+            final ClusterState updatedClusterState = applyUpdates(stateWithAssignedRoutingShard, completeShardClone);
+            final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+            final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
+            assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
+            final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+            assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
+            final SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = startedSnapshot.shards().get(routingShardId1);
+            assertThat(shardSnapshotStatus.state(), is(SnapshotsInProgress.ShardState.INIT));
+            assertThat(shardSnapshotStatus.nodeId(), is(dataNodeId));
+            assertIsNoop(updatedClusterState, completeShardClone);
+        }
+
+        // Case 3: the shard that just finished cloning is currently initializing -> shard snapshot should go to WAITING state
+        final ClusterState stateWithInitializingRoutingShard =
+                ClusterState.builder(stateWithUnassignedRoutingShard).routingTable(
+                        RoutingTable.builder(stateWithUnassignedRoutingShard.routingTable()).add(
+                                IndexRoutingTable.builder(routingShardId1.getIndex()).addIndexShard(
+                                        new IndexShardRoutingTable.Builder(routingShardId1).addShard(
+                                                TestShardRouting.newShardRouting(
+                                                        routingShardId1, dataNodeId, true, ShardRoutingState.INITIALIZING)
+                                        ).build())).build()).build();
+        {
+            final ClusterState updatedClusterState = applyUpdates(stateWithInitializingRoutingShard, completeShardClone);
+            final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+            final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
+            assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
+            final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+            assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
+            assertThat(startedSnapshot.shards().get(routingShardId1).state(), is(SnapshotsInProgress.ShardState.WAITING));
+            assertIsNoop(updatedClusterState, completeShardClone);
+        }
+    }
+
+    public void testCompletedSnapshotStartsClone() throws Exception {
+        final String repoName = "test-repo";
+        final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
+        final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
+        final String indexName = "index-1";
+        final String dataNodeId = uuid();
+        final IndexId indexId1 = indexId(indexName);
+        final RepositoryShardId repositoryShardId = new RepositoryShardId(indexId1, 0);
+        final SnapshotsInProgress.Entry cloneSingleShard = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
+                clonesMap(repositoryShardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
+
+        final ClusterState stateWithIndex = stateWithUnassignedIndices(indexName);
+        final Snapshot plainSnapshot = snapshot(repoName, "test-snapshot");
+        final ShardId routingShardId = new ShardId(stateWithIndex.metadata().index(indexName).getIndex(), 0);
+        final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(plainSnapshot, Collections.singletonList(indexId1),
+                shardsMap(routingShardId, initShardStatus(dataNodeId)));
+
+        assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+        final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(plainSnapshot, routingShardId, dataNodeId);
+
+        final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(snapshotSingleShard, cloneSingleShard), completeShard);
+        final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+        final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
+        assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
+        final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+        assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
+        final SnapshotsInProgress.ShardSnapshotStatus shardCloneStatus = startedSnapshot.clones().get(repositoryShardId);
+        assertThat(shardCloneStatus.state(), is(SnapshotsInProgress.ShardState.INIT));
+        assertThat(shardCloneStatus.nodeId(), is(updatedClusterState.nodes().getLocalNodeId()));
+        assertIsNoop(updatedClusterState, completeShard);
+    }
+
+    public void testCompletedSnapshotStartsNextSnapshot() throws Exception {
+        final String repoName = "test-repo";
+        final String indexName = "index-1";
+        final String dataNodeId = uuid();
+        final IndexId indexId1 = indexId(indexName);
+
+        final ClusterState stateWithIndex = stateWithUnassignedIndices(indexName);
+        final Snapshot plainSnapshot = snapshot(repoName, "test-snapshot-1");
+        final ShardId routingShardId = new ShardId(stateWithIndex.metadata().index(indexName).getIndex(), 0);
+        final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(plainSnapshot, Collections.singletonList(indexId1),
+                shardsMap(routingShardId, initShardStatus(dataNodeId)));
+
+        final Snapshot queuedSnapshot = snapshot(repoName, "test-snapshot-2");
+        final SnapshotsInProgress.Entry queuedSnapshotSingleShard = snapshotEntry(queuedSnapshot, Collections.singletonList(indexId1),
+                shardsMap(routingShardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
+
+        final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(plainSnapshot, routingShardId, dataNodeId);
+
+        final ClusterState updatedClusterState =
+                applyUpdates(stateWithSnapshots(snapshotSingleShard, queuedSnapshotSingleShard), completeShard);
+        final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+        final SnapshotsInProgress.Entry completedSnapshot = snapshotsInProgress.entries().get(0);
+        assertThat(completedSnapshot.state(), is(SnapshotsInProgress.State.SUCCESS));
+        final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+        assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
+        final SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = startedSnapshot.shards().get(routingShardId);
+        assertThat(shardSnapshotStatus.state(), is(SnapshotsInProgress.ShardState.INIT));
+        assertThat(shardSnapshotStatus.nodeId(), is(dataNodeId));
+        assertIsNoop(updatedClusterState, completeShard);
+    }
+
+    public void testCompletedCloneStartsNextClone() throws Exception {
+        final String repoName = "test-repo";
+        final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
+        final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
+        final String indexName1 = "index-1";
+        final IndexId indexId1 = indexId(indexName1);
+        final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
+        final String masterNodeId = uuid();
+        final SnapshotsInProgress.Entry cloneSingleShard = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
+                clonesMap(shardId1, initShardStatus(masterNodeId)));
+
+        final Snapshot queuedTargetSnapshot = snapshot(repoName, "test-snapshot");
+        final SnapshotsInProgress.Entry queuedClone = cloneEntry(queuedTargetSnapshot, sourceSnapshot.getSnapshotId(),
+                clonesMap(shardId1, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
+
+        assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+        final ClusterState stateWithUnassignedRoutingShard = stateWithSnapshots(
+                ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes(masterNodeId)).build(), cloneSingleShard, queuedClone);
+        final SnapshotsService.ShardSnapshotUpdate completeShardClone = successUpdate(targetSnapshot, shardId1, masterNodeId);
+
+        final ClusterState updatedClusterState = applyUpdates(stateWithUnassignedRoutingShard, completeShardClone);
+        final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+        final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
+        assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
+        final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+        assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
+        assertThat(startedSnapshot.clones().get(shardId1).state(), is(SnapshotsInProgress.ShardState.INIT));
+        assertIsNoop(updatedClusterState, completeShardClone);
+    }
+
+    private static DiscoveryNodes discoveryNodes(String localNodeId) {
+        return DiscoveryNodes.builder().localNodeId(localNodeId).build();
+    }
+
+    private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shardsMap(
+            ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus shardStatus) {
+        return ImmutableOpenMap.<ShardId, SnapshotsInProgress.ShardSnapshotStatus>builder().fPut(shardId, shardStatus).build();
+    }
+
+    private static ImmutableOpenMap<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus> clonesMap(
+            RepositoryShardId shardId, SnapshotsInProgress.ShardSnapshotStatus shardStatus) {
+        return ImmutableOpenMap.<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus>builder().fPut(shardId, shardStatus).build();
+    }
+
+    private static SnapshotsService.ShardSnapshotUpdate successUpdate(Snapshot snapshot, ShardId shardId, String nodeId) {
+        return new SnapshotsService.ShardSnapshotUpdate(snapshot, shardId, successfulShardStatus(nodeId));
+    }
+
+    private static SnapshotsService.ShardSnapshotUpdate successUpdate(Snapshot snapshot, RepositoryShardId shardId, String nodeId) {
+        return new SnapshotsService.ShardSnapshotUpdate(snapshot, shardId, successfulShardStatus(nodeId));
+    }
+
+    private static ClusterState stateWithUnassignedIndices(String... indexNames) {
+        final Metadata.Builder metaBuilder = Metadata.builder(Metadata.EMPTY_METADATA);
+        for (String index : indexNames) {
+            metaBuilder.put(IndexMetadata.builder(index)
+                    .settings(Settings.builder().put(SETTING_VERSION_CREATED, Version.CURRENT.id))
+                    .numberOfShards(1).numberOfReplicas(0)
+                    .build(), false);
+        }
+        final RoutingTable.Builder routingTable = RoutingTable.builder();
+        for (String index : indexNames) {
+            final Index idx = metaBuilder.get(index).getIndex();
+            routingTable.add(IndexRoutingTable.builder(idx).addIndexShard(
+                    new IndexShardRoutingTable.Builder(new ShardId(idx, 0)).build()));
+        }
+        return ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metaBuilder).routingTable(routingTable.build()).build();
+    }
+
+    private static ClusterState stateWithSnapshots(ClusterState state, SnapshotsInProgress.Entry... entries) {
+        return ClusterState.builder(state).version(state.version() + 1L)
+                .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(Arrays.asList(entries))).build();
+    }
+
+    private static ClusterState stateWithSnapshots(SnapshotsInProgress.Entry... entries) {
+        return stateWithSnapshots(ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes(uuid())).build(), entries);
+    }
+
+    private static void assertIsNoop(ClusterState state, SnapshotsService.ShardSnapshotUpdate shardCompletion) throws Exception {
+        assertSame(applyUpdates(state, shardCompletion), state);
+    }
+
+    private static ClusterState applyUpdates(ClusterState state, SnapshotsService.ShardSnapshotUpdate... updates) throws Exception {
+        return SnapshotsService.SHARD_STATE_EXECUTOR.execute(state, Arrays.asList(updates)).resultingState;
+    }
+
+    private static SnapshotsInProgress.Entry snapshotEntry(Snapshot snapshot, List<IndexId> indexIds,
+                                                           ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards) {
+        return SnapshotsInProgress.startedEntry(snapshot, randomBoolean(), randomBoolean(), indexIds, Collections.emptyList(),
+                1L, randomNonNegativeLong(), shards, Collections.emptyMap(), Version.CURRENT);
+    }
+
+    private static SnapshotsInProgress.Entry cloneEntry(
+            Snapshot snapshot, SnapshotId source, ImmutableOpenMap<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus> clones) {
+        final List<IndexId> indexIds = StreamSupport.stream(clones.keys().spliterator(), false)
+                .map(k -> k.value.index()).distinct().collect(Collectors.toList());
+        return SnapshotsInProgress.startClone(snapshot, source, indexIds, 1L, randomNonNegativeLong(), Version.CURRENT).withClones(clones);
+    }
+
+    private static SnapshotsInProgress.ShardSnapshotStatus initShardStatus(String nodeId) {
+        return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, uuid());
+    }
+
+    private static SnapshotsInProgress.ShardSnapshotStatus successfulShardStatus(String nodeId) {
+        return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, SnapshotsInProgress.ShardState.SUCCESS, uuid());
+    }
+
+    private static Snapshot snapshot(String repoName, String name) {
+        return new Snapshot(repoName, new SnapshotId(name, uuid()));
+    }
+
+    private static Index index(String name) {
+        return new Index(name, uuid());
+    }
+
+    private static IndexId indexId(String name) {
+        return new IndexId(name, uuid());
+    }
+
+    private static String uuid() {
+        return UUIDs.randomBase64UUID(random());
+    }
+}
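
The unit tests above drive SnapshotsService.SHARD_STATE_EXECUTOR directly against synthetic cluster states. For orientation only, a minimal end-to-end sketch of the flow that CloneSnapshotIT covers might look like the following; it is not part of this diff, the repository/index/snapshot names are illustrative, and it assumes the prepareCloneSnapshot client plumbing added elsewhere in this change plus the createRepository helper from AbstractSnapshotIntegTestCase:

    // Hedged sketch -- assumes a test class extending AbstractSnapshotIntegTestCase with the usual static imports.
    public void testCloneSubsetOfSnapshot() throws Exception {
        final String repoName = "test-repo";
        createRepository(repoName, "fs");                    // helper from AbstractSnapshotIntegTestCase
        createIndex("index-a", "index-b");

        final CreateSnapshotResponse createResponse = client().admin().cluster()
                .prepareCreateSnapshot(repoName, "source-snapshot")
                .setWaitForCompletion(true)
                .get();
        assertThat(createResponse.getSnapshotInfo().state(), is(SnapshotState.SUCCESS));

        // Clone only index-a from the source snapshot into a new snapshot in the same repository.
        assertAcked(client().admin().cluster()
                .prepareCloneSnapshot(repoName, "source-snapshot", "target-snapshot") // assumed client method from this PR
                .setIndices("index-a")
                .get());

        final SnapshotStatus targetStatus = client().admin().cluster()
                .prepareSnapshotStatus(repoName).setSnapshots("target-snapshot").get().getSnapshots().get(0);
        assertThat(targetStatus.getIndices().keySet(), contains("index-a"));
    }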

+ 5 - 0
test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

@@ -90,6 +90,11 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
 
     private static final String OLD_VERSION_SNAPSHOT_PREFIX = "old-version-snapshot-";
 
+    // Large snapshot pool settings used to set up nodes for tests that involve multiple repositories, making sure there
+    // are enough snapshot threads that blocking some of them on one repository does not block other repositories from doing work
+    protected static final Settings LARGE_SNAPSHOT_POOL_SETTINGS = Settings.builder()
+            .put("thread_pool.snapshot.core", 5).put("thread_pool.snapshot.max", 5).build();
+
     @Override
     protected Settings nodeSettings(int nodeOrdinal) {
         return Settings.builder().put(super.nodeSettings(nodeOrdinal))
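
As a hedged usage sketch (not part of this diff), a test that deliberately blocks snapshot threads against one repository would typically start its nodes with these settings via the existing InternalTestCluster helpers:

    // Start dedicated master and data nodes with the enlarged snapshot thread pool so that
    // blocking snapshot threads against one repository cannot starve a second repository.
    internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
    internalCluster().startDataOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);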

+ 25 - 3
test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java

@@ -118,6 +118,10 @@ public class MockRepository extends FsRepository {
     /** Allows blocking on writing the snapshot file at the end of snapshot creation to simulate the master node dying */
     private volatile boolean blockAndFailOnWriteSnapFile;
 
+    private volatile boolean blockOnWriteShardLevelMeta;
+
+    private volatile boolean blockOnReadIndexMeta;
+
     /**
      * Writes to the blob {@code index.latest} at the repository root will fail with an {@link IOException} if {@code true}.
      */
@@ -189,6 +193,8 @@ public class MockRepository extends FsRepository {
         blockOnWriteIndexFile = false;
         blockAndFailOnWriteSnapFile = false;
         blockOnDeleteIndexN = false;
+        blockOnWriteShardLevelMeta = false;
+        blockOnReadIndexMeta = false;
         this.notifyAll();
     }
 
@@ -212,6 +218,14 @@ public class MockRepository extends FsRepository {
         blockOnDeleteIndexN = true;
     }
 
+    public void setBlockOnWriteShardLevelMeta() {
+        blockOnWriteShardLevelMeta = true;
+    }
+
+    public void setBlockOnReadIndexMeta() {
+        blockOnReadIndexMeta = true;
+    }
+
     public void setFailReadsAfterUnblock(boolean failReadsAfterUnblock) {
         this.failReadsAfterUnblock = failReadsAfterUnblock;
     }
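
A hedged sketch of how a test might exercise the two new blocking hooks; it is not part of this diff, the repository and snapshot names are assumptions, and it relies on the existing waitForBlock helper from AbstractSnapshotIntegTestCase together with the prepareCloneSnapshot client plumbing added elsewhere in this change:

    // Block shard-level metadata writes on the master's repository instance, start a clone,
    // wait for it to hit the block, then release it and verify the clone still completes.
    final String masterName = internalCluster().getMasterName();
    final MockRepository repository = (MockRepository) internalCluster()
            .getInstance(RepositoriesService.class, masterName).repository("test-repo");
    repository.setBlockOnWriteShardLevelMeta();

    final ActionFuture<AcknowledgedResponse> cloneFuture = client().admin().cluster()
            .prepareCloneSnapshot("test-repo", "source-snapshot", "target-snapshot")  // assumed client method from this PR
            .setIndices("*")
            .execute();
    waitForBlock(masterName, "test-repo", TimeValue.timeValueSeconds(30L));

    repository.unblock();
    assertAcked(cloneFuture.actionGet());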
@@ -229,7 +243,7 @@ public class MockRepository extends FsRepository {
         boolean wasBlocked = false;
         try {
             while (blockOnDataFiles || blockOnAnyFiles || blockOnWriteIndexFile ||
-                blockAndFailOnWriteSnapFile || blockOnDeleteIndexN) {
+                blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta) {
                 blocked = true;
                 this.wait();
                 wasBlocked = true;
@@ -365,8 +379,12 @@ public class MockRepository extends FsRepository {
 
             @Override
             public InputStream readBlob(String name) throws IOException {
-                maybeReadErrorAfterBlock(name);
-                maybeIOExceptionOrBlock(name);
+                if (blockOnReadIndexMeta && name.startsWith(BlobStoreRepository.METADATA_PREFIX) && path().equals(basePath()) == false) {
+                    blockExecutionAndMaybeWait(name);
+                } else {
+                    maybeReadErrorAfterBlock(name);
+                    maybeIOExceptionOrBlock(name);
+                }
                 return super.readBlob(name);
             }
 
@@ -430,6 +448,10 @@ public class MockRepository extends FsRepository {
             public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
                 throws IOException {
                 maybeIOExceptionOrBlock(blobName);
+                if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)
+                        && path().equals(basePath()) == false) {
+                    blockExecutionAndMaybeWait(blobName);
+                }
                 super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
                 if (RandomizedContext.current().getRandom().nextBoolean()) {
                     // for network based repositories, the blob may have been written but we may still