Browse Source

Make Repository.getRepositoryData an Async API (#49299)

This API call in most implementations is fairly IO heavy and slow
so it is more natural to be async in the first place.
Concretely though, this change is a prerequisite of #49060 since
determining the repository generation from the cluster state
introduces situations where this call would have to wait for other
operations to finish. Doing so in a blocking manner would break
`SnapshotResiliencyTests` and waste a thread.
Also, this sets up the possibility to in the future make use of async IO
where provided by the underlying Repository implementation.

In a follow-up `SnapshotsService#getRepositoryData` will be made async
as well (did not do it here, since it's another huge change to do so).
Note: This change for now does not alter the threading behaviour in any way (since `Repository#getRepositoryData` isn't forking) and is purely mechanical.
Armin Braun 6 years ago
parent
commit
4d659c4bdb
18 changed files with 706 additions and 661 deletions
  1. 90 82
      server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java
  2. 11 5
      server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
  3. 2 2
      server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
  4. 1 1
      server/src/main/java/org/elasticsearch/repositories/Repository.java
  5. 33 29
      server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
  6. 303 297
      server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
  7. 189 180
      server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
  8. 2 2
      server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
  9. 11 11
      server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
  10. 5 10
      server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
  11. 2 2
      server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
  12. 17 7
      server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
  13. 1 1
      server/src/test/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java
  14. 3 1
      test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
  15. 3 3
      test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
  16. 9 7
      test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java
  17. 22 20
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
  18. 2 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

+ 90 - 82
server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

@@ -24,6 +24,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
+import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.cluster.ClusterState;
@@ -41,6 +42,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryCleanupResult;
+import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.snapshots.SnapshotsService;
 import org.elasticsearch.tasks.Task;
@@ -168,97 +170,103 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
             return;
         }
         final BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
-        final long repositoryStateId = repository.getRepositoryData().getGenId();
-        logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId);
-        clusterService.submitStateUpdateTask("cleanup repository [" + repositoryName + "][" + repositoryStateId + ']',
-            new ClusterStateUpdateTask() {
+        final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
+        repository.getRepositoryData(repositoryDataListener);
+        repositoryDataListener.whenComplete(repositoryData -> {
+            final long repositoryStateId = repositoryData.getGenId();
+            logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId);
+            clusterService.submitStateUpdateTask("cleanup repository [" + repositoryName + "][" + repositoryStateId + ']',
+                new ClusterStateUpdateTask() {
 
-                private boolean startedCleanup = false;
+                    private boolean startedCleanup = false;
 
-                @Override
-                public ClusterState execute(ClusterState currentState) {
-                    final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
-                    if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
-                        throw new IllegalStateException(
-                            "Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress in ["
-                                + repositoryCleanupInProgress + "]");
-                    }
-                    SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
-                    if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
-                        throw new IllegalStateException("Cannot cleanup [" + repositoryName
-                            + "] - a snapshot is currently being deleted in [" + deletionsInProgress + "]");
-                    }
-                    SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
-                    if (snapshots != null && !snapshots.entries().isEmpty()) {
-                        throw new IllegalStateException(
-                            "Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]");
+                    @Override
+                    public ClusterState execute(ClusterState currentState) {
+                        final RepositoryCleanupInProgress repositoryCleanupInProgress =
+                            currentState.custom(RepositoryCleanupInProgress.TYPE);
+                        if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
+                            throw new IllegalStateException(
+                                "Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress in ["
+                                    + repositoryCleanupInProgress + "]");
+                        }
+                        SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
+                        if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
+                            throw new IllegalStateException("Cannot cleanup [" + repositoryName
+                                + "] - a snapshot is currently being deleted in [" + deletionsInProgress + "]");
+                        }
+                        SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
+                        if (snapshots != null && !snapshots.entries().isEmpty()) {
+                            throw new IllegalStateException(
+                                "Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]");
+                        }
+                        return ClusterState.builder(currentState).putCustom(RepositoryCleanupInProgress.TYPE,
+                            new RepositoryCleanupInProgress(
+                                RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId))).build();
                     }
-                    return ClusterState.builder(currentState).putCustom(RepositoryCleanupInProgress.TYPE,
-                        new RepositoryCleanupInProgress(
-                            RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId))).build();
-                }
-
-                @Override
-                public void onFailure(String source, Exception e) {
-                    after(e, null);
-                }
 
-                @Override
-                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                    startedCleanup = true;
-                    logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
-                    threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
-                        l -> blobStoreRepository.cleanup(
-                            repositoryStateId,
-                            newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
-                            ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
-                }
-
-                private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
-                    if (failure == null) {
-                        logger.debug("Finished repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId);
-                    } else {
-                        logger.debug(() -> new ParameterizedMessage(
-                            "Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure);
+                    @Override
+                    public void onFailure(String source, Exception e) {
+                        after(e, null);
                     }
-                    assert failure != null || result != null;
-                    if (startedCleanup == false) {
-                        logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure);
-                        listener.onFailure(failure);
-                        return;
+
+                    @Override
+                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                        startedCleanup = true;
+                        logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
+                        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
+                            l -> blobStoreRepository.cleanup(
+                                repositoryStateId,
+                                newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
+                                ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
                     }
-                    clusterService.submitStateUpdateTask(
-                        "remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']',
-                        new ClusterStateUpdateTask() {
-                            @Override
-                            public ClusterState execute(ClusterState currentState) {
-                                return removeInProgressCleanup(currentState);
-                            }
 
-                            @Override
-                            public void onFailure(String source, Exception e) {
-                                if (failure != null) {
-                                    e.addSuppressed(failure);
+                    private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
+                        if (failure == null) {
+                            logger.debug("Finished repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId);
+                        } else {
+                            logger.debug(() -> new ParameterizedMessage(
+                                "Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure);
+                        }
+                        assert failure != null || result != null;
+                        if (startedCleanup == false) {
+                            logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure);
+                            listener.onFailure(failure);
+                            return;
+                        }
+                        clusterService.submitStateUpdateTask(
+                            "remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']',
+                            new ClusterStateUpdateTask() {
+                                @Override
+                                public ClusterState execute(ClusterState currentState) {
+                                    return removeInProgressCleanup(currentState);
+                                }
+
+                                @Override
+                                public void onFailure(String source, Exception e) {
+                                    if (failure != null) {
+                                        e.addSuppressed(failure);
+                                    }
+                                    logger.warn(() ->
+                                        new ParameterizedMessage("[{}] failed to remove repository cleanup task", repositoryName), e);
+                                    listener.onFailure(e);
                                 }
-                                logger.warn(() ->
-                                    new ParameterizedMessage("[{}] failed to remove repository cleanup task", repositoryName), e);
-                                listener.onFailure(e);
-                            }
 
-                            @Override
-                            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                                if (failure == null) {
-                                    logger.info("Done with repository cleanup on [{}][{}] with result [{}]",
-                                        repositoryName, repositoryStateId, result);
-                                    listener.onResponse(result);
-                                } else {
-                                    logger.warn(() -> new ParameterizedMessage("Failed to run repository cleanup operations on [{}][{}]",
-                                        repositoryName, repositoryStateId), failure);
-                                    listener.onFailure(failure);
+                                @Override
+                                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                                    if (failure == null) {
+                                        logger.info("Done with repository cleanup on [{}][{}] with result [{}]",
+                                            repositoryName, repositoryStateId, result);
+                                        listener.onResponse(result);
+                                    } else {
+                                        logger.warn(() -> new ParameterizedMessage(
+                                            "Failed to run repository cleanup operations on [{}][{}]",
+                                            repositoryName, repositoryStateId), failure);
+                                        listener.onFailure(failure);
+                                    }
                                 }
-                            }
-                        });
-                }
-            });
+                            });
+                    }
+                });
+        }, listener::onFailure);
     }
 }

+ 11 - 5
server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

@@ -472,15 +472,21 @@ final class StoreRecovery {
             translogState.totalOperations(0);
             translogState.totalOperationsOnStart(0);
             indexShard.prepareForIndexRecovery();
-            ShardId snapshotShardId = shardId;
+            final ShardId snapshotShardId;
             final String indexName = restoreSource.index();
             if (!shardId.getIndexName().equals(indexName)) {
                 snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
+            } else {
+                snapshotShardId = shardId;
             }
-            final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
-            assert indexShard.getEngineOrNull() == null;
-            repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), indexId, snapshotShardId,
-                indexShard.recoveryState(), restoreListener);
+            repository.getRepositoryData(ActionListener.wrap(
+                repositoryData -> {
+                    final IndexId indexId = repositoryData.resolveIndexId(indexName);
+                    assert indexShard.getEngineOrNull() == null;
+                    repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), indexId, snapshotShardId,
+                        indexShard.recoveryState(), restoreListener);
+                }, restoreListener::onFailure
+            ));
         } catch (Exception e) {
             restoreListener.onFailure(e);
         }

+ 2 - 2
server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

@@ -68,8 +68,8 @@ public class FilterRepository implements Repository {
     }
 
     @Override
-    public RepositoryData getRepositoryData() {
-        return in.getRepositoryData();
+    public void getRepositoryData(ActionListener<RepositoryData> listener) {
+        in.getRepositoryData(listener);
     }
 
     @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/repositories/Repository.java

@@ -105,7 +105,7 @@ public interface Repository extends LifecycleComponent {
      * and the indices across all snapshots found in the repository.  Throws a {@link RepositoryException}
      * if there was an error in reading the data.
      */
-    RepositoryData getRepositoryData();
+    void getRepositoryData(ActionListener<RepositoryData> listener);
 
     /**
      * Finalizes snapshotting process

+ 33 - 29
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -116,6 +116,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -752,11 +753,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         // directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION
         // If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened
         // when writing the index-${N} to each shard directory.
+        final Consumer<Exception> onUpdateFailure =
+            e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e));
         final ActionListener<SnapshotInfo> allMetaListener = new GroupedActionListener<>(
             ActionListener.wrap(snapshotInfos -> {
-                    assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos;
-                    final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
-                    final RepositoryData existingRepositoryData = getRepositoryData();
+                assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos;
+                final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
+                getRepositoryData(ActionListener.wrap(existingRepositoryData -> {
                     final RepositoryData updatedRepositoryData =
                         existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), shardGenerations);
                     writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens);
@@ -764,9 +767,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                         cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
                     }
                     listener.onResponse(snapshotInfo);
-                },
-                e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e))),
-            2 + indices.size());
+                }, onUpdateFailure));
+            }, onUpdateFailure), 2 + indices.size());
         final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
 
         // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will
@@ -931,31 +933,33 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     protected final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);
 
     @Override
-    public RepositoryData getRepositoryData() {
-        // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
-        while (true) {
-            final long generation;
-            try {
-                generation = latestIndexBlobId();
-            } catch (IOException ioe) {
-                throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
-            }
-            final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
-            if (genToLoad > generation) {
-                logger.info("Determined repository generation [" + generation
-                    + "] from repository contents but correct generation must be at least [" + genToLoad + "]");
-            }
-            try {
-                return getRepositoryData(genToLoad);
-            } catch (RepositoryException e) {
-                if (genToLoad != latestKnownRepoGen.get()) {
-                    logger.warn("Failed to load repository data generation [" + genToLoad +
-                        "] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
-                    continue;
+    public void getRepositoryData(ActionListener<RepositoryData> listener) {
+        ActionListener.completeWith(listener, () -> {
+            // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
+            while (true) {
+                final long generation;
+                try {
+                    generation = latestIndexBlobId();
+                } catch (IOException ioe) {
+                    throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
+                }
+                final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
+                if (genToLoad > generation) {
+                    logger.info("Determined repository generation [" + generation
+                        + "] from repository contents but correct generation must be at least [" + genToLoad + "]");
+                }
+                try {
+                    return getRepositoryData(genToLoad);
+                } catch (RepositoryException e) {
+                    if (genToLoad != latestKnownRepoGen.get()) {
+                        logger.warn("Failed to load repository data generation [" + genToLoad +
+                            "] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
+                        continue;
+                    }
+                    throw e;
                 }
-                throw e;
             }
-        }
+        });
     }
 
     private RepositoryData getRepositoryData(long indexGen) {

+ 303 - 297
server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

@@ -27,6 +27,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
@@ -176,359 +177,364 @@ public class RestoreService implements ClusterStateApplier {
             // Read snapshot info and metadata from the repository
             final String repositoryName = request.repository();
             Repository repository = repositoriesService.repository(repositoryName);
-            final RepositoryData repositoryData = repository.getRepositoryData();
-            final String snapshotName = request.snapshot();
-            final Optional<SnapshotId> matchingSnapshotId = repositoryData.getSnapshotIds().stream()
-                .filter(s -> snapshotName.equals(s.getName())).findFirst();
-            if (matchingSnapshotId.isPresent() == false) {
-                throw new SnapshotRestoreException(repositoryName, snapshotName, "snapshot does not exist");
-            }
+            final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
+            repository.getRepositoryData(repositoryDataListener);
+            repositoryDataListener.whenComplete(repositoryData -> {
+                final String snapshotName = request.snapshot();
+                final Optional<SnapshotId> matchingSnapshotId = repositoryData.getSnapshotIds().stream()
+                    .filter(s -> snapshotName.equals(s.getName())).findFirst();
+                if (matchingSnapshotId.isPresent() == false) {
+                    throw new SnapshotRestoreException(repositoryName, snapshotName, "snapshot does not exist");
+                }
 
-            final SnapshotId snapshotId = matchingSnapshotId.get();
-            final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
-            final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
+                final SnapshotId snapshotId = matchingSnapshotId.get();
+                final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
+                final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
 
-            // Make sure that we can restore from this snapshot
-            validateSnapshotRestorable(repositoryName, snapshotInfo);
+                // Make sure that we can restore from this snapshot
+                validateSnapshotRestorable(repositoryName, snapshotInfo);
 
-            // Resolve the indices from the snapshot that need to be restored
-            final List<String> indicesInSnapshot = filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions());
+                // Resolve the indices from the snapshot that need to be restored
+                final List<String> indicesInSnapshot = filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions());
 
-            final MetaData.Builder metaDataBuilder;
-            if (request.includeGlobalState()) {
-                metaDataBuilder = MetaData.builder(repository.getSnapshotGlobalMetaData(snapshotId));
-            } else {
-                metaDataBuilder = MetaData.builder();
-            }
+                final MetaData.Builder metaDataBuilder;
+                if (request.includeGlobalState()) {
+                    metaDataBuilder = MetaData.builder(repository.getSnapshotGlobalMetaData(snapshotId));
+                } else {
+                    metaDataBuilder = MetaData.builder();
+                }
 
-            final List<IndexId> indexIdsInSnapshot = repositoryData.resolveIndices(indicesInSnapshot);
-            for (IndexId indexId : indexIdsInSnapshot) {
-                metaDataBuilder.put(repository.getSnapshotIndexMetaData(snapshotId, indexId), false);
-            }
+                final List<IndexId> indexIdsInSnapshot = repositoryData.resolveIndices(indicesInSnapshot);
+                for (IndexId indexId : indexIdsInSnapshot) {
+                    metaDataBuilder.put(repository.getSnapshotIndexMetaData(snapshotId, indexId), false);
+                }
 
-            final MetaData metaData = metaDataBuilder.build();
-
-            // Apply renaming on index names, returning a map of names where
-            // the key is the renamed index and the value is the original name
-            final Map<String, String> indices = renamedIndices(request, indicesInSnapshot);
-
-            // Now we can start the actual restore process by adding shards to be recovered in the cluster state
-            // and updating cluster metadata (global and index) as needed
-            clusterService.submitStateUpdateTask("restore_snapshot[" + snapshotName + ']', new ClusterStateUpdateTask() {
-                final String restoreUUID = UUIDs.randomBase64UUID();
-                RestoreInfo restoreInfo = null;
-
-                @Override
-                public ClusterState execute(ClusterState currentState) {
-                    RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
-                    // Check if the snapshot to restore is currently being deleted
-                    SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
-                    if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
-                        throw new ConcurrentSnapshotExecutionException(snapshot,
-                            "cannot restore a snapshot while a snapshot deletion is in-progress [" +
-                                deletionsInProgress.getEntries().get(0).getSnapshot() + "]");
-                    }
+                final MetaData metaData = metaDataBuilder.build();
+
+                // Apply renaming on index names, returning a map of names where
+                // the key is the renamed index and the value is the original name
+                final Map<String, String> indices = renamedIndices(request, indicesInSnapshot);
+
+                // Now we can start the actual restore process by adding shards to be recovered in the cluster state
+                // and updating cluster metadata (global and index) as needed
+                clusterService.submitStateUpdateTask("restore_snapshot[" + snapshotName + ']', new ClusterStateUpdateTask() {
+                    final String restoreUUID = UUIDs.randomBase64UUID();
+                    RestoreInfo restoreInfo = null;
+
+                    @Override
+                    public ClusterState execute(ClusterState currentState) {
+                        RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
+                        // Check if the snapshot to restore is currently being deleted
+                        SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
+                        if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
+                            throw new ConcurrentSnapshotExecutionException(snapshot,
+                                "cannot restore a snapshot while a snapshot deletion is in-progress [" +
+                                    deletionsInProgress.getEntries().get(0).getSnapshot() + "]");
+                        }
 
-                    // Updating cluster state
-                    ClusterState.Builder builder = ClusterState.builder(currentState);
-                    MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
-                    ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
-                    RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
-                    ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards;
-                    Set<String> aliases = new HashSet<>();
-
-                    if (indices.isEmpty() == false) {
-                        // We have some indices to restore
-                        ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder();
-                        final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion()
-                            .minimumIndexCompatibilityVersion();
-                        for (Map.Entry<String, String> indexEntry : indices.entrySet()) {
-                            String index = indexEntry.getValue();
-                            boolean partial = checkPartial(index);
-                            SnapshotRecoverySource recoverySource =
-                                new SnapshotRecoverySource(restoreUUID, snapshot, snapshotInfo.version(), index);
-                            String renamedIndexName = indexEntry.getKey();
-                            IndexMetaData snapshotIndexMetaData = metaData.index(index);
-                            snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData,
-                                                                        request.indexSettings(), request.ignoreIndexSettings());
-                            try {
-                                snapshotIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(snapshotIndexMetaData,
-                                    minIndexCompatibilityVersion);
-                            } catch (Exception ex) {
-                                throw new SnapshotRestoreException(snapshot, "cannot restore index [" + index + "] because it cannot be " +
-                                    "upgraded", ex);
-                            }
-                            // Check that the index is closed or doesn't exist
-                            IndexMetaData currentIndexMetaData = currentState.metaData().index(renamedIndexName);
-                            IntSet ignoreShards = new IntHashSet();
-                            final Index renamedIndex;
-                            if (currentIndexMetaData == null) {
-                                // Index doesn't exist - create it and start recovery
-                                // Make sure that the index we are about to create has a validate name
-                                MetaDataCreateIndexService.validateIndexName(renamedIndexName, currentState);
-                                createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetaData.getSettings(), false);
-                                IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData)
-                                                                                    .state(IndexMetaData.State.OPEN)
-                                                                                    .index(renamedIndexName);
-                                indexMdBuilder.settings(Settings.builder()
-                                                                .put(snapshotIndexMetaData.getSettings())
-                                                                .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()));
-                                MetaDataCreateIndexService.checkShardLimit(snapshotIndexMetaData.getSettings(), currentState);
-                                if (!request.includeAliases() && !snapshotIndexMetaData.getAliases().isEmpty()) {
-                                    // Remove all aliases - they shouldn't be restored
-                                    indexMdBuilder.removeAllAliases();
-                                } else {
-                                    for (ObjectCursor<String> alias : snapshotIndexMetaData.getAliases().keys()) {
-                                        aliases.add(alias.value);
-                                    }
-                                }
-                                IndexMetaData updatedIndexMetaData = indexMdBuilder.build();
-                                if (partial) {
-                                    populateIgnoredShards(index, ignoreShards);
+                        // Updating cluster state
+                        ClusterState.Builder builder = ClusterState.builder(currentState);
+                        MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
+                        ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
+                        RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
+                        ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards;
+                        Set<String> aliases = new HashSet<>();
+
+                        if (indices.isEmpty() == false) {
+                            // We have some indices to restore
+                            ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> shardsBuilder =
+                                ImmutableOpenMap.builder();
+                            final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion()
+                                .minimumIndexCompatibilityVersion();
+                            for (Map.Entry<String, String> indexEntry : indices.entrySet()) {
+                                String index = indexEntry.getValue();
+                                boolean partial = checkPartial(index);
+                                SnapshotRecoverySource recoverySource =
+                                    new SnapshotRecoverySource(restoreUUID, snapshot, snapshotInfo.version(), index);
+                                String renamedIndexName = indexEntry.getKey();
+                                IndexMetaData snapshotIndexMetaData = metaData.index(index);
+                                snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData,
+                                    request.indexSettings(), request.ignoreIndexSettings());
+                                try {
+                                    snapshotIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(snapshotIndexMetaData,
+                                        minIndexCompatibilityVersion);
+                                } catch (Exception ex) {
+                                    throw new SnapshotRestoreException(snapshot, "cannot restore index [" + index +
+                                        "] because it cannot be upgraded", ex);
                                 }
-                                rtBuilder.addAsNewRestore(updatedIndexMetaData, recoverySource, ignoreShards);
-                                blocks.addBlocks(updatedIndexMetaData);
-                                mdBuilder.put(updatedIndexMetaData, true);
-                                renamedIndex = updatedIndexMetaData.getIndex();
-                            } else {
-                                validateExistingIndex(currentIndexMetaData, snapshotIndexMetaData, renamedIndexName, partial);
-                                // Index exists and it's closed - open it in metadata and start recovery
-                                IndexMetaData.Builder indexMdBuilder =
+                                // Check that the index is closed or doesn't exist
+                                IndexMetaData currentIndexMetaData = currentState.metaData().index(renamedIndexName);
+                                IntSet ignoreShards = new IntHashSet();
+                                final Index renamedIndex;
+                                if (currentIndexMetaData == null) {
+                                    // Index doesn't exist - create it and start recovery
+                                    // Make sure that the index we are about to create has a validate name
+                                    MetaDataCreateIndexService.validateIndexName(renamedIndexName, currentState);
+                                    createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetaData.getSettings(), false);
+                                    IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData)
+                                        .state(IndexMetaData.State.OPEN)
+                                        .index(renamedIndexName);
+                                    indexMdBuilder.settings(Settings.builder()
+                                        .put(snapshotIndexMetaData.getSettings())
+                                        .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()));
+                                    MetaDataCreateIndexService.checkShardLimit(snapshotIndexMetaData.getSettings(), currentState);
+                                    if (!request.includeAliases() && !snapshotIndexMetaData.getAliases().isEmpty()) {
+                                        // Remove all aliases - they shouldn't be restored
+                                        indexMdBuilder.removeAllAliases();
+                                    } else {
+                                        for (ObjectCursor<String> alias : snapshotIndexMetaData.getAliases().keys()) {
+                                            aliases.add(alias.value);
+                                        }
+                                    }
+                                    IndexMetaData updatedIndexMetaData = indexMdBuilder.build();
+                                    if (partial) {
+                                        populateIgnoredShards(index, ignoreShards);
+                                    }
+                                    rtBuilder.addAsNewRestore(updatedIndexMetaData, recoverySource, ignoreShards);
+                                    blocks.addBlocks(updatedIndexMetaData);
+                                    mdBuilder.put(updatedIndexMetaData, true);
+                                    renamedIndex = updatedIndexMetaData.getIndex();
+                                } else {
+                                    validateExistingIndex(currentIndexMetaData, snapshotIndexMetaData, renamedIndexName, partial);
+                                    // Index exists and it's closed - open it in metadata and start recovery
+                                    IndexMetaData.Builder indexMdBuilder =
                                         IndexMetaData.builder(snapshotIndexMetaData).state(IndexMetaData.State.OPEN);
-                                indexMdBuilder.version(
+                                    indexMdBuilder.version(
                                         Math.max(snapshotIndexMetaData.getVersion(), 1 + currentIndexMetaData.getVersion()));
-                                indexMdBuilder.mappingVersion(
+                                    indexMdBuilder.mappingVersion(
                                         Math.max(snapshotIndexMetaData.getMappingVersion(), 1 + currentIndexMetaData.getMappingVersion()));
-                                indexMdBuilder.settingsVersion(
+                                    indexMdBuilder.settingsVersion(
                                         Math.max(
-                                                snapshotIndexMetaData.getSettingsVersion(),
-                                                1 + currentIndexMetaData.getSettingsVersion()));
-                                indexMdBuilder.aliasesVersion(
+                                            snapshotIndexMetaData.getSettingsVersion(),
+                                            1 + currentIndexMetaData.getSettingsVersion()));
+                                    indexMdBuilder.aliasesVersion(
                                         Math.max(snapshotIndexMetaData.getAliasesVersion(), 1 + currentIndexMetaData.getAliasesVersion()));
 
-                                for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) {
-                                    indexMdBuilder.primaryTerm(shard,
-                                        Math.max(snapshotIndexMetaData.primaryTerm(shard), currentIndexMetaData.primaryTerm(shard)));
-                                }
-
-                                if (!request.includeAliases()) {
-                                    // Remove all snapshot aliases
-                                    if (!snapshotIndexMetaData.getAliases().isEmpty()) {
-                                        indexMdBuilder.removeAllAliases();
+                                    for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) {
+                                        indexMdBuilder.primaryTerm(shard,
+                                            Math.max(snapshotIndexMetaData.primaryTerm(shard), currentIndexMetaData.primaryTerm(shard)));
                                     }
-                                    /// Add existing aliases
-                                    for (ObjectCursor<AliasMetaData> alias : currentIndexMetaData.getAliases().values()) {
-                                        indexMdBuilder.putAlias(alias.value);
-                                    }
-                                } else {
-                                    for (ObjectCursor<String> alias : snapshotIndexMetaData.getAliases().keys()) {
-                                        aliases.add(alias.value);
+
+                                    if (!request.includeAliases()) {
+                                        // Remove all snapshot aliases
+                                        if (!snapshotIndexMetaData.getAliases().isEmpty()) {
+                                            indexMdBuilder.removeAllAliases();
+                                        }
+                                        /// Add existing aliases
+                                        for (ObjectCursor<AliasMetaData> alias : currentIndexMetaData.getAliases().values()) {
+                                            indexMdBuilder.putAlias(alias.value);
+                                        }
+                                    } else {
+                                        for (ObjectCursor<String> alias : snapshotIndexMetaData.getAliases().keys()) {
+                                            aliases.add(alias.value);
+                                        }
                                     }
+                                    indexMdBuilder.settings(Settings.builder()
+                                        .put(snapshotIndexMetaData.getSettings())
+                                        .put(IndexMetaData.SETTING_INDEX_UUID,
+                                            currentIndexMetaData.getIndexUUID()));
+                                    IndexMetaData updatedIndexMetaData = indexMdBuilder.index(renamedIndexName).build();
+                                    rtBuilder.addAsRestore(updatedIndexMetaData, recoverySource);
+                                    blocks.updateBlocks(updatedIndexMetaData);
+                                    mdBuilder.put(updatedIndexMetaData, true);
+                                    renamedIndex = updatedIndexMetaData.getIndex();
                                 }
-                                indexMdBuilder.settings(Settings.builder()
-                                                                .put(snapshotIndexMetaData.getSettings())
-                                                                .put(IndexMetaData.SETTING_INDEX_UUID,
-                                                                    currentIndexMetaData.getIndexUUID()));
-                                IndexMetaData updatedIndexMetaData = indexMdBuilder.index(renamedIndexName).build();
-                                rtBuilder.addAsRestore(updatedIndexMetaData, recoverySource);
-                                blocks.updateBlocks(updatedIndexMetaData);
-                                mdBuilder.put(updatedIndexMetaData, true);
-                                renamedIndex = updatedIndexMetaData.getIndex();
-                            }
 
-                            for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) {
-                                if (!ignoreShards.contains(shard)) {
-                                    shardsBuilder.put(new ShardId(renamedIndex, shard),
+                                for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) {
+                                    if (!ignoreShards.contains(shard)) {
+                                        shardsBuilder.put(new ShardId(renamedIndex, shard),
                                             new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId()));
-                                } else {
-                                    shardsBuilder.put(new ShardId(renamedIndex, shard),
+                                    } else {
+                                        shardsBuilder.put(new ShardId(renamedIndex, shard),
                                             new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId(),
                                                 RestoreInProgress.State.FAILURE));
+                                    }
                                 }
                             }
-                        }
 
-                        shards = shardsBuilder.build();
-                        RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(
-                            restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards),
-                            List.copyOf(indices.keySet()),
-                            shards
-                        );
-                        RestoreInProgress.Builder restoreInProgressBuilder;
-                        if (restoreInProgress != null) {
-                            restoreInProgressBuilder = new RestoreInProgress.Builder(restoreInProgress);
+                            shards = shardsBuilder.build();
+                            RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(
+                                restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards),
+                                List.copyOf(indices.keySet()),
+                                shards
+                            );
+                            RestoreInProgress.Builder restoreInProgressBuilder;
+                            if (restoreInProgress != null) {
+                                restoreInProgressBuilder = new RestoreInProgress.Builder(restoreInProgress);
+                            } else {
+                                restoreInProgressBuilder = new RestoreInProgress.Builder();
+                            }
+                            builder.putCustom(RestoreInProgress.TYPE, restoreInProgressBuilder.add(restoreEntry).build());
                         } else {
-                            restoreInProgressBuilder = new RestoreInProgress.Builder();
+                            shards = ImmutableOpenMap.of();
                         }
-                        builder.putCustom(RestoreInProgress.TYPE, restoreInProgressBuilder.add(restoreEntry).build());
-                    } else {
-                        shards = ImmutableOpenMap.of();
-                    }
 
-                    checkAliasNameConflicts(indices, aliases);
+                        checkAliasNameConflicts(indices, aliases);
 
-                    // Restore global state if needed
-                    if (request.includeGlobalState()) {
-                        if (metaData.persistentSettings() != null) {
-                            Settings settings = metaData.persistentSettings();
-                            clusterSettings.validateUpdate(settings);
-                            mdBuilder.persistentSettings(settings);
-                        }
-                        if (metaData.templates() != null) {
-                            // TODO: Should all existing templates be deleted first?
-                            for (ObjectCursor<IndexTemplateMetaData> cursor : metaData.templates().values()) {
-                                mdBuilder.put(cursor.value);
+                        // Restore global state if needed
+                        if (request.includeGlobalState()) {
+                            if (metaData.persistentSettings() != null) {
+                                Settings settings = metaData.persistentSettings();
+                                clusterSettings.validateUpdate(settings);
+                                mdBuilder.persistentSettings(settings);
                             }
-                        }
-                        if (metaData.customs() != null) {
-                            for (ObjectObjectCursor<String, MetaData.Custom> cursor : metaData.customs()) {
-                                if (!RepositoriesMetaData.TYPE.equals(cursor.key)) {
-                                    // Don't restore repositories while we are working with them
-                                    // TODO: Should we restore them at the end?
-                                    mdBuilder.putCustom(cursor.key, cursor.value);
+                            if (metaData.templates() != null) {
+                                // TODO: Should all existing templates be deleted first?
+                                for (ObjectCursor<IndexTemplateMetaData> cursor : metaData.templates().values()) {
+                                    mdBuilder.put(cursor.value);
+                                }
+                            }
+                            if (metaData.customs() != null) {
+                                for (ObjectObjectCursor<String, MetaData.Custom> cursor : metaData.customs()) {
+                                    if (!RepositoriesMetaData.TYPE.equals(cursor.key)) {
+                                        // Don't restore repositories while we are working with them
+                                        // TODO: Should we restore them at the end?
+                                        mdBuilder.putCustom(cursor.key, cursor.value);
+                                    }
                                 }
                             }
                         }
-                    }
 
-                    if (completed(shards)) {
-                        // We don't have any indices to restore - we are done
-                        restoreInfo = new RestoreInfo(snapshotId.getName(),
-                                                      List.copyOf(indices.keySet()),
-                                                      shards.size(),
-                                                      shards.size() - failedShards(shards));
-                    }
+                        if (completed(shards)) {
+                            // We don't have any indices to restore - we are done
+                            restoreInfo = new RestoreInfo(snapshotId.getName(),
+                                List.copyOf(indices.keySet()),
+                                shards.size(),
+                                shards.size() - failedShards(shards));
+                        }
 
-                    RoutingTable rt = rtBuilder.build();
-                    ClusterState updatedState = builder.metaData(mdBuilder).blocks(blocks).routingTable(rt).build();
-                    return allocationService.reroute(updatedState, "restored snapshot [" + snapshot + "]");
-                }
+                        RoutingTable rt = rtBuilder.build();
+                        ClusterState updatedState = builder.metaData(mdBuilder).blocks(blocks).routingTable(rt).build();
+                        return allocationService.reroute(updatedState, "restored snapshot [" + snapshot + "]");
+                    }
 
-                private void checkAliasNameConflicts(Map<String, String> renamedIndices, Set<String> aliases) {
-                    for (Map.Entry<String, String> renamedIndex : renamedIndices.entrySet()) {
-                        if (aliases.contains(renamedIndex.getKey())) {
-                            throw new SnapshotRestoreException(snapshot,
-                                "cannot rename index [" + renamedIndex.getValue() + "] into [" + renamedIndex.getKey() + "] because of " +
-                                    "conflict with an alias with the same name");
+                    private void checkAliasNameConflicts(Map<String, String> renamedIndices, Set<String> aliases) {
+                        for (Map.Entry<String, String> renamedIndex : renamedIndices.entrySet()) {
+                            if (aliases.contains(renamedIndex.getKey())) {
+                                throw new SnapshotRestoreException(snapshot,
+                                    "cannot rename index [" + renamedIndex.getValue() + "] into [" + renamedIndex.getKey()
+                                        + "] because of conflict with an alias with the same name");
+                            }
                         }
                     }
-                }
 
-                private void populateIgnoredShards(String index, IntSet ignoreShards) {
-                    for (SnapshotShardFailure failure : snapshotInfo.shardFailures()) {
-                        if (index.equals(failure.index())) {
-                            ignoreShards.add(failure.shardId());
+                    private void populateIgnoredShards(String index, IntSet ignoreShards) {
+                        for (SnapshotShardFailure failure : snapshotInfo.shardFailures()) {
+                            if (index.equals(failure.index())) {
+                                ignoreShards.add(failure.shardId());
+                            }
                         }
                     }
-                }
 
-                private boolean checkPartial(String index) {
-                    // Make sure that index was fully snapshotted
-                    if (failed(snapshotInfo, index)) {
-                        if (request.partial()) {
-                            return true;
+                    private boolean checkPartial(String index) {
+                        // Make sure that index was fully snapshotted
+                        if (failed(snapshotInfo, index)) {
+                            if (request.partial()) {
+                                return true;
+                            } else {
+                                throw new SnapshotRestoreException(snapshot, "index [" + index + "] wasn't fully snapshotted - cannot " +
+                                    "restore");
+                            }
                         } else {
-                            throw new SnapshotRestoreException(snapshot, "index [" + index + "] wasn't fully snapshotted - cannot " +
-                                "restore");
+                            return false;
                         }
-                    } else {
-                        return false;
                     }
-                }
 
-                private void validateExistingIndex(IndexMetaData currentIndexMetaData, IndexMetaData snapshotIndexMetaData,
-                                                   String renamedIndex, boolean partial) {
-                    // Index exist - checking that it's closed
-                    if (currentIndexMetaData.getState() != IndexMetaData.State.CLOSE) {
-                        // TODO: Enable restore for open indices
-                        throw new SnapshotRestoreException(snapshot, "cannot restore index [" + renamedIndex + "] because an open index " +
-                            "with same name already exists in the cluster. Either close or delete the existing index or restore the " +
-                            "index under a different name by providing a rename pattern and replacement name");
-                    }
-                    // Index exist - checking if it's partial restore
-                    if (partial) {
-                        throw new SnapshotRestoreException(snapshot, "cannot restore partial index [" + renamedIndex + "] because such " +
-                            "index already exists");
-                    }
-                    // Make sure that the number of shards is the same. That's the only thing that we cannot change
-                    if (currentIndexMetaData.getNumberOfShards() != snapshotIndexMetaData.getNumberOfShards()) {
-                        throw new SnapshotRestoreException(snapshot,
-                            "cannot restore index [" + renamedIndex + "] with [" + currentIndexMetaData.getNumberOfShards() + "] shards " +
-                                "from a snapshot of index [" + snapshotIndexMetaData.getIndex().getName() + "] with [" +
-                                snapshotIndexMetaData.getNumberOfShards() + "] shards");
+                    private void validateExistingIndex(IndexMetaData currentIndexMetaData, IndexMetaData snapshotIndexMetaData,
+                        String renamedIndex, boolean partial) {
+                        // Index exist - checking that it's closed
+                        if (currentIndexMetaData.getState() != IndexMetaData.State.CLOSE) {
+                            // TODO: Enable restore for open indices
+                            throw new SnapshotRestoreException(snapshot, "cannot restore index [" + renamedIndex
+                                + "] because an open index " +
+                                "with same name already exists in the cluster. Either close or delete the existing index or restore the " +
+                                "index under a different name by providing a rename pattern and replacement name");
+                        }
+                        // Index exist - checking if it's partial restore
+                        if (partial) {
+                            throw new SnapshotRestoreException(snapshot, "cannot restore partial index [" + renamedIndex
+                                + "] because such index already exists");
+                        }
+                        // Make sure that the number of shards is the same. That's the only thing that we cannot change
+                        if (currentIndexMetaData.getNumberOfShards() != snapshotIndexMetaData.getNumberOfShards()) {
+                            throw new SnapshotRestoreException(snapshot,
+                                "cannot restore index [" + renamedIndex + "] with [" + currentIndexMetaData.getNumberOfShards()
+                                    + "] shards from a snapshot of index [" + snapshotIndexMetaData.getIndex().getName() + "] with [" +
+                                    snapshotIndexMetaData.getNumberOfShards() + "] shards");
+                        }
                     }
-                }
 
-                /**
-                 * Optionally updates index settings in indexMetaData by removing settings listed in ignoreSettings and
-                 * merging them with settings in changeSettings.
-                 */
-                private IndexMetaData updateIndexSettings(IndexMetaData indexMetaData, Settings changeSettings, String[] ignoreSettings) {
-                    Settings normalizedChangeSettings = Settings.builder()
-                                                                .put(changeSettings)
-                                                                .normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX)
-                                                                .build();
-                    IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData);
-                    Settings settings = indexMetaData.getSettings();
-                    Set<String> keyFilters = new HashSet<>();
-                    List<String> simpleMatchPatterns = new ArrayList<>();
-                    for (String ignoredSetting : ignoreSettings) {
-                        if (!Regex.isSimpleMatchPattern(ignoredSetting)) {
-                            if (UNREMOVABLE_SETTINGS.contains(ignoredSetting)) {
-                                throw new SnapshotRestoreException(snapshot, "cannot remove setting [" + ignoredSetting + "] on restore");
+                    /**
+                     * Optionally updates index settings in indexMetaData by removing settings listed in ignoreSettings and
+                     * merging them with settings in changeSettings.
+                     */
+                    private IndexMetaData updateIndexSettings(IndexMetaData indexMetaData, Settings changeSettings,
+                                                              String[] ignoreSettings) {
+                        Settings normalizedChangeSettings = Settings.builder()
+                            .put(changeSettings)
+                            .normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX)
+                            .build();
+                        IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData);
+                        Settings settings = indexMetaData.getSettings();
+                        Set<String> keyFilters = new HashSet<>();
+                        List<String> simpleMatchPatterns = new ArrayList<>();
+                        for (String ignoredSetting : ignoreSettings) {
+                            if (!Regex.isSimpleMatchPattern(ignoredSetting)) {
+                                if (UNREMOVABLE_SETTINGS.contains(ignoredSetting)) {
+                                    throw new SnapshotRestoreException(
+                                        snapshot, "cannot remove setting [" + ignoredSetting + "] on restore");
+                                } else {
+                                    keyFilters.add(ignoredSetting);
+                                }
                             } else {
-                                keyFilters.add(ignoredSetting);
+                                simpleMatchPatterns.add(ignoredSetting);
                             }
-                        } else {
-                            simpleMatchPatterns.add(ignoredSetting);
                         }
-                    }
-                    Predicate<String> settingsFilter = k -> {
-                        if (UNREMOVABLE_SETTINGS.contains(k) == false) {
-                            for (String filterKey : keyFilters) {
-                                if (k.equals(filterKey)) {
-                                    return false;
+                        Predicate<String> settingsFilter = k -> {
+                            if (UNREMOVABLE_SETTINGS.contains(k) == false) {
+                                for (String filterKey : keyFilters) {
+                                    if (k.equals(filterKey)) {
+                                        return false;
+                                    }
                                 }
-                            }
-                            for (String pattern : simpleMatchPatterns) {
-                                if (Regex.simpleMatch(pattern, k)) {
-                                    return false;
+                                for (String pattern : simpleMatchPatterns) {
+                                    if (Regex.simpleMatch(pattern, k)) {
+                                        return false;
+                                    }
                                 }
                             }
-                        }
-                        return true;
-                    };
-                    Settings.Builder settingsBuilder = Settings.builder()
-                        .put(settings.filter(settingsFilter))
-                        .put(normalizedChangeSettings.filter(k -> {
-                            if (UNMODIFIABLE_SETTINGS.contains(k)) {
-                                throw new SnapshotRestoreException(snapshot, "cannot modify setting [" + k + "] on restore");
-                            } else {
-                                return true;
-                            }
-                        }));
-                    settingsBuilder.remove(MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey());
-                    return builder.settings(settingsBuilder).build();
-                }
-
-                @Override
-                public void onFailure(String source, Exception e) {
-                    logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", snapshotId), e);
-                    listener.onFailure(e);
-                }
-
-                @Override
-                public TimeValue timeout() {
-                    return request.masterNodeTimeout();
-                }
+                            return true;
+                        };
+                        Settings.Builder settingsBuilder = Settings.builder()
+                            .put(settings.filter(settingsFilter))
+                            .put(normalizedChangeSettings.filter(k -> {
+                                if (UNMODIFIABLE_SETTINGS.contains(k)) {
+                                    throw new SnapshotRestoreException(snapshot, "cannot modify setting [" + k + "] on restore");
+                                } else {
+                                    return true;
+                                }
+                            }));
+                        settingsBuilder.remove(MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey());
+                        return builder.settings(settingsBuilder).build();
+                    }
 
-                @Override
-                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                    listener.onResponse(new RestoreCompletionResponse(restoreUUID, snapshot, restoreInfo));
-                }
-            });
+                    @Override
+                    public void onFailure(String source, Exception e) {
+                        logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", snapshotId), e);
+                        listener.onFailure(e);
+                    }
 
+                    @Override
+                    public TimeValue timeout() {
+                        return request.masterNodeTimeout();
+                    }
 
+                    @Override
+                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                        listener.onResponse(new RestoreCompletionResponse(restoreUUID, snapshot, restoreInfo));
+                    }
+                });
+            }, listener::onFailure);
         } catch (Exception e) {
             logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot",
                 request.repository() + ":" + request.snapshot()), e);

+ 189 - 180
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -29,7 +29,9 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
+import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateApplier;
@@ -160,7 +162,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
     public RepositoryData getRepositoryData(final String repositoryName) {
         Repository repository = repositoriesService.repository(repositoryName);
         assert repository != null; // should only be called once we've validated the repository exists
-        return repository.getRepositoryData();
+        return PlainActionFuture.get(repository::getRepositoryData);
     }
 
     /**
@@ -260,86 +262,88 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
         validate(repositoryName, snapshotName);
         final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
-        final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData();
-
-        clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {
-
-            private SnapshotsInProgress.Entry newSnapshot = null;
-
-            @Override
-            public ClusterState execute(ClusterState currentState) {
-                validate(repositoryName, snapshotName, currentState);
-                SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
-                if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
-                    throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
-                        "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]");
-                }
-                final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
-                if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
-                    throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
-                        "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
-                }
-                SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
-                if (snapshots == null || snapshots.entries().isEmpty()) {
-                    // Store newSnapshot here to be processed in clusterStateProcessed
-                    List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
-                                                        request.indicesOptions(), request.indices()));
-                    logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
-                    List<IndexId> snapshotIndices = repositoryData.resolveNewIndices(indices);
-                    newSnapshot = new SnapshotsInProgress.Entry(
-                        new Snapshot(repositoryName, snapshotId),
-                        request.includeGlobalState(), request.partial(),
-                        State.INIT,
-                        snapshotIndices,
-                        threadPool.absoluteTimeInMillis(),
-                        repositoryData.getGenId(),
-                        null,
-                        request.userMetadata(),
-                        clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION));
-                    initializingSnapshots.add(newSnapshot.snapshot());
-                    snapshots = new SnapshotsInProgress(newSnapshot);
-                } else {
-                    throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
+        final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
+        repositoriesService.repository(repositoryName).getRepositoryData(repositoryDataListener);
+        repositoryDataListener.whenComplete(repositoryData -> {
+            clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {
+
+                private SnapshotsInProgress.Entry newSnapshot = null;
+
+                @Override
+                public ClusterState execute(ClusterState currentState) {
+                    validate(repositoryName, snapshotName, currentState);
+                    SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
+                    if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
+                        throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
+                            "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]");
+                    }
+                    final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
+                    if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
+                        throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
+                            "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
+                    }
+                    SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
+                    if (snapshots == null || snapshots.entries().isEmpty()) {
+                        // Store newSnapshot here to be processed in clusterStateProcessed
+                        List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
+                            request.indicesOptions(), request.indices()));
+                        logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
+                        List<IndexId> snapshotIndices = repositoryData.resolveNewIndices(indices);
+                        newSnapshot = new SnapshotsInProgress.Entry(
+                            new Snapshot(repositoryName, snapshotId),
+                            request.includeGlobalState(), request.partial(),
+                            State.INIT,
+                            snapshotIndices,
+                            threadPool.absoluteTimeInMillis(),
+                            repositoryData.getGenId(),
+                            null,
+                            request.userMetadata(),
+                            clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION));
+                        initializingSnapshots.add(newSnapshot.snapshot());
+                        snapshots = new SnapshotsInProgress(newSnapshot);
+                    } else {
+                        throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
+                    }
+                    return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
                 }
-                return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
-            }
 
-            @Override
-            public void onFailure(String source, Exception e) {
-                logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
-                if (newSnapshot != null) {
-                    initializingSnapshots.remove(newSnapshot.snapshot());
+                @Override
+                public void onFailure(String source, Exception e) {
+                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
+                    if (newSnapshot != null) {
+                        initializingSnapshots.remove(newSnapshot.snapshot());
+                    }
+                    newSnapshot = null;
+                    listener.onFailure(e);
                 }
-                newSnapshot = null;
-                listener.onFailure(e);
-            }
 
-            @Override
-            public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
-                if (newSnapshot != null) {
-                    final Snapshot current = newSnapshot.snapshot();
-                    assert initializingSnapshots.contains(current);
-                    beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener<>() {
-                        @Override
-                        public void onResponse(final Snapshot snapshot) {
-                            initializingSnapshots.remove(snapshot);
-                            listener.onResponse(snapshot);
-                        }
+                @Override
+                public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
+                    if (newSnapshot != null) {
+                        final Snapshot current = newSnapshot.snapshot();
+                        assert initializingSnapshots.contains(current);
+                        beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener<>() {
+                            @Override
+                            public void onResponse(final Snapshot snapshot) {
+                                initializingSnapshots.remove(snapshot);
+                                listener.onResponse(snapshot);
+                            }
 
-                        @Override
-                        public void onFailure(final Exception e) {
-                            initializingSnapshots.remove(current);
-                            listener.onFailure(e);
-                        }
-                    });
+                            @Override
+                            public void onFailure(final Exception e) {
+                                initializingSnapshots.remove(current);
+                                listener.onFailure(e);
+                            }
+                        });
+                    }
                 }
-            }
 
-            @Override
-            public TimeValue timeout() {
-                return request.masterNodeTimeout();
-            }
-        });
+                @Override
+                public TimeValue timeout() {
+                    return request.masterNodeTimeout();
+                }
+            });
+        }, listener::onFailure);
     }
 
     /**
@@ -412,104 +416,108 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository");
                 }
                 final String snapshotName = snapshot.snapshot().getSnapshotId().getName();
-                final RepositoryData repositoryData = repository.getRepositoryData();
-                // check if the snapshot name already exists in the repository
-                if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
-                    throw new InvalidSnapshotNameException(
-                        repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
-                }
-                snapshotCreated = true;
-
-                logger.info("snapshot [{}] started", snapshot.snapshot());
-                if (snapshot.indices().isEmpty()) {
-                    // No indices in this snapshot - we are done
-                    userCreateSnapshotListener.onResponse(snapshot.snapshot());
-                    endSnapshot(snapshot, clusterState.metaData());
-                    return;
-                }
-                clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
-
-                    @Override
-                    public ClusterState execute(ClusterState currentState) {
-                        SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
-                        List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
-                        for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
-                            if (entry.snapshot().equals(snapshot.snapshot()) == false) {
-                                entries.add(entry);
-                                continue;
-                            }
+                final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
+                repository.getRepositoryData(repositoryDataListener);
+                repositoryDataListener.whenComplete(repositoryData -> {
+                    // check if the snapshot name already exists in the repository
+                    if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
+                        throw new InvalidSnapshotNameException(
+                            repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
+                    }
+                    snapshotCreated = true;
 
-                            if (entry.state() == State.ABORTED) {
-                                entries.add(entry);
-                                assert entry.shards().isEmpty();
-                                hadAbortedInitializations = true;
-                            } else {
-                                // Replace the snapshot that was just initialized
-                                ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(currentState, entry, repositoryData);
-                                if (!partial) {
-                                    Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
-                                        currentState.metaData());
-                                    Set<String> missing = indicesWithMissingShards.v1();
-                                    Set<String> closed = indicesWithMissingShards.v2();
-                                    if (missing.isEmpty() == false || closed.isEmpty() == false) {
-                                        final StringBuilder failureMessage = new StringBuilder();
-                                        if (missing.isEmpty() == false) {
-                                            failureMessage.append("Indices don't have primary shards ");
-                                            failureMessage.append(missing);
-                                        }
-                                        if (closed.isEmpty() == false) {
-                                            if (failureMessage.length() > 0) {
-                                                failureMessage.append("; ");
+                    logger.info("snapshot [{}] started", snapshot.snapshot());
+                    if (snapshot.indices().isEmpty()) {
+                        // No indices in this snapshot - we are done
+                        userCreateSnapshotListener.onResponse(snapshot.snapshot());
+                        endSnapshot(snapshot, clusterState.metaData());
+                        return;
+                    }
+                    clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
+
+                        @Override
+                        public ClusterState execute(ClusterState currentState) {
+                            SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
+                            List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
+                            for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
+                                if (entry.snapshot().equals(snapshot.snapshot()) == false) {
+                                    entries.add(entry);
+                                    continue;
+                                }
+
+                                if (entry.state() == State.ABORTED) {
+                                    entries.add(entry);
+                                    assert entry.shards().isEmpty();
+                                    hadAbortedInitializations = true;
+                                } else {
+                                    // Replace the snapshot that was just initialized
+                                    ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(currentState, entry, repositoryData);
+                                    if (!partial) {
+                                        Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
+                                            currentState.metaData());
+                                        Set<String> missing = indicesWithMissingShards.v1();
+                                        Set<String> closed = indicesWithMissingShards.v2();
+                                        if (missing.isEmpty() == false || closed.isEmpty() == false) {
+                                            final StringBuilder failureMessage = new StringBuilder();
+                                            if (missing.isEmpty() == false) {
+                                                failureMessage.append("Indices don't have primary shards ");
+                                                failureMessage.append(missing);
+                                            }
+                                            if (closed.isEmpty() == false) {
+                                                if (failureMessage.length() > 0) {
+                                                    failureMessage.append("; ");
+                                                }
+                                                failureMessage.append("Indices are closed ");
+                                                failureMessage.append(closed);
                                             }
-                                            failureMessage.append("Indices are closed ");
-                                            failureMessage.append(closed);
+                                            entries.add(
+                                                new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString()));
+                                            continue;
                                         }
-                                        entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString()));
-                                        continue;
                                     }
+                                    entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards));
                                 }
-                                entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards));
                             }
+                            return ClusterState.builder(currentState)
+                                .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries)))
+                                .build();
                         }
-                        return ClusterState.builder(currentState)
-                            .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries)))
-                            .build();
-                    }
 
-                    @Override
-                    public void onFailure(String source, Exception e) {
-                        logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot",
-                            snapshot.snapshot().getSnapshotId()), e);
-                        removeSnapshotFromClusterState(snapshot.snapshot(), null, e,
-                            new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e));
-                    }
-
-                    @Override
-                    public void onNoLongerMaster(String source) {
-                        // We are not longer a master - we shouldn't try to do any cleanup
-                        // The new master will take care of it
-                        logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId());
-                        userCreateSnapshotListener.onFailure(
-                            new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization"));
-                    }
+                        @Override
+                        public void onFailure(String source, Exception e) {
+                            logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot",
+                                snapshot.snapshot().getSnapshotId()), e);
+                            removeSnapshotFromClusterState(snapshot.snapshot(), null, e,
+                                new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e));
+                        }
 
-                    @Override
-                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                        // The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted
-                        // for processing. If client wants to wait for the snapshot completion, it can register snapshot
-                        // completion listener in this method. For the snapshot completion to work properly, the snapshot
-                        // should still exist when listener is registered.
-                        userCreateSnapshotListener.onResponse(snapshot.snapshot());
+                        @Override
+                        public void onNoLongerMaster(String source) {
+                            // We are not longer a master - we shouldn't try to do any cleanup
+                            // The new master will take care of it
+                            logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId());
+                            userCreateSnapshotListener.onFailure(
+                                new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization"));
+                        }
 
-                        if (hadAbortedInitializations) {
-                            final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE);
-                            assert snapshotsInProgress != null;
-                            final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot());
-                            assert entry != null;
-                            endSnapshot(entry, newState.metaData());
+                        @Override
+                        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                            // The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted
+                            // for processing. If client wants to wait for the snapshot completion, it can register snapshot
+                            // completion listener in this method. For the snapshot completion to work properly, the snapshot
+                            // should still exist when listener is registered.
+                            userCreateSnapshotListener.onResponse(snapshot.snapshot());
+
+                            if (hadAbortedInitializations) {
+                                final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE);
+                                assert snapshotsInProgress != null;
+                                final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot());
+                                assert entry != null;
+                                endSnapshot(entry, newState.metaData());
+                            }
                         }
-                    }
-                });
+                    });
+                }, this::onFailure);
             }
 
             @Override
@@ -1138,26 +1146,27 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                                final boolean immediatePriority) {
         // First, look for the snapshot in the repository
         final Repository repository = repositoriesService.repository(repositoryName);
-        final RepositoryData repositoryData = repository.getRepositoryData();
-        Optional<SnapshotId> matchedEntry = repositoryData.getSnapshotIds()
-                                                .stream()
-                                                .filter(s -> s.getName().equals(snapshotName))
-                                                .findFirst();
-        // if nothing found by the same name, then look in the cluster state for current in progress snapshots
-        long repoGenId = repositoryData.getGenId();
-        if (matchedEntry.isPresent() == false) {
-            Optional<SnapshotsInProgress.Entry> matchedInProgress = currentSnapshots(repositoryName, Collections.emptyList()).stream()
-                               .filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst();
-            if (matchedInProgress.isPresent()) {
-                matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId());
-                // Derive repository generation if a snapshot is in progress because it will increment the generation when it finishes
-                repoGenId = matchedInProgress.get().getRepositoryStateId() + 1L;
+        repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
+            Optional<SnapshotId> matchedEntry = repositoryData.getSnapshotIds()
+                .stream()
+                .filter(s -> s.getName().equals(snapshotName))
+                .findFirst();
+            // if nothing found by the same name, then look in the cluster state for current in progress snapshots
+            long repoGenId = repositoryData.getGenId();
+            if (matchedEntry.isPresent() == false) {
+                Optional<SnapshotsInProgress.Entry> matchedInProgress = currentSnapshots(repositoryName, Collections.emptyList()).stream()
+                    .filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst();
+                if (matchedInProgress.isPresent()) {
+                    matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId());
+                    // Derive repository generation if a snapshot is in progress because it will increment the generation when it finishes
+                    repoGenId = matchedInProgress.get().getRepositoryStateId() + 1L;
+                }
             }
-        }
-        if (matchedEntry.isPresent() == false) {
-            throw new SnapshotMissingException(repositoryName, snapshotName);
-        }
-        deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repoGenId, immediatePriority);
+            if (matchedEntry.isPresent() == false) {
+                throw new SnapshotMissingException(repositoryName, snapshotName);
+            }
+            deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repoGenId, immediatePriority);
+        }, listener::onFailure));
     }
 
     /**

+ 2 - 2
server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

@@ -149,8 +149,8 @@ public class RepositoriesServiceTests extends ESTestCase {
         }
 
         @Override
-        public RepositoryData getRepositoryData() {
-            return null;
+        public void getRepositoryData(ActionListener<RepositoryData> listener) {
+            listener.onResponse(null);
         }
 
         @Override

+ 11 - 11
server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java

@@ -130,7 +130,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
             (BlobStoreRepository) repositoriesService.repository(repositoryName);
         final List<SnapshotId> originalSnapshots = Arrays.asList(snapshotId1, snapshotId2);
 
-        List<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds().stream()
+        List<SnapshotId> snapshotIds = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getSnapshotIds().stream()
             .sorted((s1, s2) -> s1.getName().compareTo(s2.getName())).collect(Collectors.toList());
         assertThat(snapshotIds, equalTo(originalSnapshots));
     }
@@ -139,10 +139,10 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
         final BlobStoreRepository repository = setupRepo();
 
         // write to and read from a index file with no entries
-        assertThat(repository.getRepositoryData().getSnapshotIds().size(), equalTo(0));
+        assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getSnapshotIds().size(), equalTo(0));
         final RepositoryData emptyData = RepositoryData.EMPTY;
         repository.writeIndexGen(emptyData, emptyData.getGenId(), true);
-        RepositoryData repoData = repository.getRepositoryData();
+        RepositoryData repoData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository);
         assertEquals(repoData, emptyData);
         assertEquals(repoData.getIndices().size(), 0);
         assertEquals(repoData.getSnapshotIds().size(), 0);
@@ -151,12 +151,12 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
         // write to and read from an index file with snapshots but no indices
         repoData = addRandomSnapshotsToRepoData(repoData, false);
         repository.writeIndexGen(repoData, repoData.getGenId(), true);
-        assertEquals(repoData, repository.getRepositoryData());
+        assertEquals(repoData, ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository));
 
         // write to and read from a index file with random repository data
-        repoData = addRandomSnapshotsToRepoData(repository.getRepositoryData(), true);
+        repoData = addRandomSnapshotsToRepoData(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), true);
         repository.writeIndexGen(repoData, repoData.getGenId(), true);
-        assertEquals(repoData, repository.getRepositoryData());
+        assertEquals(repoData, ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository));
     }
 
     public void testIndexGenerationalFiles() throws Exception {
@@ -165,22 +165,22 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
         // write to index generational file
         RepositoryData repositoryData = generateRandomRepoData();
         repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true);
-        assertThat(repository.getRepositoryData(), equalTo(repositoryData));
+        assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), equalTo(repositoryData));
         assertThat(repository.latestIndexBlobId(), equalTo(0L));
         assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L));
 
         // adding more and writing to a new index generational file
-        repositoryData = addRandomSnapshotsToRepoData(repository.getRepositoryData(), true);
+        repositoryData = addRandomSnapshotsToRepoData(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), true);
         repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true);
-        assertEquals(repository.getRepositoryData(), repositoryData);
+        assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData);
         assertThat(repository.latestIndexBlobId(), equalTo(1L));
         assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L));
 
         // removing a snapshot and writing to a new index generational file
-        repositoryData = repository.getRepositoryData().removeSnapshot(
+        repositoryData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).removeSnapshot(
             repositoryData.getSnapshotIds().iterator().next(), ShardGenerations.EMPTY);
         repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true);
-        assertEquals(repository.getRepositoryData(), repositoryData);
+        assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData);
         assertThat(repository.latestIndexBlobId(), equalTo(2L));
         assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L));
     }

+ 5 - 10
server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

@@ -18,8 +18,8 @@
  */
 package org.elasticsearch.snapshots;
 
-import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.common.settings.Settings;
@@ -44,7 +44,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -93,17 +92,13 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
         skipRepoConsistencyCheckReason = reason;
     }
 
-    protected RepositoryData getRepositoryData(Repository repository) throws InterruptedException {
+    protected RepositoryData getRepositoryData(Repository repository) {
         ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName());
-        final SetOnce<RepositoryData> repositoryData = new SetOnce<>();
-        final CountDownLatch latch = new CountDownLatch(1);
+        final PlainActionFuture<RepositoryData> repositoryData = PlainActionFuture.newFuture();
         threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
-            repositoryData.set(repository.getRepositoryData());
-            latch.countDown();
+            repository.getRepositoryData(repositoryData);
         });
-
-        latch.await();
-        return repositoryData.get();
+        return repositoryData.actionGet();
     }
 
     public static long getFailureCount(String repository) {

+ 2 - 2
server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

@@ -358,7 +358,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         assertThat(client.prepareGet(restoredIndexName, docId).get().isExists(), equalTo(true));
     }
 
-    public void testFreshIndexUUID() throws InterruptedException {
+    public void testFreshIndexUUID() {
         Client client = client();
 
         logger.info("-->  creating repository");
@@ -781,7 +781,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
     }
 
-    public void testSnapshotFileFailureDuringSnapshot() throws InterruptedException {
+    public void testSnapshotFileFailureDuringSnapshot() {
         disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks");
         Client client = client();
 

+ 17 - 7
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -75,6 +75,7 @@ import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.AutoCreateIndex;
 import org.elasticsearch.action.support.DestructiveOperations;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -156,6 +157,7 @@ import org.elasticsearch.node.ResponseCollectorService;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
 import org.elasticsearch.repositories.fs.FsRepository;
@@ -308,7 +310,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
         final Repository repository = masterNode.repositoriesService.repository(repoName);
-        Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
+        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
         assertThat(snapshotIds, hasSize(1));
 
         final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
@@ -369,7 +371,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         SnapshotsInProgress finalSnapshotsInProgress = randomMaster.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertThat(finalSnapshotsInProgress.entries(), empty());
         final Repository repository = randomMaster.repositoriesService.repository(repoName);
-        Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
+        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
         assertThat(snapshotIds, hasSize(1));
     }
 
@@ -408,7 +410,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
         final Repository repository = masterNode.repositoriesService.repository(repoName);
-        Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
+        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
         assertThat(snapshotIds, hasSize(1));
 
         final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
@@ -474,7 +476,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
         final Repository repository = masterNode.repositoriesService.repository(repoName);
-        Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
+        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
         assertThat(snapshotIds, hasSize(thirdSnapshotResponse == null ? 2 : 3));
 
         for (SnapshotId snapshotId : snapshotIds) {
@@ -559,8 +561,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
         final SnapshotsInProgress finalSnapshotsInProgress = testClusterNodes.randomDataNodeSafe()
             .clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertThat(finalSnapshotsInProgress.entries(), empty());
-        final Repository repository = masterNode.repositoriesService.repository(repoName);
-        Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
+        final Repository repository = testClusterNodes.randomMasterNodeSafe().repositoriesService.repository(repoName);
+        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
         assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0)));
     }
 
@@ -633,7 +635,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
         final Repository repository = masterNode.repositoriesService.repository(repoName);
-        Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
+        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
         assertThat(snapshotIds, hasSize(1));
 
         final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
@@ -643,6 +645,14 @@ public class SnapshotResiliencyTests extends ESTestCase {
         assertEquals(0, snapshotInfo.failedShards());
     }
 
+    private RepositoryData getRepositoryData(Repository repository) {
+        final PlainActionFuture<RepositoryData> res = PlainActionFuture.newFuture();
+        repository.getRepositoryData(res);
+        deterministicTaskQueue.runAllRunnableTasks();
+        assertTrue(res.isDone());
+        return res.actionGet();
+    }
+
     private StepListener<CreateIndexResponse> createRepoAndIndex(TestClusterNodes.TestClusterNode masterNode, String repoName,
                                                                  String index, int shards) {
         final AdminClient adminClient = masterNode.client.admin();

+ 1 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java

@@ -140,7 +140,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
         assertThat(snapshotsResponse.getFailedResponses().get("test-repo"), instanceOf(SnapshotMissingException.class));
     }
 
-    public void testExceptionOnMissingShardLevelSnapBlob() throws IOException, InterruptedException {
+    public void testExceptionOnMissingShardLevelSnapBlob() throws IOException {
         disableRepoConsistencyCheck("This test intentionally corrupts the repository");
 
         logger.info("--> creating repository");

+ 3 - 1
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -79,6 +79,7 @@ import org.elasticsearch.indices.recovery.RecoveryTarget;
 import org.elasticsearch.indices.recovery.StartRecoveryRequest;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
 import org.elasticsearch.snapshots.Snapshot;
 import org.elasticsearch.test.DummyShardLock;
 import org.elasticsearch.test.ESTestCase;
@@ -833,7 +834,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
         final Index index = shard.shardId().getIndex();
         final IndexId indexId = new IndexId(index.getName(), index.getUUID());
         final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(
-            repository.getRepositoryData().shardGenerations().getShardGen(indexId, shard.shardId().getId()));
+            ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).shardGenerations().getShardGen(
+                indexId, shard.shardId().getId()));
         final PlainActionFuture<String> future = PlainActionFuture.newFuture();
         final String shardGen;
         try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) {

+ 3 - 3
test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java

@@ -85,10 +85,10 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
     }
 
     @Override
-    public RepositoryData getRepositoryData() {
+    public void getRepositoryData(ActionListener<RepositoryData> listener) {
         final IndexId indexId = new IndexId(indexName, "blah");
-        return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(),
-            Collections.singletonMap(indexId, emptySet()), ShardGenerations.EMPTY);
+        listener.onResponse(new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(),
+            Collections.singletonMap(indexId, emptySet()), ShardGenerations.EMPTY));
     }
 
     @Override

+ 9 - 7
test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java

@@ -24,12 +24,14 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.snapshots.SnapshotMissingException;
 import org.elasticsearch.snapshots.SnapshotRestoreException;
@@ -41,7 +43,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -58,6 +59,10 @@ import static org.hamcrest.Matchers.nullValue;
  */
 public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase {
 
+    public static RepositoryData getRepositoryData(Repository repository) {
+        return PlainActionFuture.get(repository::getRepositoryData);
+    }
+
     protected abstract String repositoryType();
 
     protected Settings repositorySettings() {
@@ -256,16 +261,13 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
         BlobStoreRepository repository = (BlobStoreRepository) repositoriesSvc.repository(repoName);
 
         final SetOnce<BlobContainer> indicesBlobContainer = new SetOnce<>();
-        final SetOnce<RepositoryData> repositoryData = new SetOnce<>();
-        final CountDownLatch latch = new CountDownLatch(1);
+        final PlainActionFuture<RepositoryData> repositoryData = PlainActionFuture.newFuture();
         threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
             indicesBlobContainer.set(repository.blobStore().blobContainer(repository.basePath().add("indices")));
-            repositoryData.set(repository.getRepositoryData());
-            latch.countDown();
+            repository.getRepositoryData(repositoryData);
         });
 
-        latch.await();
-        for (IndexId indexId : repositoryData.get().getIndices().values()) {
+        for (IndexId indexId : repositoryData.actionGet().getIndices().values()) {
             if (indexId.getName().equals("test-idx-3")) {
                 assertFalse(BlobStoreTestUtil.blobExists(indicesBlobContainer.get(), indexId.getId())); // deleted index
             }

+ 22 - 20
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -221,26 +221,28 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
     }
 
     @Override
-    public RepositoryData getRepositoryData() {
-        Client remoteClient = getRemoteClusterClient();
-        ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true)
-            .get(ccrSettings.getRecoveryActionTimeout());
-        MetaData remoteMetaData = response.getState().getMetaData();
-
-        Map<String, SnapshotId> copiedSnapshotIds = new HashMap<>();
-        Map<String, SnapshotState> snapshotStates = new HashMap<>(copiedSnapshotIds.size());
-        Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>(copiedSnapshotIds.size());
-
-        ImmutableOpenMap<String, IndexMetaData> remoteIndices = remoteMetaData.getIndices();
-        for (String indexName : remoteMetaData.getConcreteAllIndices()) {
-            // Both the Snapshot name and UUID are set to _latest_
-            SnapshotId snapshotId = new SnapshotId(LATEST, LATEST);
-            copiedSnapshotIds.put(indexName, snapshotId);
-            snapshotStates.put(indexName, SnapshotState.SUCCESS);
-            Index index = remoteIndices.get(indexName).getIndex();
-            indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singleton(snapshotId));
-        }
-        return new RepositoryData(1, copiedSnapshotIds, snapshotStates, indexSnapshots, ShardGenerations.EMPTY);
+    public void getRepositoryData(ActionListener<RepositoryData> listener) {
+        ActionListener.completeWith(listener, () -> {
+            Client remoteClient = getRemoteClusterClient();
+            ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true)
+                .get(ccrSettings.getRecoveryActionTimeout());
+            MetaData remoteMetaData = response.getState().getMetaData();
+
+            Map<String, SnapshotId> copiedSnapshotIds = new HashMap<>();
+            Map<String, SnapshotState> snapshotStates = new HashMap<>(copiedSnapshotIds.size());
+            Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>(copiedSnapshotIds.size());
+
+            ImmutableOpenMap<String, IndexMetaData> remoteIndices = remoteMetaData.getIndices();
+            for (String indexName : remoteMetaData.getConcreteAllIndices()) {
+                // Both the Snapshot name and UUID are set to _latest_
+                SnapshotId snapshotId = new SnapshotId(LATEST, LATEST);
+                copiedSnapshotIds.put(indexName, snapshotId);
+                snapshotStates.put(indexName, SnapshotState.SUCCESS);
+                Index index = remoteIndices.get(indexName).getIndex();
+                indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singleton(snapshotId));
+            }
+            return new RepositoryData(1, copiedSnapshotIds, snapshotStates, indexSnapshots, ShardGenerations.EMPTY);
+        });
     }
 
     @Override

+ 2 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

@@ -61,6 +61,7 @@ import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.ShardGenerations;
+import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
 import org.elasticsearch.repositories.fs.FsRepository;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.hamcrest.Matchers;
@@ -211,7 +212,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
                 repository.finalizeSnapshot(snapshotId,
                     ShardGenerations.builder().put(indexId, 0, indexShardSnapshotStatus.generation()).build(),
                     indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(),
-                    repository.getRepositoryData().getGenId(), true,
+                    ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getGenId(), true,
                     MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap(),
                     true,
                     finFuture);