Browse Source

Refactor RestoreService Restore Path (#73258)

Make the restore path a little easier to follow by splitting it up into
the cluster state update and the steps that happen before the CS update.
Also, document more pieces of it and remove some confusing redundant code.
Armin Braun 4 years ago
parent
commit
b13f43b24e
1 changed files with 608 additions and 484 deletions
  1. 608 484
      server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

+ 608 - 484
server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

@@ -50,7 +50,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.CheckedConsumer;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -89,7 +89,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -235,485 +234,34 @@ public class RestoreService implements ClusterStateApplier {
             final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
             repository.getRepositoryData(repositoryDataListener);
 
-            final CheckedConsumer<RepositoryData, IOException> onRepositoryDataReceived = repositoryData -> {
-                final String snapshotName = request.snapshot();
-                final Optional<SnapshotId> matchingSnapshotId = repositoryData.getSnapshotIds().stream()
-                    .filter(s -> snapshotName.equals(s.getName())).findFirst();
-                if (matchingSnapshotId.isPresent() == false) {
-                    throw new SnapshotRestoreException(repositoryName, snapshotName, "snapshot does not exist");
-                }
-
-                final SnapshotId snapshotId = matchingSnapshotId.get();
-                if (request.snapshotUuid() != null && request.snapshotUuid().equals(snapshotId.getUUID()) == false) {
-                    throw new SnapshotRestoreException(repositoryName, snapshotName,
-                        "snapshot UUID mismatch: expected [" + request.snapshotUuid() + "] but got [" + snapshotId.getUUID() + "]");
-                }
-
-                final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
-                final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
-
-                // Make sure that we can restore from this snapshot
-                validateSnapshotRestorable(repositoryName, snapshotInfo);
-
-                // Get the global state if necessary
-                Metadata globalMetadata = null;
-                final Metadata.Builder metadataBuilder;
-                if (request.includeGlobalState()) {
-                    globalMetadata = repository.getSnapshotGlobalMetadata(snapshotId);
-                    metadataBuilder = Metadata.builder(globalMetadata);
-                } else {
-                    metadataBuilder = Metadata.builder();
-                }
-
-                List<String> requestIndices = new ArrayList<>(Arrays.asList(request.indices()));
-
-                // Get data stream metadata for requested data streams
-                Tuple<Map<String, DataStream>, Map<String, DataStreamAlias>> result =
-                    getDataStreamsToRestore(repository, snapshotId, snapshotInfo, globalMetadata, requestIndices);
-                Map<String, DataStream> dataStreamsToRestore = result.v1();
-                Map<String, DataStreamAlias> dataStreamAliasesToRestore = result.v2();
-
-
-                // Remove the data streams from the list of requested indices
-                requestIndices.removeAll(dataStreamsToRestore.keySet());
-
-                // And add the backing indices
-                Set<String> dataStreamIndices = dataStreamsToRestore.values().stream()
-                    .flatMap(ds -> ds.getIndices().stream())
-                    .map(Index::getName)
-                    .collect(Collectors.toSet());
-                requestIndices.addAll(dataStreamIndices);
-
-                // Determine system indices to restore from requested feature states
-                final Map<String, List<String>> featureStatesToRestore = getFeatureStatesToRestore(request, snapshotInfo, snapshot);
-                final Set<String> featureStateIndices = featureStatesToRestore.values().stream()
-                    .flatMap(Collection::stream)
-                    .collect(Collectors.toSet());
-
-                // Resolve the indices that were directly requested
-                final List<String> requestedIndicesInSnapshot = filterIndices(snapshotInfo.indices(), requestIndices.toArray(String[]::new),
-                    request.indicesOptions());
-
-                // Combine into the final list of indices to be restored
-                final List<String> requestedIndicesIncludingSystem = Stream.concat(
-                    requestedIndicesInSnapshot.stream(),
-                    featureStateIndices.stream()
-                ).distinct().collect(Collectors.toList());
-
-                final Set<String> explicitlyRequestedSystemIndices = new HashSet<>();
-                for (IndexId indexId : repositoryData.resolveIndices(requestedIndicesIncludingSystem).values()) {
-                    IndexMetadata snapshotIndexMetaData = repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId);
-                    if (snapshotIndexMetaData.isSystem()) {
-                        if (requestedIndicesInSnapshot.contains(indexId.getName())) {
-                            explicitlyRequestedSystemIndices.add(indexId.getName());
-                        }
-                    }
-                    metadataBuilder.put(snapshotIndexMetaData, false);
-                }
-
-                // log a deprecation warning if the any of the indexes to delete were included in the request and the snapshot
-                // is from a version that should have feature states
-                if (snapshotInfo.version().onOrAfter(Version.V_7_12_0) && explicitlyRequestedSystemIndices.isEmpty() == false) {
-                    deprecationLogger.deprecate(DeprecationCategory.API, "restore-system-index-from-snapshot",
-                        "Restoring system indices by name is deprecated. Use feature states instead. System indices: "
-                            + explicitlyRequestedSystemIndices);
-                }
-
-                final Metadata metadata = metadataBuilder
-                    .dataStreams(dataStreamsToRestore, dataStreamAliasesToRestore)
-                    .build();
-
-                // Apply renaming on index names, returning a map of names where
-                // the key is the renamed index and the value is the original name
-                final Map<String, String> indices = renamedIndices(request, requestedIndicesIncludingSystem, dataStreamIndices,
-                    featureStateIndices);
-
-                // Now we can start the actual restore process by adding shards to be recovered in the cluster state
-                // and updating cluster metadata (global and index) as needed
-                clusterService.submitStateUpdateTask(
-                        "restore_snapshot[" + snapshotName + ']', new ClusterStateUpdateTask(request.masterNodeTimeout()) {
-                    final String restoreUUID = UUIDs.randomBase64UUID();
-                    RestoreInfo restoreInfo = null;
-
-                    @Override
-                    public ClusterState execute(ClusterState currentState) {
-                        // Check if the snapshot to restore is currently being deleted
-                        SnapshotDeletionsInProgress deletionsInProgress =
-                            currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
-                        if (deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(snapshotId))) {
-                            throw new ConcurrentSnapshotExecutionException(snapshot,
-                                "cannot restore a snapshot while a snapshot deletion is in-progress [" +
-                                    deletionsInProgress.getEntries().get(0) + "]");
-                        }
-
-                        // Clear out all existing indices which fall within a system index pattern being restored
-                        final Set<Index> systemIndicesToDelete = resolveSystemIndicesToDelete(
-                            currentState,
-                            featureStatesToRestore.keySet()
-                        );
-                        currentState = metadataDeleteIndexService.deleteIndices(currentState, systemIndicesToDelete);
-
-                        // Updating cluster state
-                        ClusterState.Builder builder = ClusterState.builder(currentState);
-                        Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
-                        ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
-                        RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
-                        ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards;
-                        Set<String> aliases = new HashSet<>();
-
-                        if (indices.isEmpty() == false) {
-                            // We have some indices to restore
-                            ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> shardsBuilder =
-                                ImmutableOpenMap.builder();
-                            final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion()
-                                .minimumIndexCompatibilityVersion();
-                            for (Map.Entry<String, String> indexEntry : indices.entrySet()) {
-                                String index = indexEntry.getValue();
-                                boolean partial = checkPartial(index);
-                                SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(restoreUUID, snapshot,
-                                    snapshotInfo.version(), repositoryData.resolveIndexId(index));
-                                String renamedIndexName = indexEntry.getKey();
-                                IndexMetadata snapshotIndexMetadata = metadata.index(index);
-                                snapshotIndexMetadata = updateIndexSettings(snapshotIndexMetadata,
-                                    request.indexSettings(), request.ignoreIndexSettings());
-                                try {
-                                    snapshotIndexMetadata = indexMetadataVerifier.verifyIndexMetadata(snapshotIndexMetadata,
-                                        minIndexCompatibilityVersion);
-                                } catch (Exception ex) {
-                                    throw new SnapshotRestoreException(snapshot, "cannot restore index [" + index +
-                                        "] because it cannot be upgraded", ex);
-                                }
-                                // Check that the index is closed or doesn't exist
-                                IndexMetadata currentIndexMetadata = currentState.metadata().index(renamedIndexName);
-                                IntSet ignoreShards = new IntHashSet();
-                                final Index renamedIndex;
-                                if (currentIndexMetadata == null) {
-                                    // Index doesn't exist - create it and start recovery
-                                    // Make sure that the index we are about to create has a validate name
-                                    boolean isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(snapshotIndexMetadata.getSettings());
-                                    createIndexService.validateIndexName(renamedIndexName, currentState);
-                                    createIndexService.validateDotIndex(renamedIndexName, isHidden);
-                                    createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetadata.getSettings(), false);
-                                    IndexMetadata.Builder indexMdBuilder = IndexMetadata.builder(snapshotIndexMetadata)
-                                        .state(IndexMetadata.State.OPEN)
-                                        .index(renamedIndexName);
-                                    indexMdBuilder.settings(Settings.builder()
-                                        .put(snapshotIndexMetadata.getSettings())
-                                        .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()))
-                                        .timestampRange(IndexLongFieldRange.NO_SHARDS);
-                                    shardLimitValidator.validateShardLimit(snapshotIndexMetadata.getSettings(), currentState);
-                                    if (request.includeAliases() == false && snapshotIndexMetadata.getAliases().isEmpty() == false
-                                            && isSystemIndex(snapshotIndexMetadata) == false) {
-                                        // Remove all aliases - they shouldn't be restored
-                                        indexMdBuilder.removeAllAliases();
-                                    } else {
-                                        for (ObjectCursor<String> alias : snapshotIndexMetadata.getAliases().keys()) {
-                                            aliases.add(alias.value);
-                                        }
-                                    }
-                                    IndexMetadata updatedIndexMetadata = indexMdBuilder.build();
-                                    if (partial) {
-                                        populateIgnoredShards(index, ignoreShards);
-                                    }
-                                    rtBuilder.addAsNewRestore(updatedIndexMetadata, recoverySource, ignoreShards);
-                                    blocks.addBlocks(updatedIndexMetadata);
-                                    mdBuilder.put(updatedIndexMetadata, true);
-                                    renamedIndex = updatedIndexMetadata.getIndex();
-                                } else {
-                                    validateExistingIndex(currentIndexMetadata, snapshotIndexMetadata, renamedIndexName, partial);
-                                    // Index exists and it's closed - open it in metadata and start recovery
-                                    IndexMetadata.Builder indexMdBuilder =
-                                        IndexMetadata.builder(snapshotIndexMetadata).state(IndexMetadata.State.OPEN);
-                                    indexMdBuilder.version(
-                                        Math.max(snapshotIndexMetadata.getVersion(), 1 + currentIndexMetadata.getVersion()));
-                                    indexMdBuilder.mappingVersion(
-                                        Math.max(snapshotIndexMetadata.getMappingVersion(), 1 + currentIndexMetadata.getMappingVersion()));
-                                    indexMdBuilder.settingsVersion(
-                                        Math.max(
-                                            snapshotIndexMetadata.getSettingsVersion(),
-                                            1 + currentIndexMetadata.getSettingsVersion()));
-                                    indexMdBuilder.aliasesVersion(
-                                        Math.max(snapshotIndexMetadata.getAliasesVersion(), 1 + currentIndexMetadata.getAliasesVersion()));
-                                    indexMdBuilder.timestampRange(IndexLongFieldRange.NO_SHARDS);
-
-                                    for (int shard = 0; shard < snapshotIndexMetadata.getNumberOfShards(); shard++) {
-                                        indexMdBuilder.primaryTerm(shard,
-                                            Math.max(snapshotIndexMetadata.primaryTerm(shard), currentIndexMetadata.primaryTerm(shard)));
-                                    }
-
-                                    if (request.includeAliases() == false && isSystemIndex(snapshotIndexMetadata) == false) {
-                                        // Remove all snapshot aliases
-                                        if (snapshotIndexMetadata.getAliases().isEmpty() == false) {
-                                            indexMdBuilder.removeAllAliases();
-                                        }
-                                        /// Add existing aliases
-                                        for (ObjectCursor<AliasMetadata> alias : currentIndexMetadata.getAliases().values()) {
-                                            indexMdBuilder.putAlias(alias.value);
-                                        }
-                                    } else {
-                                        for (ObjectCursor<String> alias : snapshotIndexMetadata.getAliases().keys()) {
-                                            aliases.add(alias.value);
-                                        }
-                                    }
-                                    indexMdBuilder.settings(Settings.builder()
-                                        .put(snapshotIndexMetadata.getSettings())
-                                        .put(IndexMetadata.SETTING_INDEX_UUID, currentIndexMetadata.getIndexUUID())
-                                        .put(IndexMetadata.SETTING_HISTORY_UUID, UUIDs.randomBase64UUID()));
-                                    IndexMetadata updatedIndexMetadata = indexMdBuilder.index(renamedIndexName).build();
-                                    rtBuilder.addAsRestore(updatedIndexMetadata, recoverySource);
-                                    blocks.updateBlocks(updatedIndexMetadata);
-                                    mdBuilder.put(updatedIndexMetadata, true);
-                                    renamedIndex = updatedIndexMetadata.getIndex();
-                                }
-
-                                for (int shard = 0; shard < snapshotIndexMetadata.getNumberOfShards(); shard++) {
-                                    if (ignoreShards.contains(shard) == false) {
-                                        shardsBuilder.put(new ShardId(renamedIndex, shard),
-                                            new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId()));
-                                    } else {
-                                        shardsBuilder.put(new ShardId(renamedIndex, shard),
-                                            new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId(),
-                                                RestoreInProgress.State.FAILURE));
-                                    }
+            repositoryDataListener.whenComplete(repositoryData ->
+                repositoryUuidRefreshListener.whenComplete(ignored ->
+                    // fork handling to the generic pool since it loads various pieces of metadata from the repository over a longer period
+                    // of time
+                    clusterService.getClusterApplierService().threadPool().generic().execute(
+                        ActionRunnable.wrap(
+                            listener,
+                            l -> {
+                                final String snapshotName = request.snapshot();
+                                final Optional<SnapshotId> matchingSnapshotId = repositoryData.getSnapshotIds().stream()
+                                        .filter(s -> snapshotName.equals(s.getName())).findFirst();
+                                if (matchingSnapshotId.isPresent() == false) {
+                                    throw new SnapshotRestoreException(repositoryName, snapshotName, "snapshot does not exist");
                                 }
-                            }
 
-                            shards = shardsBuilder.build();
-                            RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(
-                                restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards),
-                                List.copyOf(indices.keySet()),
-                                shards
-                            );
-                            builder.putCustom(RestoreInProgress.TYPE, new RestoreInProgress.Builder(
-                                currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)).add(restoreEntry).build());
-                        } else {
-                            shards = ImmutableOpenMap.of();
-                        }
-
-                        checkAliasNameConflicts(indices, aliases);
-
-                        Map<String, DataStream> updatedDataStreams = new HashMap<>(currentState.metadata().dataStreams());
-                        updatedDataStreams.putAll(dataStreamsToRestore.values().stream()
-                            .map(ds -> updateDataStream(ds, mdBuilder, request))
-                            .collect(Collectors.toMap(DataStream::getName, Function.identity())));
-                        Map<String, DataStreamAlias> updatedDataStreamAliases = new HashMap<>(currentState.metadata().dataStreamAliases());
-                        metadata.dataStreamAliases().values().stream()
-                            // Optionally rename the data stream names for each alias
-                            .map(alias -> {
-                                if (request.renamePattern() != null && request.renameReplacement() != null) {
-                                    List<String> renamedDataStreams = alias.getDataStreams().stream()
-                                        .map(s -> s.replaceAll(request.renamePattern(), request.renameReplacement()))
-                                        .collect(Collectors.toList());
-                                    return new DataStreamAlias(alias.getName(), renamedDataStreams);
-                                } else {
-                                    return alias;
-                                }
-                            }).forEach(alias -> {
-                                DataStreamAlias current = updatedDataStreamAliases.putIfAbsent(alias.getName(), alias);
-                                if (current != null) {
-                                    // Merge data stream alias from snapshot with an existing data stream aliases in target cluster:
-                                    Set<String> mergedDataStreams = new HashSet<>(current.getDataStreams());
-                                    mergedDataStreams.addAll(alias.getDataStreams());
-                                    DataStreamAlias newInstance = new DataStreamAlias(alias.getName(), List.copyOf(mergedDataStreams));
-                                    updatedDataStreamAliases.put(alias.getName(), newInstance);
-                                }
-                            });
-                        mdBuilder.dataStreams(updatedDataStreams, updatedDataStreamAliases);
-
-                        // Restore global state if needed
-                        if (request.includeGlobalState()) {
-                            if (metadata.persistentSettings() != null) {
-                                Settings settings = metadata.persistentSettings();
-                                if (request.skipOperatorOnlyState()) {
-                                    // Skip any operator-only settings from the snapshot. This happens when operator privileges are enabled
-                                    Set<String> operatorSettingKeys = Stream.concat(
-                                        settings.keySet().stream(), currentState.metadata().persistentSettings().keySet().stream())
-                                        .filter(k -> {
-                                            final Setting<?> setting = clusterSettings.get(k);
-                                            return setting != null && setting.isOperatorOnly();
-                                        })
-                                        .collect(Collectors.toSet());
-                                    if (false == operatorSettingKeys.isEmpty()) {
-                                        settings = Settings.builder()
-                                            .put(settings.filter(k -> false == operatorSettingKeys.contains(k)))
-                                            .put(currentState.metadata().persistentSettings().filter(operatorSettingKeys::contains))
-                                            .build();
-                                    }
+                                final SnapshotId snapshotId = matchingSnapshotId.get();
+                                if (request.snapshotUuid() != null && request.snapshotUuid().equals(snapshotId.getUUID()) == false) {
+                                    throw new SnapshotRestoreException(repositoryName, snapshotName,
+                                            "snapshot UUID mismatch: expected [" + request.snapshotUuid() + "] but got ["
+                                                    + snapshotId.getUUID() + "]");
                                 }
-                                clusterSettings.validateUpdate(settings);
-                                mdBuilder.persistentSettings(settings);
-                            }
-                            if (metadata.templates() != null) {
-                                // TODO: Should all existing templates be deleted first?
-                                for (ObjectCursor<IndexTemplateMetadata> cursor : metadata.templates().values()) {
-                                    mdBuilder.put(cursor.value);
-                                }
-                            }
-                            if (metadata.customs() != null) {
-                                for (ObjectObjectCursor<String, Metadata.Custom> cursor : metadata.customs()) {
-                                    if (RepositoriesMetadata.TYPE.equals(cursor.key) == false
-                                            && DataStreamMetadata.TYPE.equals(cursor.key) == false
-                                            && cursor.value instanceof Metadata.NonRestorableCustom == false) {
-                                        // TODO: Check request.skipOperatorOnly for Autoscaling policies (NonRestorableCustom)
-                                        // Don't restore repositories while we are working with them
-                                        // TODO: Should we restore them at the end?
-                                        // Also, don't restore data streams here, we already added them to the metadata builder above
-                                        mdBuilder.putCustom(cursor.key, cursor.value);
-                                    }
-                                }
-                            }
-                        }
-
-                        if (completed(shards)) {
-                            // We don't have any indices to restore - we are done
-                            restoreInfo = new RestoreInfo(snapshotId.getName(),
-                                List.copyOf(indices.keySet()),
-                                shards.size(),
-                                shards.size() - failedShards(shards));
-                        }
-
-                        RoutingTable rt = rtBuilder.build();
-                        updater.accept(currentState, mdBuilder);
-                        ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build();
-                        return allocationService.reroute(updatedState, "restored snapshot [" + snapshot + "]");
-                    }
-
-                    private void checkAliasNameConflicts(Map<String, String> renamedIndices, Set<String> aliases) {
-                        for (Map.Entry<String, String> renamedIndex : renamedIndices.entrySet()) {
-                            if (aliases.contains(renamedIndex.getKey())) {
-                                throw new SnapshotRestoreException(snapshot,
-                                    "cannot rename index [" + renamedIndex.getValue() + "] into [" + renamedIndex.getKey()
-                                        + "] because of conflict with an alias with the same name");
-                            }
-                        }
-                    }
-
-                    private void populateIgnoredShards(String index, IntSet ignoreShards) {
-                        for (SnapshotShardFailure failure : snapshotInfo.shardFailures()) {
-                            if (index.equals(failure.index())) {
-                                ignoreShards.add(failure.shardId());
-                            }
-                        }
-                    }
-
-                    private boolean checkPartial(String index) {
-                        // Make sure that index was fully snapshotted
-                        if (failed(snapshotInfo, index)) {
-                            if (request.partial()) {
-                                return true;
-                            } else {
-                                throw new SnapshotRestoreException(snapshot, "index [" + index + "] wasn't fully snapshotted - cannot " +
-                                    "restore");
-                            }
-                        } else {
-                            return false;
-                        }
-                    }
-
-                    private void validateExistingIndex(IndexMetadata currentIndexMetadata, IndexMetadata snapshotIndexMetadata,
-                                                       String renamedIndex, boolean partial) {
-                        // Index exist - checking that it's closed
-                        if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) {
-                            // TODO: Enable restore for open indices
-                            throw new SnapshotRestoreException(snapshot, "cannot restore index [" + renamedIndex
-                                + "] because an open index " +
-                                "with same name already exists in the cluster. Either close or delete the existing index or restore the " +
-                                "index under a different name by providing a rename pattern and replacement name");
-                        }
-                        // Index exist - checking if it's partial restore
-                        if (partial) {
-                            throw new SnapshotRestoreException(snapshot, "cannot restore partial index [" + renamedIndex
-                                + "] because such index already exists");
-                        }
-                        // Make sure that the number of shards is the same. That's the only thing that we cannot change
-                        if (currentIndexMetadata.getNumberOfShards() != snapshotIndexMetadata.getNumberOfShards()) {
-                            throw new SnapshotRestoreException(snapshot,
-                                "cannot restore index [" + renamedIndex + "] with [" + currentIndexMetadata.getNumberOfShards()
-                                    + "] shards from a snapshot of index [" + snapshotIndexMetadata.getIndex().getName() + "] with [" +
-                                    snapshotIndexMetadata.getNumberOfShards() + "] shards");
-                        }
-                    }
-
-                    /**
-                     * Optionally updates index settings in indexMetadata by removing settings listed in ignoreSettings and
-                     * merging them with settings in changeSettings.
-                     */
-                    private IndexMetadata updateIndexSettings(IndexMetadata indexMetadata, Settings changeSettings,
-                                                              String[] ignoreSettings) {
-                        Settings normalizedChangeSettings = Settings.builder()
-                            .put(changeSettings)
-                            .normalizePrefix(IndexMetadata.INDEX_SETTING_PREFIX)
-                            .build();
-                        if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(indexMetadata.getSettings()) &&
-                            IndexSettings.INDEX_SOFT_DELETES_SETTING.exists(changeSettings) &&
-                            IndexSettings.INDEX_SOFT_DELETES_SETTING.get(changeSettings) == false) {
-                            throw new SnapshotRestoreException(snapshot,
-                                "cannot disable setting [" + IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey() + "] on restore");
-                        }
-                        IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata);
-                        Settings settings = indexMetadata.getSettings();
-                        Set<String> keyFilters = new HashSet<>();
-                        List<String> simpleMatchPatterns = new ArrayList<>();
-                        for (String ignoredSetting : ignoreSettings) {
-                            if (Regex.isSimpleMatchPattern(ignoredSetting) == false) {
-                                if (UNREMOVABLE_SETTINGS.contains(ignoredSetting)) {
-                                    throw new SnapshotRestoreException(
-                                        snapshot, "cannot remove setting [" + ignoredSetting + "] on restore");
-                                } else {
-                                    keyFilters.add(ignoredSetting);
-                                }
-                            } else {
-                                simpleMatchPatterns.add(ignoredSetting);
-                            }
-                        }
-                        Predicate<String> settingsFilter = k -> {
-                            if (UNREMOVABLE_SETTINGS.contains(k) == false) {
-                                for (String filterKey : keyFilters) {
-                                    if (k.equals(filterKey)) {
-                                        return false;
-                                    }
-                                }
-                                for (String pattern : simpleMatchPatterns) {
-                                    if (Regex.simpleMatch(pattern, k)) {
-                                        return false;
-                                    }
-                                }
-                            }
-                            return true;
-                        };
-                        Settings.Builder settingsBuilder = Settings.builder()
-                            .put(settings.filter(settingsFilter))
-                            .put(normalizedChangeSettings.filter(k -> {
-                                if (UNMODIFIABLE_SETTINGS.contains(k)) {
-                                    throw new SnapshotRestoreException(snapshot, "cannot modify setting [" + k + "] on restore");
-                                } else {
-                                    return true;
-                                }
-                            }));
-                        settingsBuilder.remove(MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey());
-                        return builder.settings(settingsBuilder).build();
-                    }
-
-                    @Override
-                    public void onFailure(String source, Exception e) {
-                        logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", snapshotId), e);
-                        listener.onFailure(e);
-                    }
-
-                    @Override
-                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                        listener.onResponse(new RestoreCompletionResponse(restoreUUID, snapshot, restoreInfo));
-                    }
-                });
-            };
-
-            // fork handling the above consumer to the generic pool since it loads various pieces of metadata from the repository over a
-            // longer period of time
-            repositoryDataListener.whenComplete(repositoryData -> repositoryUuidRefreshListener.whenComplete(ignored ->
-                    clusterService.getClusterApplierService().threadPool().generic().execute(
-                            ActionRunnable.wrap(listener, l -> onRepositoryDataReceived.accept(repositoryData))
-                    ), listener::onFailure), listener::onFailure);
-
+                                startRestore(repository.getSnapshotInfo(snapshotId), repository, request, repositoryData, updater, l);
+                            })
+                        ),
+                    listener::onFailure
+                ),
+                listener::onFailure
+            );
         } catch (Exception e) {
             logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot",
                 request.repository() + ":" + request.snapshot()), e);
@@ -721,6 +269,116 @@ public class RestoreService implements ClusterStateApplier {
         }
     }
 
+    /**
+     * Start the snapshot restore process. First validate that the snapshot can be restored based on the contents of the repository and
+     * the restore request. If it can be restored, compute the metadata to be restored for the current restore request and submit the
+     * cluster state update request to start the restore.
+     *
+     * @param snapshotInfo   snapshot info for the snapshot to restore
+     * @param repository     the repository to restore from
+     * @param request        restore request
+     * @param repositoryData current repository data for the repository to restore from
+     * @param updater        handler that allows callers to make modifications to {@link Metadata} in the same cluster state update as the
+     *                       restore operation
+     * @param listener       listener to resolve once restore has been started
+     * @throws IOException   on failure to load metadata from the repository
+     */
+    private void startRestore(SnapshotInfo snapshotInfo,
+                              Repository repository,
+                              RestoreSnapshotRequest request,
+                              RepositoryData repositoryData,
+                              BiConsumer<ClusterState, Metadata.Builder> updater,
+                              ActionListener<RestoreCompletionResponse> listener) throws IOException {
+        final SnapshotId snapshotId = snapshotInfo.snapshotId();
+        final String repositoryName = repository.getMetadata().name();
+        final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
+
+        // Make sure that we can restore from this snapshot
+        validateSnapshotRestorable(repositoryName, snapshotInfo);
+
+        // Get the global state if necessary
+        Metadata globalMetadata = null;
+        final Metadata.Builder metadataBuilder;
+        if (request.includeGlobalState()) {
+            globalMetadata = repository.getSnapshotGlobalMetadata(snapshotId);
+            metadataBuilder = Metadata.builder(globalMetadata);
+        } else {
+            metadataBuilder = Metadata.builder();
+        }
+
+        List<String> requestIndices = new ArrayList<>(Arrays.asList(request.indices()));
+
+        // Get data stream metadata for requested data streams
+        Tuple<Map<String, DataStream>, Map<String, DataStreamAlias>> result =
+                getDataStreamsToRestore(repository, snapshotId, snapshotInfo, globalMetadata, requestIndices);
+        Map<String, DataStream> dataStreamsToRestore = result.v1();
+        Map<String, DataStreamAlias> dataStreamAliasesToRestore = result.v2();
+
+        // Remove the data streams from the list of requested indices
+        requestIndices.removeAll(dataStreamsToRestore.keySet());
+
+        // And add the backing indices
+        Set<String> dataStreamIndices = dataStreamsToRestore.values().stream()
+                .flatMap(ds -> ds.getIndices().stream())
+                .map(Index::getName)
+                .collect(Collectors.toSet());
+        requestIndices.addAll(dataStreamIndices);
+
+        // Determine system indices to restore from requested feature states
+        final Map<String, List<String>> featureStatesToRestore = getFeatureStatesToRestore(request, snapshotInfo, snapshot);
+        final Set<String> featureStateIndices = featureStatesToRestore.values().stream()
+                .flatMap(Collection::stream)
+                .collect(Collectors.toSet());
+
+        // Resolve the indices that were directly requested
+        final List<String> requestedIndicesInSnapshot = filterIndices(snapshotInfo.indices(), requestIndices.toArray(String[]::new),
+                request.indicesOptions());
+
+        // Combine into the final list of indices to be restored
+        final List<String> requestedIndicesIncludingSystem = Stream.concat(
+                requestedIndicesInSnapshot.stream(),
+                featureStateIndices.stream()
+        ).distinct().collect(Collectors.toList());
+
+        final Set<String> explicitlyRequestedSystemIndices = new HashSet<>();
+        for (IndexId indexId : repositoryData.resolveIndices(requestedIndicesIncludingSystem).values()) {
+            IndexMetadata snapshotIndexMetaData = repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId);
+            if (snapshotIndexMetaData.isSystem()) {
+                if (requestedIndicesInSnapshot.contains(indexId.getName())) {
+                    explicitlyRequestedSystemIndices.add(indexId.getName());
+                }
+            }
+            metadataBuilder.put(snapshotIndexMetaData, false);
+        }
+
+        // log a deprecation warning if the any of the indexes to delete were included in the request and the snapshot
+        // is from a version that should have feature states
+        if (snapshotInfo.version().onOrAfter(Version.V_7_12_0) && explicitlyRequestedSystemIndices.isEmpty() == false) {
+            deprecationLogger.deprecate(DeprecationCategory.API, "restore-system-index-from-snapshot",
+                    "Restoring system indices by name is deprecated. Use feature states instead. System indices: "
+                            + explicitlyRequestedSystemIndices);
+        }
+
+        // Now we can start the actual restore process by adding shards to be recovered in the cluster state
+        // and updating cluster metadata (global and index) as needed
+        clusterService.submitStateUpdateTask(
+                "restore_snapshot[" + snapshotId.getName() + ']',
+                new RestoreSnapshotStateTask(
+                        request,
+                        snapshot,
+                        featureStatesToRestore.keySet(),
+                        // Apply renaming on index names, returning a map of names where
+                        // the key is the renamed index and the value is the original name
+                        renamedIndices(request, requestedIndicesIncludingSystem, dataStreamIndices, featureStateIndices, repositoryData),
+                        snapshotInfo,
+                        metadataBuilder.dataStreams(dataStreamsToRestore, dataStreamAliasesToRestore).build(),
+                        dataStreamsToRestore.values(),
+                        updater,
+                        listener
+                )
+        );
+    }
+
     private void setRefreshRepositoryUuidOnRestore(boolean refreshRepositoryUuidOnRestore) {
         this.refreshRepositoryUuidOnRestore = refreshRepositoryUuidOnRestore;
     }
@@ -1095,9 +753,12 @@ public class RestoreService implements ClusterStateApplier {
         return failedShards;
     }
 
-    private static Map<String, String> renamedIndices(RestoreSnapshotRequest request, List<String> filteredIndices,
-                                                      Set<String> dataStreamIndices, Set<String> featureIndices) {
-        Map<String, String> renamedIndices = new HashMap<>();
+    private static Map<String, IndexId> renamedIndices(RestoreSnapshotRequest request,
+                                                       List<String> filteredIndices,
+                                                       Set<String> dataStreamIndices,
+                                                       Set<String> featureIndices,
+                                                       RepositoryData repositoryData) {
+        Map<String, IndexId> renamedIndices = new HashMap<>();
         for (String index : filteredIndices) {
             String renamedIndex;
             if (featureIndices.contains(index)) {
@@ -1106,10 +767,12 @@ public class RestoreService implements ClusterStateApplier {
             } else {
                 renamedIndex = renameIndex(index, request, dataStreamIndices.contains(index));
             }
-            String previousIndex = renamedIndices.put(renamedIndex, index);
+            IndexId previousIndex = renamedIndices.put(renamedIndex, repositoryData.resolveIndexId(index));
             if (previousIndex != null) {
-                throw new SnapshotRestoreException(request.repository(), request.snapshot(),
-                        "indices [" + index + "] and [" + previousIndex + "] are renamed into the same index [" + renamedIndex + "]");
+                throw new SnapshotRestoreException(
+                    request.repository(), request.snapshot(),
+                    "indices [" + index + "] and [" + previousIndex.getName() + "] are renamed into the same index [" + renamedIndex + "]"
+                );
             }
         }
         return Collections.unmodifiableMap(renamedIndices);
@@ -1247,4 +910,465 @@ public class RestoreService implements ClusterStateApplier {
             logger.warn("Failed to update restore state ", t);
         }
     }
+
+    /**
+     * Optionally updates index settings in indexMetadata by removing settings listed in ignoreSettings and
+     * merging them with settings in changeSettings.
+     */
+    private static IndexMetadata updateIndexSettings(Snapshot snapshot,
+                                                     IndexMetadata indexMetadata,
+                                                     Settings changeSettings,
+                                                     String[] ignoreSettings) {
+        Settings normalizedChangeSettings = Settings.builder()
+                .put(changeSettings)
+                .normalizePrefix(IndexMetadata.INDEX_SETTING_PREFIX)
+                .build();
+        if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(indexMetadata.getSettings()) &&
+                IndexSettings.INDEX_SOFT_DELETES_SETTING.exists(changeSettings) &&
+                IndexSettings.INDEX_SOFT_DELETES_SETTING.get(changeSettings) == false) {
+            throw new SnapshotRestoreException(snapshot,
+                    "cannot disable setting [" + IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey() + "] on restore");
+        }
+        IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata);
+        Settings settings = indexMetadata.getSettings();
+        Set<String> keyFilters = new HashSet<>();
+        List<String> simpleMatchPatterns = new ArrayList<>();
+        for (String ignoredSetting : ignoreSettings) {
+            if (Regex.isSimpleMatchPattern(ignoredSetting) == false) {
+                if (UNREMOVABLE_SETTINGS.contains(ignoredSetting)) {
+                    throw new SnapshotRestoreException(
+                            snapshot, "cannot remove setting [" + ignoredSetting + "] on restore");
+                } else {
+                    keyFilters.add(ignoredSetting);
+                }
+            } else {
+                simpleMatchPatterns.add(ignoredSetting);
+            }
+        }
+        Settings.Builder settingsBuilder = Settings.builder()
+                .put(settings.filter(k -> {
+                    if (UNREMOVABLE_SETTINGS.contains(k) == false) {
+                        for (String filterKey : keyFilters) {
+                            if (k.equals(filterKey)) {
+                                return false;
+                            }
+                        }
+                        for (String pattern : simpleMatchPatterns) {
+                            if (Regex.simpleMatch(pattern, k)) {
+                                return false;
+                            }
+                        }
+                    }
+                    return true;
+                }))
+                .put(normalizedChangeSettings.filter(k -> {
+                    if (UNMODIFIABLE_SETTINGS.contains(k)) {
+                        throw new SnapshotRestoreException(snapshot, "cannot modify setting [" + k + "] on restore");
+                    } else {
+                        return true;
+                    }
+                }));
+        settingsBuilder.remove(MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey());
+        return builder.settings(settingsBuilder).build();
+    }
+
+    /**
+     * Cluster state update task that is executed to start a restore operation.
+     */
+    private final class RestoreSnapshotStateTask extends ClusterStateUpdateTask {
+
+        /**
+         * UUID to use for this restore, as returned by {@link RestoreInProgress.Entry#uuid()}.
+         */
+        private final String restoreUUID = UUIDs.randomBase64UUID();
+
+        /**
+         * The restore request that triggered this restore task.
+         */
+        private final RestoreSnapshotRequest request;
+
+        /**
+         * Feature states to restore.
+         */
+        private final Set<String> featureStatesToRestore;
+
+        /**
+         * Map of index names to restore to the repository index id to restore them from.
+         */
+        private final Map<String, IndexId> indicesToRestore;
+
+        private final Snapshot snapshot;
+
+        /**
+         * Snapshot info of the snapshot to restore
+         */
+        private final SnapshotInfo snapshotInfo;
+
+        /**
+         * Metadata loaded from the snapshot
+         */
+        private final Metadata metadata;
+
+        private final Collection<DataStream> dataStreamsToRestore;
+
+        private final BiConsumer<ClusterState, Metadata.Builder> updater;
+
+        private final ActionListener<RestoreCompletionResponse> listener;
+
+        @Nullable
+        private RestoreInfo restoreInfo;
+
+        RestoreSnapshotStateTask(RestoreSnapshotRequest request,
+                                 Snapshot snapshot,
+                                 Set<String> featureStatesToRestore,
+                                 Map<String, IndexId> indicesToRestore,
+                                 SnapshotInfo snapshotInfo,
+                                 Metadata metadata,
+                                 Collection<DataStream> dataStreamsToRestore,
+                                 BiConsumer<ClusterState, Metadata.Builder> updater,
+                                 ActionListener<RestoreCompletionResponse> listener) {
+            super(request.masterNodeTimeout());
+            this.request = request;
+            this.snapshot = snapshot;
+            this.featureStatesToRestore = featureStatesToRestore;
+            this.indicesToRestore = indicesToRestore;
+            this.snapshotInfo = snapshotInfo;
+            this.metadata = metadata;
+            this.dataStreamsToRestore = dataStreamsToRestore;
+            this.updater = updater;
+            this.listener = listener;
+        }
+
+        @Override
+        public ClusterState execute(ClusterState currentState) {
+            // Check if the snapshot to restore is currently being deleted
+            ensureSnapshotNotDeleted(currentState);
+
+            // Clear out all existing indices which fall within a system index pattern being restored
+            currentState = metadataDeleteIndexService.deleteIndices(
+                    currentState,
+                    resolveSystemIndicesToDelete(currentState, featureStatesToRestore)
+            );
+
+            // Updating cluster state
+            final Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
+            final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
+            final RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
+
+            final ImmutableOpenMap.Builder<ShardId, ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder();
+
+            final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion().minimumIndexCompatibilityVersion();
+            final String localNodeId = clusterService.state().nodes().getLocalNodeId();
+            for (Map.Entry<String, IndexId> indexEntry : indicesToRestore.entrySet()) {
+                final IndexId index = indexEntry.getValue();
+                IndexMetadata snapshotIndexMetadata = updateIndexSettings(
+                        snapshot,
+                        metadata.index(index.getName()),
+                        request.indexSettings(),
+                        request.ignoreIndexSettings()
+                );
+                try {
+                    snapshotIndexMetadata = indexMetadataVerifier.verifyIndexMetadata(snapshotIndexMetadata, minIndexCompatibilityVersion);
+                } catch (Exception ex) {
+                    throw new SnapshotRestoreException(snapshot, "cannot restore index [" + index +
+                            "] because it cannot be upgraded", ex);
+                }
+                final String renamedIndexName = indexEntry.getKey();
+                final IndexMetadata currentIndexMetadata = currentState.metadata().index(renamedIndexName);
+                final SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(
+                        restoreUUID,
+                        snapshot,
+                        snapshotInfo.version(),
+                        index
+                );
+                final boolean partial = checkPartial(index.getName());
+                final IntSet ignoreShards = new IntHashSet();
+                final IndexMetadata updatedIndexMetadata;
+
+                // different paths depending on whether we are restoring to create a new index or restoring over an existing closed index
+                // that will be opened by the restore
+                if (currentIndexMetadata == null) {
+                    // Index doesn't exist - create it and start recovery
+                    // Make sure that the index we are about to create has a validate name
+                    ensureValidIndexName(currentState, snapshotIndexMetadata, renamedIndexName);
+                    shardLimitValidator.validateShardLimit(snapshotIndexMetadata.getSettings(), currentState);
+
+                    final IndexMetadata.Builder indexMdBuilder = restoreToCreateNewIndex(snapshotIndexMetadata, renamedIndexName);
+                    if (request.includeAliases() == false && snapshotIndexMetadata.getAliases().isEmpty() == false
+                            && isSystemIndex(snapshotIndexMetadata) == false) {
+                        // Remove all aliases - they shouldn't be restored
+                        indexMdBuilder.removeAllAliases();
+                    } else {
+                        ensureNoAliasNameConflicts(snapshotIndexMetadata);
+                    }
+                    updatedIndexMetadata = indexMdBuilder.build();
+                    if (partial) {
+                        populateIgnoredShards(index.getName(), ignoreShards);
+                    }
+                    rtBuilder.addAsNewRestore(updatedIndexMetadata, recoverySource, ignoreShards);
+                    blocks.addBlocks(updatedIndexMetadata);
+                } else {
+                    // Index exists and it's closed - open it in metadata and start recovery
+                    validateExistingClosedIndex(currentIndexMetadata, snapshotIndexMetadata, renamedIndexName, partial);
+                    final IndexMetadata.Builder indexMdBuilder = restoreOverClosedIndex(snapshotIndexMetadata, currentIndexMetadata);
+
+                    if (request.includeAliases() == false && isSystemIndex(snapshotIndexMetadata) == false) {
+                        // Remove all snapshot aliases
+                        if (snapshotIndexMetadata.getAliases().isEmpty() == false) {
+                            indexMdBuilder.removeAllAliases();
+                        }
+                        // Add existing aliases
+                        for (ObjectCursor<AliasMetadata> alias : currentIndexMetadata.getAliases().values()) {
+                            indexMdBuilder.putAlias(alias.value);
+                        }
+                    } else {
+                        ensureNoAliasNameConflicts(snapshotIndexMetadata);
+                    }
+                    updatedIndexMetadata = indexMdBuilder.build();
+                    rtBuilder.addAsRestore(updatedIndexMetadata, recoverySource);
+                    blocks.updateBlocks(updatedIndexMetadata);
+                }
+
+                mdBuilder.put(updatedIndexMetadata, true);
+                final Index renamedIndex = updatedIndexMetadata.getIndex();
+                for (int shard = 0; shard < snapshotIndexMetadata.getNumberOfShards(); shard++) {
+                    shardsBuilder.put(
+                            new ShardId(renamedIndex, shard),
+                            ignoreShards.contains(shard)
+                                    ? new ShardRestoreStatus(localNodeId, RestoreInProgress.State.FAILURE)
+                                    : new ShardRestoreStatus(localNodeId)
+                    );
+                }
+            }
+
+            final ClusterState.Builder builder = ClusterState.builder(currentState);
+            final ImmutableOpenMap<ShardId, ShardRestoreStatus> shards = shardsBuilder.build();
+            if (shards.isEmpty() == false) {
+                builder.putCustom(
+                        RestoreInProgress.TYPE,
+                        new RestoreInProgress.Builder(currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)).add(
+                            new RestoreInProgress.Entry(
+                                restoreUUID,
+                                snapshot,
+                                overallState(RestoreInProgress.State.INIT, shards),
+                                List.copyOf(indicesToRestore.keySet()),
+                                shards
+                            )
+                        ).build()
+                );
+            }
+
+            applyDataStreamRestores(currentState, mdBuilder);
+
+            // Restore global state if needed
+            if (request.includeGlobalState()) {
+                applyGlobalStateRestore(currentState, mdBuilder);
+            }
+
+            if (completed(shards)) {
+                // We don't have any indices to restore - we are done
+                restoreInfo = new RestoreInfo(snapshot.getSnapshotId().getName(),
+                    List.copyOf(indicesToRestore.keySet()),
+                    shards.size(),
+                    shards.size() - failedShards(shards));
+            }
+
+            updater.accept(currentState, mdBuilder);
+            return allocationService.reroute(
+                    builder.metadata(mdBuilder).blocks(blocks).routingTable(rtBuilder.build()).build(),
+                    "restored snapshot [" + snapshot + "]"
+            );
+        }
+
+        private void applyDataStreamRestores(ClusterState currentState, Metadata.Builder mdBuilder) {
+            final Map<String, DataStream> updatedDataStreams = new HashMap<>(currentState.metadata().dataStreams());
+            updatedDataStreams.putAll(dataStreamsToRestore.stream()
+                .map(ds -> updateDataStream(ds, mdBuilder, request))
+                .collect(Collectors.toMap(DataStream::getName, Function.identity())));
+            final Map<String, DataStreamAlias> updatedDataStreamAliases = new HashMap<>(currentState.metadata().dataStreamAliases());
+            metadata.dataStreamAliases().values().stream()
+                // Optionally rename the data stream names for each alias
+                .map(alias -> {
+                    if (request.renamePattern() != null && request.renameReplacement() != null) {
+                        List<String> renamedDataStreams = alias.getDataStreams().stream()
+                            .map(s -> s.replaceAll(request.renamePattern(), request.renameReplacement()))
+                            .collect(Collectors.toList());
+                        return new DataStreamAlias(alias.getName(), renamedDataStreams);
+                    } else {
+                        return alias;
+                    }
+                }).forEach(alias -> {
+                    final DataStreamAlias current = updatedDataStreamAliases.putIfAbsent(alias.getName(), alias);
+                    if (current != null) {
+                        // Merge data stream alias from snapshot with an existing data stream aliases in target cluster:
+                        Set<String> mergedDataStreams = new HashSet<>(current.getDataStreams());
+                        mergedDataStreams.addAll(alias.getDataStreams());
+                        DataStreamAlias newInstance = new DataStreamAlias(alias.getName(), List.copyOf(mergedDataStreams));
+                        updatedDataStreamAliases.put(alias.getName(), newInstance);
+                    }
+                });
+            mdBuilder.dataStreams(updatedDataStreams, updatedDataStreamAliases);
+        }
+
+        private void ensureSnapshotNotDeleted(ClusterState currentState) {
+            SnapshotDeletionsInProgress deletionsInProgress =
+                    currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
+            if (deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(snapshot.getSnapshotId()))) {
+                throw new ConcurrentSnapshotExecutionException(snapshot,
+                    "cannot restore a snapshot while a snapshot deletion is in-progress [" +
+                        deletionsInProgress.getEntries().get(0) + "]");
+            }
+        }
+
+        private void applyGlobalStateRestore(ClusterState currentState, Metadata.Builder mdBuilder) {
+            if (metadata.persistentSettings() != null) {
+                Settings settings = metadata.persistentSettings();
+                if (request.skipOperatorOnlyState()) {
+                    // Skip any operator-only settings from the snapshot. This happens when operator privileges are enabled
+                    final Set<String> operatorSettingKeys = Stream.concat(
+                        settings.keySet().stream(), currentState.metadata().persistentSettings().keySet().stream())
+                        .filter(k -> {
+                            final Setting<?> setting = clusterSettings.get(k);
+                            return setting != null && setting.isOperatorOnly();
+                        })
+                        .collect(Collectors.toSet());
+                    if (false == operatorSettingKeys.isEmpty()) {
+                        settings = Settings.builder()
+                            .put(settings.filter(k -> false == operatorSettingKeys.contains(k)))
+                            .put(currentState.metadata().persistentSettings().filter(operatorSettingKeys::contains))
+                            .build();
+                    }
+                }
+                clusterSettings.validateUpdate(settings);
+                mdBuilder.persistentSettings(settings);
+            }
+            if (metadata.templates() != null) {
+                // TODO: Should all existing templates be deleted first?
+                for (ObjectCursor<IndexTemplateMetadata> cursor : metadata.templates().values()) {
+                    mdBuilder.put(cursor.value);
+                }
+            }
+            if (metadata.customs() != null) {
+                for (ObjectObjectCursor<String, Metadata.Custom> cursor : metadata.customs()) {
+                    if (RepositoriesMetadata.TYPE.equals(cursor.key) == false
+                            && DataStreamMetadata.TYPE.equals(cursor.key) == false
+                            && cursor.value instanceof Metadata.NonRestorableCustom == false) {
+                        // TODO: Check request.skipOperatorOnly for Autoscaling policies (NonRestorableCustom)
+                        // Don't restore repositories while we are working with them
+                        // TODO: Should we restore them at the end?
+                        // Also, don't restore data streams here, we already added them to the metadata builder above
+                        mdBuilder.putCustom(cursor.key, cursor.value);
+                    }
+                }
+            }
+        }
+
+        private void ensureNoAliasNameConflicts(IndexMetadata snapshotIndexMetadata) {
+            for (ObjectCursor<String> alias : snapshotIndexMetadata.getAliases().keys()) {
+                final String aliasName = alias.value;
+                final IndexId indexId = indicesToRestore.get(aliasName);
+                if (indexId != null) {
+                    throw new SnapshotRestoreException(snapshot,
+                            "cannot rename index [" + indexId + "] into [" + aliasName
+                                    + "] because of conflict with an alias with the same name");
+                }
+            }
+        }
+
+        private void populateIgnoredShards(String index, IntSet ignoreShards) {
+            for (SnapshotShardFailure failure : snapshotInfo.shardFailures()) {
+                if (index.equals(failure.index())) {
+                    ignoreShards.add(failure.shardId());
+                }
+            }
+        }
+
+        private boolean checkPartial(String index) {
+            // Make sure that index was fully snapshotted
+            if (failed(snapshotInfo, index)) {
+                if (request.partial()) {
+                    return true;
+                } else {
+                    throw new SnapshotRestoreException(snapshot, "index [" + index + "] wasn't fully snapshotted - cannot restore");
+                }
+            } else {
+                return false;
+            }
+        }
+
+        private void validateExistingClosedIndex(IndexMetadata currentIndexMetadata, IndexMetadata snapshotIndexMetadata,
+                                                 String renamedIndex, boolean partial) {
+            // Index exist - checking that it's closed
+            if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) {
+                // TODO: Enable restore for open indices
+                throw new SnapshotRestoreException(snapshot, "cannot restore index [" + renamedIndex
+                    + "] because an open index " +
+                    "with same name already exists in the cluster. Either close or delete the existing index or restore the " +
+                    "index under a different name by providing a rename pattern and replacement name");
+            }
+            // Index exist - checking if it's partial restore
+            if (partial) {
+                throw new SnapshotRestoreException(snapshot, "cannot restore partial index [" + renamedIndex
+                    + "] because such index already exists");
+            }
+            // Make sure that the number of shards is the same. That's the only thing that we cannot change
+            if (currentIndexMetadata.getNumberOfShards() != snapshotIndexMetadata.getNumberOfShards()) {
+                throw new SnapshotRestoreException(snapshot,
+                    "cannot restore index [" + renamedIndex + "] with [" + currentIndexMetadata.getNumberOfShards()
+                        + "] shards from a snapshot of index [" + snapshotIndexMetadata.getIndex().getName() + "] with [" +
+                        snapshotIndexMetadata.getNumberOfShards() + "] shards");
+            }
+        }
+
+        @Override
+        public void onFailure(String source, Exception e) {
+            logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", snapshot), e);
+            listener.onFailure(e);
+        }
+
+        @Override
+        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+            listener.onResponse(new RestoreCompletionResponse(restoreUUID, snapshot, restoreInfo));
+        }
+    }
+
+    private static IndexMetadata.Builder restoreToCreateNewIndex(IndexMetadata snapshotIndexMetadata, String renamedIndexName) {
+        return IndexMetadata.builder(snapshotIndexMetadata)
+                .state(IndexMetadata.State.OPEN)
+                .index(renamedIndexName)
+                .settings(
+                    Settings.builder()
+                        .put(snapshotIndexMetadata.getSettings())
+                        .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
+                ).timestampRange(IndexLongFieldRange.NO_SHARDS);
+    }
+
+    private static IndexMetadata.Builder restoreOverClosedIndex(IndexMetadata snapshotIndexMetadata, IndexMetadata currentIndexMetadata) {
+        final IndexMetadata.Builder indexMdBuilder = IndexMetadata.builder(snapshotIndexMetadata)
+                .state(IndexMetadata.State.OPEN)
+                .version(Math.max(snapshotIndexMetadata.getVersion(), 1 + currentIndexMetadata.getVersion()))
+                .mappingVersion(Math.max(snapshotIndexMetadata.getMappingVersion(), 1 + currentIndexMetadata.getMappingVersion()))
+                .settingsVersion(Math.max(snapshotIndexMetadata.getSettingsVersion(), 1 + currentIndexMetadata.getSettingsVersion()))
+                .aliasesVersion(Math.max(snapshotIndexMetadata.getAliasesVersion(), 1 + currentIndexMetadata.getAliasesVersion()))
+                .timestampRange(IndexLongFieldRange.NO_SHARDS)
+                .index(currentIndexMetadata.getIndex().getName())
+                .settings(
+                    Settings.builder()
+                        .put(snapshotIndexMetadata.getSettings())
+                        .put(IndexMetadata.SETTING_INDEX_UUID, currentIndexMetadata.getIndexUUID())
+                        .put(IndexMetadata.SETTING_HISTORY_UUID, UUIDs.randomBase64UUID())
+                );
+        for (int shard = 0; shard < snapshotIndexMetadata.getNumberOfShards(); shard++) {
+            indexMdBuilder.primaryTerm(shard,
+                    Math.max(snapshotIndexMetadata.primaryTerm(shard), currentIndexMetadata.primaryTerm(shard)));
+        }
+        return indexMdBuilder;
+    }
+
+    private void ensureValidIndexName(ClusterState currentState, IndexMetadata snapshotIndexMetadata, String renamedIndexName) {
+        final boolean isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(snapshotIndexMetadata.getSettings());
+        createIndexService.validateIndexName(renamedIndexName, currentState);
+        createIndexService.validateDotIndex(renamedIndexName, isHidden);
+        createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetadata.getSettings(), false);
+    }
 }