Преглед изворног кода

SNAPSHOTS: Allow Parallel Restore Operations (#36397)

* Enable parallel restore operations
* Add uuid to restore in progress entries to uniquely identify them
* Adjust restore in progress entries to be a map in cluster state
* Add tests for:
   * Parallel restore from two different snapshots
   * Parallel restore from a single snapshot to different indices to test that uuid identifiers are correctly used by `RestoreService` and the routing allocator
   * Parallel restore with waiting for completion to test that transport actions correctly use uuid identifiers
Armin Braun пре 6 година
родитељ
комит
c5b3ac5578
23 измењених фајлова са 374 додато и 140 уклоњено
  1. 2 1
      docs/reference/indices/recovery.asciidoc
  2. 3 2
      server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java
  3. 81 32
      server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java
  4. 22 5
      server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java
  5. 10 12
      server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java
  6. 85 63
      server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
  7. 1 1
      server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
  8. 2 2
      server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java
  9. 4 2
      server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java
  10. 1 1
      server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java
  11. 3 1
      server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java
  12. 5 1
      server/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java
  13. 8 4
      server/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java
  14. 5 3
      server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java
  15. 4 3
      server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java
  16. 2 1
      server/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java
  17. 1 1
      server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
  18. 1 0
      server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
  19. 124 1
      server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
  20. 1 0
      test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java
  21. 3 1
      test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
  22. 3 2
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java
  23. 3 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

+ 2 - 1
docs/reference/indices/recovery.asciidoc

@@ -90,7 +90,8 @@ Response:
         "repository" : "my_repository",
         "snapshot" : "my_snapshot",
         "index" : "index1",
-        "version" : "{version}"
+        "version" : "{version}",
+        "restoreUUID": "PDh1ZAOaRbiGIVtCvZOMww"
       },
       "target" : {
         "id" : "ryqJ5lO5S4-lSFbGntkEkg",

+ 3 - 2
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java

@@ -92,12 +92,13 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<Re
             public void onResponse(RestoreCompletionResponse restoreCompletionResponse) {
                 if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
                     final Snapshot snapshot = restoreCompletionResponse.getSnapshot();
+                    String uuid = restoreCompletionResponse.getUuid();
 
                     ClusterStateListener clusterStateListener = new ClusterStateListener() {
                         @Override
                         public void clusterChanged(ClusterChangedEvent changedEvent) {
-                            final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), snapshot);
-                            final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), snapshot);
+                            final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
+                            final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
                             if (prevEntry == null) {
                                 // When there is a master failure after a restore has been started, this listener might not be registered
                                 // on the current master and as such it might miss some intermediary cluster states due to batching.

+ 81 - 32
server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.cluster;
 
+import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 
 import org.elasticsearch.Version;
@@ -33,36 +34,33 @@ import org.elasticsearch.snapshots.Snapshot;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.UUID;
 
 /**
  * Meta data about restore processes that are currently executing
  */
-public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements Custom {
+public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements Custom, Iterable<RestoreInProgress.Entry> {
+
+    /**
+     * Fallback UUID used for restore operations that were started before v7.0 and don't have a uuid in the cluster state.
+     */
+    public static final String BWC_UUID = new UUID(0, 0).toString();
 
     public static final String TYPE = "restore";
 
-    private final List<Entry> entries;
+    private final ImmutableOpenMap<String, Entry> entries;
 
     /**
      * Constructs new restore metadata
      *
-     * @param entries list of currently running restore processes
+     * @param entries map of currently running restore processes keyed by their restore uuid
      */
-    public RestoreInProgress(Entry... entries) {
-        this.entries = Arrays.asList(entries);
-    }
-
-    /**
-     * Returns list of currently running restore processes
-     *
-     * @return list of currently running restore processes
-     */
-    public List<Entry> entries() {
-        return this.entries;
+    private RestoreInProgress(ImmutableOpenMap<String, Entry> entries) {
+        this.entries = entries;
     }
 
     @Override
@@ -84,20 +82,48 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
 
     @Override
     public String toString() {
-        StringBuilder builder = new StringBuilder("RestoreInProgress[");
-        for (int i = 0; i < entries.size(); i++) {
-            builder.append(entries.get(i).snapshot().getSnapshotId().getName());
-            if (i + 1 < entries.size()) {
-                builder.append(",");
-            }
+        return new StringBuilder("RestoreInProgress[").append(entries).append("]").toString();
+    }
+
+    public Entry get(String restoreUUID) {
+        return entries.get(restoreUUID);
+    }
+
+    public boolean isEmpty() {
+        return entries.isEmpty();
+    }
+
+    @Override
+    public Iterator<Entry> iterator() {
+        return entries.valuesIt();
+    }
+
+    public static final class Builder {
+
+        private final ImmutableOpenMap.Builder<String, Entry> entries = ImmutableOpenMap.builder();
+
+        public Builder() {
+        }
+
+        public Builder(RestoreInProgress restoreInProgress) {
+            entries.putAll(restoreInProgress.entries);
+        }
+
+        public Builder add(Entry entry) {
+            entries.put(entry.uuid, entry);
+            return this;
+        }
+
+        public RestoreInProgress build() {
+            return new RestoreInProgress(entries.build());
         }
-        return builder.append("]").toString();
     }
 
     /**
      * Restore metadata
      */
     public static class Entry {
+        private final String uuid;
         private final State state;
         private final Snapshot snapshot;
         private final ImmutableOpenMap<ShardId, ShardRestoreStatus> shards;
@@ -106,12 +132,14 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
         /**
          * Creates new restore metadata
          *
+         * @param uuid       uuid of the restore
          * @param snapshot   snapshot
          * @param state      current state of the restore process
          * @param indices    list of indices being restored
          * @param shards     map of shards being restored to their current restore status
          */
-        public Entry(Snapshot snapshot, State state, List<String> indices, ImmutableOpenMap<ShardId, ShardRestoreStatus> shards) {
+        public Entry(String uuid, Snapshot snapshot, State state, List<String> indices,
+            ImmutableOpenMap<ShardId, ShardRestoreStatus> shards) {
             this.snapshot = Objects.requireNonNull(snapshot);
             this.state = Objects.requireNonNull(state);
             this.indices = Objects.requireNonNull(indices);
@@ -120,6 +148,15 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
             } else {
                 this.shards = shards;
             }
+            this.uuid = Objects.requireNonNull(uuid);
+        }
+
+        /**
+         * Returns restore uuid
+         * @return restore uuid
+         */
+        public String uuid() {
+            return uuid;
         }
 
         /**
@@ -167,7 +204,8 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
                 return false;
             }
             Entry entry = (Entry) o;
-            return snapshot.equals(entry.snapshot) &&
+            return uuid.equals(entry.uuid) &&
+                       snapshot.equals(entry.snapshot) &&
                        state == entry.state &&
                        indices.equals(entry.indices) &&
                        shards.equals(entry.shards);
@@ -175,7 +213,7 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
 
         @Override
         public int hashCode() {
-            return Objects.hash(snapshot, state, indices, shards);
+            return Objects.hash(uuid, snapshot, state, indices, shards);
         }
     }
 
@@ -394,8 +432,15 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
     }
 
     public RestoreInProgress(StreamInput in) throws IOException {
-        Entry[] entries = new Entry[in.readVInt()];
-        for (int i = 0; i < entries.length; i++) {
+        int count = in.readVInt();
+        final ImmutableOpenMap.Builder<String, Entry> entriesBuilder = ImmutableOpenMap.builder(count);
+        for (int i = 0; i < count; i++) {
+            final String uuid;
+            if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
+                uuid = in.readString();
+            } else {
+                uuid = BWC_UUID;
+            }
             Snapshot snapshot = new Snapshot(in);
             State state = State.fromValue(in.readByte());
             int indices = in.readVInt();
@@ -410,9 +455,9 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
                 ShardRestoreStatus shardState = ShardRestoreStatus.readShardRestoreStatus(in);
                 builder.put(shardId, shardState);
             }
-            entries[i] = new Entry(snapshot, state, Collections.unmodifiableList(indexBuilder), builder.build());
+            entriesBuilder.put(uuid, new Entry(uuid, snapshot, state, Collections.unmodifiableList(indexBuilder), builder.build()));
         }
-        this.entries = Arrays.asList(entries);
+        this.entries = entriesBuilder.build();
     }
 
     /**
@@ -421,7 +466,11 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeVInt(entries.size());
-        for (Entry entry : entries) {
+        for (ObjectCursor<Entry> v : entries.values()) {
+            Entry entry = v.value;
+            if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
+                out.writeString(entry.uuid);
+            }
             entry.snapshot().writeTo(out);
             out.writeByte(entry.state().value());
             out.writeVInt(entry.indices().size());
@@ -442,8 +491,8 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
         builder.startArray("snapshots");
-        for (Entry entry : entries) {
-            toXContent(entry, builder, params);
+        for (ObjectCursor<Entry> entry : entries.values()) {
+            toXContent(entry.value, builder, params);
         }
         builder.endArray();
         return builder;

+ 22 - 5
server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.cluster.routing;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.cluster.RestoreInProgress;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -208,22 +209,33 @@ public abstract class RecoverySource implements Writeable, ToXContentObject {
      * recovery from a snapshot
      */
     public static class SnapshotRecoverySource extends RecoverySource {
+        private final String restoreUUID;
         private final Snapshot snapshot;
         private final String index;
         private final Version version;
 
-        public SnapshotRecoverySource(Snapshot snapshot, Version version, String index) {
+        public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, String index) {
+            this.restoreUUID = restoreUUID;
             this.snapshot = Objects.requireNonNull(snapshot);
             this.version = Objects.requireNonNull(version);
             this.index = Objects.requireNonNull(index);
         }
 
         SnapshotRecoverySource(StreamInput in) throws IOException {
+            if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
+                restoreUUID = in.readString();
+            } else {
+                restoreUUID = RestoreInProgress.BWC_UUID;
+            }
             snapshot = new Snapshot(in);
             version = Version.readVersion(in);
             index = in.readString();
         }
 
+        public String restoreUUID() {
+            return restoreUUID;
+        }
+
         public Snapshot snapshot() {
             return snapshot;
         }
@@ -238,6 +250,9 @@ public abstract class RecoverySource implements Writeable, ToXContentObject {
 
         @Override
         protected void writeAdditionalFields(StreamOutput out) throws IOException {
+            if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
+                out.writeString(restoreUUID);
+            }
             snapshot.writeTo(out);
             Version.writeVersion(version, out);
             out.writeString(index);
@@ -253,12 +268,13 @@ public abstract class RecoverySource implements Writeable, ToXContentObject {
             builder.field("repository", snapshot.getRepository())
                 .field("snapshot", snapshot.getSnapshotId().getName())
                 .field("version", version.toString())
-                .field("index", index);
+                .field("index", index)
+                .field("restoreUUID", restoreUUID);
         }
 
         @Override
         public String toString() {
-            return "snapshot recovery from " + snapshot.toString();
+            return "snapshot recovery [" + restoreUUID + "] from " + snapshot;
         }
 
         @Override
@@ -271,12 +287,13 @@ public abstract class RecoverySource implements Writeable, ToXContentObject {
             }
 
             SnapshotRecoverySource that = (SnapshotRecoverySource) o;
-            return snapshot.equals(that.snapshot) && index.equals(that.index) && version.equals(that.version);
+            return restoreUUID.equals(that.restoreUUID) && snapshot.equals(that.snapshot)
+                && index.equals(that.index) && version.equals(that.version);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(snapshot, index, version);
+            return Objects.hash(restoreUUID, snapshot, index, version);
         }
 
     }

+ 10 - 12
server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java

@@ -24,7 +24,6 @@ import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
-import org.elasticsearch.snapshots.Snapshot;
 
 /**
  * This {@link AllocationDecider} prevents shards that have failed to be
@@ -46,25 +45,24 @@ public class RestoreInProgressAllocationDecider extends AllocationDecider {
             return allocation.decision(Decision.YES, NAME, "ignored as shard is not being recovered from a snapshot");
         }
 
-        final Snapshot snapshot = ((RecoverySource.SnapshotRecoverySource) recoverySource).snapshot();
+        RecoverySource.SnapshotRecoverySource source = (RecoverySource.SnapshotRecoverySource) recoverySource;
         final RestoreInProgress restoresInProgress = allocation.custom(RestoreInProgress.TYPE);
 
         if (restoresInProgress != null) {
-            for (RestoreInProgress.Entry restoreInProgress : restoresInProgress.entries()) {
-                if (restoreInProgress.snapshot().equals(snapshot)) {
-                    RestoreInProgress.ShardRestoreStatus shardRestoreStatus = restoreInProgress.shards().get(shardRouting.shardId());
-                    if (shardRestoreStatus != null && shardRestoreStatus.state().completed() == false) {
-                        assert shardRestoreStatus.state() != RestoreInProgress.State.SUCCESS : "expected shard [" + shardRouting
-                            + "] to be in initializing state but got [" + shardRestoreStatus.state() + "]";
-                        return allocation.decision(Decision.YES, NAME, "shard is currently being restored");
-                    }
-                    break;
+            RestoreInProgress.Entry restoreInProgress = restoresInProgress.get(source.restoreUUID());
+            if (restoreInProgress != null) {
+                RestoreInProgress.ShardRestoreStatus shardRestoreStatus = restoreInProgress.shards().get(shardRouting.shardId());
+                if (shardRestoreStatus != null && shardRestoreStatus.state().completed() == false) {
+                    assert shardRestoreStatus.state() != RestoreInProgress.State.SUCCESS : "expected shard [" + shardRouting
+                        + "] to be in initializing state but got [" + shardRestoreStatus.state() + "]";
+                    return allocation.decision(Decision.YES, NAME, "shard is currently being restored");
                 }
             }
         }
         return allocation.decision(Decision.NO, NAME, "shard has failed to be restored from the snapshot [%s] because of [%s] - " +
             "manually close or delete the index [%s] in order to retry to restore the snapshot again or use the reroute API to force the " +
-            "allocation of an empty primary shard", snapshot, shardRouting.unassignedInfo().getDetails(), shardRouting.getIndexName());
+            "allocation of an empty primary shard",
+            source.snapshot(), shardRouting.unassignedInfo().getDetails(), shardRouting.getIndexName());
     }
 
     @Override

+ 85 - 63
server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

@@ -219,15 +219,18 @@ public class RestoreService implements ClusterStateApplier {
             // Now we can start the actual restore process by adding shards to be recovered in the cluster state
             // and updating cluster metadata (global and index) as needed
             clusterService.submitStateUpdateTask(request.cause(), new ClusterStateUpdateTask() {
+                String restoreUUID = UUIDs.randomBase64UUID();
                 RestoreInfo restoreInfo = null;
 
                 @Override
                 public ClusterState execute(ClusterState currentState) {
-                    // Check if another restore process is already running - cannot run two restore processes at the
-                    // same time
                     RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
-                    if (restoreInProgress != null && !restoreInProgress.entries().isEmpty()) {
-                        throw new ConcurrentSnapshotExecutionException(snapshot, "Restore process is already running in this cluster");
+                    if (currentState.getNodes().getMinNodeVersion().before(Version.V_7_0_0)) {
+                        // Check if another restore process is already running - cannot run two restore processes at the
+                        // same time in versions prior to 7.0
+                        if (restoreInProgress != null && restoreInProgress.isEmpty() == false) {
+                            throw new ConcurrentSnapshotExecutionException(snapshot, "Restore process is already running in this cluster");
+                        }
                     }
                     // Check if the snapshot to restore is currently being deleted
                     SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
@@ -253,7 +256,7 @@ public class RestoreService implements ClusterStateApplier {
                         for (Map.Entry<String, String> indexEntry : indices.entrySet()) {
                             String index = indexEntry.getValue();
                             boolean partial = checkPartial(index);
-                            SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(snapshot, snapshotInfo.version(), index);
+                            SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(restoreUUID, snapshot, snapshotInfo.version(), index);
                             String renamedIndexName = indexEntry.getKey();
                             IndexMetaData snapshotIndexMetaData = metaData.index(index);
                             snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData, request.indexSettings, request.ignoreIndexSettings);
@@ -329,8 +332,18 @@ public class RestoreService implements ClusterStateApplier {
                         }
 
                         shards = shardsBuilder.build();
-                        RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(snapshot, overallState(RestoreInProgress.State.INIT, shards), Collections.unmodifiableList(new ArrayList<>(indices.keySet())), shards);
-                        builder.putCustom(RestoreInProgress.TYPE, new RestoreInProgress(restoreEntry));
+                        RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(
+                            restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards),
+                            Collections.unmodifiableList(new ArrayList<>(indices.keySet())),
+                            shards
+                        );
+                        RestoreInProgress.Builder restoreInProgressBuilder;
+                        if (restoreInProgress != null) {
+                            restoreInProgressBuilder = new RestoreInProgress.Builder(restoreInProgress);
+                        } else {
+                            restoreInProgressBuilder = new RestoreInProgress.Builder();
+                        }
+                        builder.putCustom(RestoreInProgress.TYPE, restoreInProgressBuilder.add(restoreEntry).build());
                     } else {
                         shards = ImmutableOpenMap.of();
                     }
@@ -485,7 +498,7 @@ public class RestoreService implements ClusterStateApplier {
 
                 @Override
                 public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                    listener.onResponse(new RestoreCompletionResponse(snapshot, restoreInfo));
+                    listener.onResponse(new RestoreCompletionResponse(restoreUUID, snapshot, restoreInfo));
                 }
             });
 
@@ -498,8 +511,8 @@ public class RestoreService implements ClusterStateApplier {
 
     public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set<Index> deletedIndices) {
         boolean changesMade = false;
-        final List<RestoreInProgress.Entry> entries = new ArrayList<>();
-        for (RestoreInProgress.Entry entry : oldRestore.entries()) {
+        RestoreInProgress.Builder builder = new RestoreInProgress.Builder();
+        for (RestoreInProgress.Entry entry : oldRestore) {
             ImmutableOpenMap.Builder<ShardId, ShardRestoreStatus> shardsBuilder = null;
             for (ObjectObjectCursor<ShardId, ShardRestoreStatus> cursor : entry.shards()) {
                 ShardId shardId = cursor.key;
@@ -513,27 +526,33 @@ public class RestoreService implements ClusterStateApplier {
             }
             if (shardsBuilder != null) {
                 ImmutableOpenMap<ShardId, ShardRestoreStatus> shards = shardsBuilder.build();
-                entries.add(new RestoreInProgress.Entry(entry.snapshot(), overallState(RestoreInProgress.State.STARTED, shards), entry.indices(), shards));
+                builder.add(new RestoreInProgress.Entry(entry.uuid(), entry.snapshot(), overallState(RestoreInProgress.State.STARTED, shards), entry.indices(), shards));
             } else {
-                entries.add(entry);
+                builder.add(entry);
             }
         }
         if (changesMade) {
-            return new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()]));
+            return builder.build();
         } else {
             return oldRestore;
         }
     }
 
     public static final class RestoreCompletionResponse {
+        private final String uuid;
         private final Snapshot snapshot;
         private final RestoreInfo restoreInfo;
 
-        private RestoreCompletionResponse(final Snapshot snapshot, final RestoreInfo restoreInfo) {
+        private RestoreCompletionResponse(final String uuid, final Snapshot snapshot, final RestoreInfo restoreInfo) {
+            this.uuid = uuid;
             this.snapshot = snapshot;
             this.restoreInfo = restoreInfo;
         }
 
+        public String getUuid() {
+            return uuid;
+        }
+
         public Snapshot getSnapshot() {
             return snapshot;
         }
@@ -544,7 +563,7 @@ public class RestoreService implements ClusterStateApplier {
     }
 
     public static class RestoreInProgressUpdater extends RoutingChangesObserver.AbstractRoutingChangesObserver {
-        private final Map<Snapshot, Updates> shardChanges = new HashMap<>();
+        private final Map<String, Updates> shardChanges = new HashMap<>();
 
         @Override
         public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) {
@@ -552,8 +571,8 @@ public class RestoreService implements ClusterStateApplier {
             if (initializingShard.primary()) {
                 RecoverySource recoverySource = initializingShard.recoverySource();
                 if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) {
-                    Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot();
-                    changes(snapshot).shards.put(initializingShard.shardId(),
+                    changes(recoverySource).shards.put(
+                        initializingShard.shardId(),
                         new ShardRestoreStatus(initializingShard.currentNodeId(), RestoreInProgress.State.SUCCESS));
                 }
             }
@@ -564,13 +583,13 @@ public class RestoreService implements ClusterStateApplier {
             if (failedShard.primary() && failedShard.initializing()) {
                 RecoverySource recoverySource = failedShard.recoverySource();
                 if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) {
-                    Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot();
                     // mark restore entry for this shard as failed when it's due to a file corruption. There is no need wait on retries
                     // to restore this shard on another node if the snapshot files are corrupt. In case where a node just left or crashed,
                     // however, we only want to acknowledge the restore operation once it has been successfully restored on another node.
                     if (unassignedInfo.getFailure() != null && Lucene.isCorruptionException(unassignedInfo.getFailure().getCause())) {
-                        changes(snapshot).shards.put(failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(),
-                            RestoreInProgress.State.FAILURE, unassignedInfo.getFailure().getCause().getMessage()));
+                        changes(recoverySource).shards.put(
+                            failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(),
+                                RestoreInProgress.State.FAILURE, unassignedInfo.getFailure().getCause().getMessage()));
                     }
                 }
             }
@@ -581,9 +600,11 @@ public class RestoreService implements ClusterStateApplier {
             // if we force an empty primary, we should also fail the restore entry
             if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT &&
                 initializedShard.recoverySource().getType() != RecoverySource.Type.SNAPSHOT) {
-                Snapshot snapshot = ((SnapshotRecoverySource) unassignedShard.recoverySource()).snapshot();
-                changes(snapshot).shards.put(unassignedShard.shardId(), new ShardRestoreStatus(null,
-                    RestoreInProgress.State.FAILURE, "recovery source type changed from snapshot to " + initializedShard.recoverySource()));
+                changes(unassignedShard.recoverySource()).shards.put(
+                    unassignedShard.shardId(),
+                    new ShardRestoreStatus(null,
+                        RestoreInProgress.State.FAILURE, "recovery source type changed from snapshot to " + initializedShard.recoverySource())
+                );
             }
         }
 
@@ -592,19 +613,21 @@ public class RestoreService implements ClusterStateApplier {
             RecoverySource recoverySource = unassignedShard.recoverySource();
             if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) {
                 if (newUnassignedInfo.getLastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_NO) {
-                    Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot();
                     String reason = "shard could not be allocated to any of the nodes";
-                    changes(snapshot).shards.put(unassignedShard.shardId(),
+                    changes(recoverySource).shards.put(
+                        unassignedShard.shardId(),
                         new ShardRestoreStatus(unassignedShard.currentNodeId(), RestoreInProgress.State.FAILURE, reason));
                 }
             }
         }
 
         /**
-         * Helper method that creates update entry for the given shard id if such an entry does not exist yet.
+         * Helper method that returns the update entry for the given recovery source's restore UUID,
+         * creating it if such an entry does not exist yet.
          */
-        private Updates changes(Snapshot snapshot) {
-            return shardChanges.computeIfAbsent(snapshot, k -> new Updates());
+        private Updates changes(RecoverySource recoverySource) {
+            assert recoverySource.getType() == RecoverySource.Type.SNAPSHOT;
+            return shardChanges.computeIfAbsent(((SnapshotRecoverySource) recoverySource).restoreUUID(), k -> new Updates());
         }
 
         private static class Updates {
@@ -613,38 +636,38 @@ public class RestoreService implements ClusterStateApplier {
 
         public RestoreInProgress applyChanges(final RestoreInProgress oldRestore) {
             if (shardChanges.isEmpty() == false) {
-                final List<RestoreInProgress.Entry> entries = new ArrayList<>();
-                for (RestoreInProgress.Entry entry : oldRestore.entries()) {
-                    Snapshot snapshot = entry.snapshot();
-                    Updates updates = shardChanges.get(snapshot);
-                    if (updates.shards.isEmpty() == false) {
-                        ImmutableOpenMap.Builder<ShardId, ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder(entry.shards());
+                RestoreInProgress.Builder builder = new RestoreInProgress.Builder();
+                for (RestoreInProgress.Entry entry : oldRestore) {
+                    Updates updates = shardChanges.get(entry.uuid());
+                    ImmutableOpenMap<ShardId, ShardRestoreStatus> shardStates = entry.shards();
+                    if (updates != null && updates.shards.isEmpty() == false) {
+                        ImmutableOpenMap.Builder<ShardId, ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder(shardStates);
                         for (Map.Entry<ShardId, ShardRestoreStatus> shard : updates.shards.entrySet()) {
-                            shardsBuilder.put(shard.getKey(), shard.getValue());
+                            ShardId shardId = shard.getKey();
+                            ShardRestoreStatus status = shardStates.get(shardId);
+                            if (status == null || status.state().completed() == false) {
+                                shardsBuilder.put(shardId, shard.getValue());
+                            }
                         }
 
                         ImmutableOpenMap<ShardId, ShardRestoreStatus> shards = shardsBuilder.build();
                         RestoreInProgress.State newState = overallState(RestoreInProgress.State.STARTED, shards);
-                        entries.add(new RestoreInProgress.Entry(entry.snapshot(), newState, entry.indices(), shards));
+                        builder.add(new RestoreInProgress.Entry(entry.uuid(), entry.snapshot(), newState, entry.indices(), shards));
                     } else {
-                        entries.add(entry);
+                        builder.add(entry);
                     }
                 }
-                return new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()]));
+                return builder.build();
             } else {
                 return oldRestore;
             }
         }
     }
 
-    public static RestoreInProgress.Entry restoreInProgress(ClusterState state, Snapshot snapshot) {
+    public static RestoreInProgress.Entry restoreInProgress(ClusterState state, String restoreUUID) {
         final RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE);
         if (restoreInProgress != null) {
-            for (RestoreInProgress.Entry e : restoreInProgress.entries()) {
-                if (e.snapshot().equals(snapshot)) {
-                    return e;
-                }
-            }
+            return restoreInProgress.get(restoreUUID);
         }
         return null;
     }
@@ -652,15 +675,15 @@ public class RestoreService implements ClusterStateApplier {
     static class CleanRestoreStateTaskExecutor implements ClusterStateTaskExecutor<CleanRestoreStateTaskExecutor.Task>, ClusterStateTaskListener {
 
         static class Task {
-            final Snapshot snapshot;
+            final String uuid;
 
-            Task(Snapshot snapshot) {
-                this.snapshot = snapshot;
+            Task(String uuid) {
+                this.uuid = uuid;
             }
 
             @Override
             public String toString() {
-                return "clean restore state for restoring snapshot " + snapshot;
+                return "clean restore state for restore " + uuid;
             }
         }
 
@@ -673,25 +696,24 @@ public class RestoreService implements ClusterStateApplier {
         @Override
         public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
             final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
-            Set<Snapshot> completedSnapshots = tasks.stream().map(e -> e.snapshot).collect(Collectors.toSet());
-            final List<RestoreInProgress.Entry> entries = new ArrayList<>();
+            Set<String> completedRestores = tasks.stream().map(e -> e.uuid).collect(Collectors.toSet());
+            RestoreInProgress.Builder restoreInProgressBuilder = new RestoreInProgress.Builder();
             final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
             boolean changed = false;
             if (restoreInProgress != null) {
-                for (RestoreInProgress.Entry entry : restoreInProgress.entries()) {
-                    if (completedSnapshots.contains(entry.snapshot()) == false) {
-                        entries.add(entry);
-                    } else {
+                for (RestoreInProgress.Entry entry : restoreInProgress) {
+                    if (completedRestores.contains(entry.uuid())) {
                         changed = true;
+                    } else {
+                        restoreInProgressBuilder.add(entry);
                     }
                 }
             }
             if (changed == false) {
                 return resultBuilder.build(currentState);
             }
-            RestoreInProgress updatedRestoreInProgress = new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()]));
             ImmutableOpenMap.Builder<String, ClusterState.Custom> builder = ImmutableOpenMap.builder(currentState.getCustoms());
-            builder.put(RestoreInProgress.TYPE, updatedRestoreInProgress);
+            builder.put(RestoreInProgress.TYPE, restoreInProgressBuilder.build());
             ImmutableOpenMap<String, ClusterState.Custom> customs = builder.build();
             return resultBuilder.build(ClusterState.builder(currentState).customs(customs).build());
         }
@@ -713,12 +735,12 @@ public class RestoreService implements ClusterStateApplier {
 
         RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE);
         if (restoreInProgress != null) {
-            for (RestoreInProgress.Entry entry : restoreInProgress.entries()) {
+            for (RestoreInProgress.Entry entry : restoreInProgress) {
                 if (entry.state().completed()) {
                     assert completed(entry.shards()) : "state says completed but restore entries are not";
                     clusterService.submitStateUpdateTask(
                         "clean up snapshot restore state",
-                        new CleanRestoreStateTaskExecutor.Task(entry.snapshot()),
+                        new CleanRestoreStateTaskExecutor.Task(entry.uuid()),
                         ClusterStateTaskConfig.build(Priority.URGENT),
                         cleanRestoreStateTaskExecutor,
                         cleanRestoreStateTaskExecutor);
@@ -815,7 +837,7 @@ public class RestoreService implements ClusterStateApplier {
         RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE);
         if (restore != null) {
             Set<Index> indicesToFail = null;
-            for (RestoreInProgress.Entry entry : restore.entries()) {
+            for (RestoreInProgress.Entry entry : restore) {
                 for (ObjectObjectCursor<ShardId, RestoreInProgress.ShardRestoreStatus> shard : entry.shards()) {
                     if (!shard.value.state().completed()) {
                         IndexMetaData indexMetaData = currentState.metaData().index(shard.key.getIndex());
@@ -853,10 +875,10 @@ public class RestoreService implements ClusterStateApplier {
      * @return true if repository is currently in use by one of the running restores
      */
     public static boolean isRepositoryInUse(ClusterState clusterState, String repository) {
-        RestoreInProgress snapshots = clusterState.custom(RestoreInProgress.TYPE);
-        if (snapshots != null) {
-            for (RestoreInProgress.Entry snapshot : snapshots.entries()) {
-                if (repository.equals(snapshot.snapshot().getRepository())) {
+        RestoreInProgress restoreInProgress = clusterState.custom(RestoreInProgress.TYPE);
+        if (restoreInProgress != null) {
+            for (RestoreInProgress.Entry entry: restoreInProgress) {
+                if (repository.equals(entry.snapshot().getRepository())) {
                     return true;
                 }
             }

+ 1 - 1
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -1137,7 +1137,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     // don't allow snapshot deletions while a restore is taking place,
                     // otherwise we could end up deleting a snapshot that is being restored
                     // and the files the restore depends on would all be gone
-                    if (restoreInProgress.entries().isEmpty() == false) {
+                    if (restoreInProgress.isEmpty() == false) {
                         throw new ConcurrentSnapshotExecutionException(snapshot, "cannot delete snapshot during a restore");
                     }
                 }

+ 2 - 2
server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java

@@ -205,8 +205,8 @@ public class ClusterModuleTests extends ModuleTestCase {
         final String whiteListedClusterCustom = randomFrom(ClusterModule.PRE_6_3_CLUSTER_CUSTOMS_WHITE_LIST);
         final String whiteListedMetaDataCustom = randomFrom(ClusterModule.PRE_6_3_METADATA_CUSTOMS_WHITE_LIST);
         final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
-            .putCustom(whiteListedClusterCustom, new RestoreInProgress())
-            .putCustom("other", new RestoreInProgress())
+            .putCustom(whiteListedClusterCustom, new RestoreInProgress.Builder().build())
+            .putCustom("other", new RestoreInProgress.Builder().build())
             .metaData(MetaData.builder()
                 .putCustom(whiteListedMetaDataCustom, new RepositoriesMetaData(Collections.emptyList()))
                 .putCustom("other", new RepositoriesMetaData(Collections.emptyList()))

+ 4 - 2
server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java

@@ -721,11 +721,13 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
                                 (long) randomIntBetween(0, 1000),
                                 ImmutableOpenMap.of()));
                     case 1:
-                        return new RestoreInProgress(new RestoreInProgress.Entry(
+                        return new RestoreInProgress.Builder().add(
+                            new RestoreInProgress.Entry(
+                                UUIDs.randomBase64UUID(),
                                 new Snapshot(randomName("repo"), new SnapshotId(randomName("snap"), UUIDs.randomBase64UUID())),
                                 RestoreInProgress.State.fromValue((byte) randomIntBetween(0, 3)),
                                 emptyList(),
-                                ImmutableOpenMap.of()));
+                                ImmutableOpenMap.of())).build();
                     default:
                         throw new IllegalArgumentException("Shouldn't be here");
                 }

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java

@@ -164,7 +164,7 @@ public class ShardRoutingTests extends ESTestCase {
                     } else {
                         otherRouting = new ShardRouting(otherRouting.shardId(), otherRouting.currentNodeId(),
                             otherRouting.relocatingNodeId(), otherRouting.primary(), otherRouting.state(),
-                            new RecoverySource.SnapshotRecoverySource(new Snapshot("test",
+                            new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), new Snapshot("test",
                                 new SnapshotId("s1", UUIDs.randomBase64UUID())), Version.CURRENT, "test"),
                             otherRouting.unassignedInfo(), otherRouting.allocationId(), otherRouting.getExpectedShardSize());
                     }

+ 3 - 1
server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java

@@ -142,6 +142,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
         ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
                 .metaData(metaData)
                 .routingTable(RoutingTable.builder().addAsNewRestore(metaData.index("test"), new SnapshotRecoverySource(
+                    UUIDs.randomBase64UUID(),
                     new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT, "test"),
                     new IntHashSet()).build()).build();
         for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
@@ -157,7 +158,8 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
         ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
                 .metaData(metaData)
                 .routingTable(RoutingTable.builder().addAsRestore(metaData.index("test"),
-                    new SnapshotRecoverySource(new Snapshot("rep1",
+                    new SnapshotRecoverySource(
+                        UUIDs.randomBase64UUID(), new Snapshot("rep1",
                         new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT, "test")).build()).build();
         for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
             assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED));

+ 5 - 1
server/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java

@@ -366,7 +366,9 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
         ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
             .metaData(metaData)
             .routingTable(RoutingTable.builder().addAsRestore(metaData.index("test"),
-                new SnapshotRecoverySource(new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())),
+                new SnapshotRecoverySource(
+                    UUIDs.randomBase64UUID(),
+                    new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())),
                 Version.CURRENT, "test")).build())
             .nodes(DiscoveryNodes.builder().add(newNode).add(oldNode1).add(oldNode2)).build();
         AllocationDeciders allocationDeciders = new AllocationDeciders(Arrays.asList(
@@ -482,9 +484,11 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
             newNode.node().getVersion() + "] to a node with older version [" + oldNode.node().getVersion() + "]"));
 
         final SnapshotRecoverySource newVersionSnapshot = new SnapshotRecoverySource(
+            UUIDs.randomBase64UUID(),
             new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())),
             newNode.node().getVersion(), "test");
         final SnapshotRecoverySource oldVersionSnapshot = new SnapshotRecoverySource(
+            UUIDs.randomBase64UUID(),
             new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())),
             oldNode.node().getVersion(), "test");
 

+ 8 - 4
server/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java

@@ -42,6 +42,7 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.decider.Decision;
 import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
@@ -335,6 +336,7 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
         RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
         Snapshot snapshot = new Snapshot("repo", new SnapshotId("snap", "randomId"));
         Set<String> snapshotIndices = new HashSet<>();
+        String restoreUUID = UUIDs.randomBase64UUID();
         for (ObjectCursor<IndexMetaData> cursor: metaData.indices().values()) {
             Index index = cursor.value.getIndex();
             IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(cursor.value);
@@ -357,12 +359,14 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
                 case 3:
                     snapshotIndices.add(index.getName());
                     routingTableBuilder.addAsNewRestore(indexMetaData,
-                        new SnapshotRecoverySource(snapshot, Version.CURRENT, indexMetaData.getIndex().getName()), new IntHashSet());
+                        new SnapshotRecoverySource(
+                            restoreUUID, snapshot, Version.CURRENT, indexMetaData.getIndex().getName()), new IntHashSet());
                     break;
                 case 4:
                     snapshotIndices.add(index.getName());
                     routingTableBuilder.addAsRestore(indexMetaData,
-                        new SnapshotRecoverySource(snapshot, Version.CURRENT, indexMetaData.getIndex().getName()));
+                        new SnapshotRecoverySource(
+                            restoreUUID, snapshot, Version.CURRENT, indexMetaData.getIndex().getName()));
                     break;
                 case 5:
                     routingTableBuilder.addAsNew(indexMetaData);
@@ -385,9 +389,9 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
                 }
             }
 
-            RestoreInProgress.Entry restore = new RestoreInProgress.Entry(snapshot, RestoreInProgress.State.INIT,
+            RestoreInProgress.Entry restore = new RestoreInProgress.Entry(restoreUUID, snapshot, RestoreInProgress.State.INIT,
                 new ArrayList<>(snapshotIndices), restoreShards.build());
-            restores.put(RestoreInProgress.TYPE, new RestoreInProgress(restore));
+            restores.put(RestoreInProgress.TYPE, new RestoreInProgress.Builder().add(restore).build());
         }
 
         return ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))

+ 5 - 3
server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java

@@ -37,6 +37,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.snapshots.Snapshot;
@@ -139,10 +140,11 @@ public class RestoreInProgressAllocationDeciderTests extends ESAllocationTestCas
 
         Snapshot snapshot = recoverySource.snapshot();
         RestoreInProgress.State restoreState = RestoreInProgress.State.STARTED;
-        RestoreInProgress.Entry restore = new RestoreInProgress.Entry(snapshot, restoreState, singletonList("test"), shards.build());
+        RestoreInProgress.Entry restore =
+            new RestoreInProgress.Entry(recoverySource.restoreUUID(), snapshot, restoreState, singletonList("test"), shards.build());
 
         clusterState = ClusterState.builder(clusterState)
-            .putCustom(RestoreInProgress.TYPE, new RestoreInProgress(restore))
+            .putCustom(RestoreInProgress.TYPE, new RestoreInProgress.Builder().add(restore).build())
             .routingTable(routingTable)
             .build();
 
@@ -202,6 +204,6 @@ public class RestoreInProgressAllocationDeciderTests extends ESAllocationTestCas
 
     private RecoverySource.SnapshotRecoverySource createSnapshotRecoverySource(final String snapshotName) {
         Snapshot snapshot = new Snapshot("_repository", new SnapshotId(snapshotName, "_uuid"));
-        return new RecoverySource.SnapshotRecoverySource(snapshot, Version.CURRENT, "test");
+        return new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test");
     }
 }

+ 4 - 3
server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java

@@ -118,14 +118,15 @@ public class ClusterSerializationTests extends ESAllocationTestCase {
                 ));
         if (includeRestore) {
             builder.putCustom(RestoreInProgress.TYPE,
-                new RestoreInProgress(
+                new RestoreInProgress.Builder().add(
                     new RestoreInProgress.Entry(
-                        new Snapshot("repo2", new SnapshotId("snap2", UUIDs.randomBase64UUID())),
+                        UUIDs.randomBase64UUID(), new Snapshot("repo2", new SnapshotId("snap2", UUIDs.randomBase64UUID())),
                         RestoreInProgress.State.STARTED,
                         Collections.singletonList("index_name"),
                         ImmutableOpenMap.of()
                     )
-                ));
+                ).build()
+            );
         }
 
         ClusterState clusterState = builder.incrementVersion().build();

+ 2 - 1
server/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java

@@ -391,7 +391,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
 
         final Snapshot snapshot = new Snapshot("test", new SnapshotId("test", UUIDs.randomBase64UUID()));
         RoutingTable routingTable = RoutingTable.builder()
-            .addAsRestore(metaData.index(shardId.getIndex()), new SnapshotRecoverySource(snapshot, Version.CURRENT, shardId.getIndexName()))
+            .addAsRestore(metaData.index(shardId.getIndex()),
+                new SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, shardId.getIndexName()))
             .build();
         ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
             .metaData(metaData)

+ 1 - 1
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -2097,7 +2097,7 @@ public class IndexShardTests extends IndexShardTestCase {
             RecoverySource.ExistingStoreRecoverySource.INSTANCE);
         final Snapshot snapshot = new Snapshot("foo", new SnapshotId("bar", UUIDs.randomBase64UUID()));
         routing = ShardRoutingHelper.newWithRestoreSource(routing,
-            new RecoverySource.SnapshotRecoverySource(snapshot, Version.CURRENT, "test"));
+            new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test"));
         target = reinitShard(target, routing);
         Store sourceStore = source.store();
         Store targetStore = target.store();

+ 1 - 0
server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

@@ -499,6 +499,7 @@ public class IndexRecoveryIT extends ESIntegTestCase {
 
             for (RecoveryState recoveryState : recoveryStates) {
                 SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(
+                    ((SnapshotRecoverySource)recoveryState.getRecoverySource()).restoreUUID(),
                     new Snapshot(REPO_NAME, createSnapshotResponse.getSnapshotInfo().snapshotId()),
                     Version.CURRENT, INDEX_NAME);
                 assertRecoveryState(recoveryState, 0, recoverySource, true, Stage.DONE, null, nodeA);

+ 124 - 1
server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

@@ -89,6 +89,7 @@ import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.script.MockScriptEngine;
 import org.elasticsearch.script.StoredScriptsIT;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
@@ -1113,7 +1114,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         // check that there is no restore in progress
         RestoreInProgress restoreInProgress = clusterStateResponse.getState().custom(RestoreInProgress.TYPE);
         assertNotNull("RestoreInProgress must be not null", restoreInProgress);
-        assertThat("RestoreInProgress must be empty", restoreInProgress.entries(), hasSize(0));
+        assertTrue(
+            "RestoreInProgress must be empty but found entries in " + restoreInProgress, restoreInProgress.isEmpty());
 
         // check that the shards have been created but are not assigned
         assertThat(clusterStateResponse.getState().getRoutingTable().allShards(indexName), hasSize(numShards.totalNumShards));
@@ -3510,6 +3512,127 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(15L));
     }
 
+    public void testParallelRestoreOperations() {
+        String indexName1 = "testindex1";
+        String indexName2 = "testindex2";
+        String repoName = "test-restore-snapshot-repo";
+        String snapshotName1 = "test-restore-snapshot1";
+        String snapshotName2 = "test-restore-snapshot2";
+        String absolutePath = randomRepoPath().toAbsolutePath().toString();
+        logger.info("Path [{}]", absolutePath);
+        String restoredIndexName1 = indexName1 + "-restored";
+        String restoredIndexName2 = indexName2 + "-restored";
+        String typeName = "actions";
+        String expectedValue = "expected";
+
+        Client client = client();
+        // Write a document
+        String docId = Integer.toString(randomInt());
+        index(indexName1, typeName, docId, "value", expectedValue);
+
+        String docId2 = Integer.toString(randomInt());
+        index(indexName2, typeName, docId2, "value", expectedValue);
+
+        logger.info("-->  creating repository");
+        assertAcked(client.admin().cluster().preparePutRepository(repoName)
+            .setType("fs").setSettings(Settings.builder()
+                .put("location", absolutePath)
+            ));
+
+        logger.info("--> snapshot");
+        CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName1)
+            .setWaitForCompletion(true)
+            .setIndices(indexName1)
+            .get();
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
+            equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
+        assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
+
+        CreateSnapshotResponse createSnapshotResponse2 = client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName2)
+            .setWaitForCompletion(true)
+            .setIndices(indexName2)
+            .get();
+        assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0));
+        assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(),
+            equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards()));
+        assertThat(createSnapshotResponse2.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
+
+        RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin().cluster().prepareRestoreSnapshot(repoName, snapshotName1)
+            .setWaitForCompletion(false)
+            .setRenamePattern(indexName1)
+            .setRenameReplacement(restoredIndexName1)
+            .get();
+        RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin().cluster().prepareRestoreSnapshot(repoName, snapshotName2)
+            .setWaitForCompletion(false)
+            .setRenamePattern(indexName2)
+            .setRenameReplacement(restoredIndexName2)
+            .get();
+        assertThat(restoreSnapshotResponse1.status(), equalTo(RestStatus.ACCEPTED));
+        assertThat(restoreSnapshotResponse2.status(), equalTo(RestStatus.ACCEPTED));
+        ensureGreen(restoredIndexName1, restoredIndexName2);
+        assertThat(client.prepareGet(restoredIndexName1, typeName, docId).get().isExists(), equalTo(true));
+        assertThat(client.prepareGet(restoredIndexName2, typeName, docId2).get().isExists(), equalTo(true));
+    }
+
+    public void testParallelRestoreOperationsFromSingleSnapshot() throws Exception {
+        String indexName1 = "testindex1";
+        String indexName2 = "testindex2";
+        String repoName = "test-restore-snapshot-repo";
+        String snapshotName = "test-restore-snapshot";
+        String absolutePath = randomRepoPath().toAbsolutePath().toString();
+        logger.info("Path [{}]", absolutePath);
+        String restoredIndexName1 = indexName1 + "-restored";
+        String restoredIndexName2 = indexName2 + "-restored";
+        String typeName = "actions";
+        String expectedValue = "expected";
+
+        Client client = client();
+        // Write a document
+        String docId = Integer.toString(randomInt());
+        index(indexName1, typeName, docId, "value", expectedValue);
+
+        String docId2 = Integer.toString(randomInt());
+        index(indexName2, typeName, docId2, "value", expectedValue);
+
+        logger.info("-->  creating repository");
+        assertAcked(client.admin().cluster().preparePutRepository(repoName)
+            .setType("fs").setSettings(Settings.builder()
+                .put("location", absolutePath)
+            ));
+
+        logger.info("--> snapshot");
+        CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
+            .setWaitForCompletion(true)
+            .setIndices(indexName1, indexName2)
+            .get();
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
+            equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
+        assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
+
+        ActionFuture<RestoreSnapshotResponse> restoreSnapshotResponse1 = client.admin().cluster()
+            .prepareRestoreSnapshot(repoName, snapshotName)
+            .setIndices(indexName1)
+            .setRenamePattern(indexName1)
+            .setRenameReplacement(restoredIndexName1)
+            .execute();
+
+        boolean sameSourceIndex = randomBoolean();
+
+        ActionFuture<RestoreSnapshotResponse> restoreSnapshotResponse2 = client.admin().cluster()
+            .prepareRestoreSnapshot(repoName, snapshotName)
+            .setIndices(sameSourceIndex ? indexName1 : indexName2)
+            .setRenamePattern(sameSourceIndex ? indexName1 : indexName2)
+            .setRenameReplacement(restoredIndexName2)
+            .execute();
+        assertThat(restoreSnapshotResponse1.get().status(), equalTo(RestStatus.ACCEPTED));
+        assertThat(restoreSnapshotResponse2.get().status(), equalTo(RestStatus.ACCEPTED));
+        ensureGreen(restoredIndexName1, restoredIndexName2);
+        assertThat(client.prepareGet(restoredIndexName1, typeName, docId).get().isExists(), equalTo(true));
+        assertThat(client.prepareGet(restoredIndexName2, typeName, sameSourceIndex ? docId : docId2).get().isExists(), equalTo(true));
+    }
+
     @TestLogging("org.elasticsearch.snapshots:TRACE")
     public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
         final Client client = client();

+ 1 - 0
test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java

@@ -144,6 +144,7 @@ public class TestShardRouting {
             RecoverySource.PeerRecoverySource.INSTANCE,
             RecoverySource.LocalShardsRecoverySource.INSTANCE,
             new RecoverySource.SnapshotRecoverySource(
+                UUIDs.randomBase64UUID(),
                 new Snapshot("repo", new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID())),
                 Version.CURRENT,
                 "some_index"));

+ 3 - 1
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.common.CheckedFunction;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -757,7 +758,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
         final String index = shardId.getIndexName();
         final IndexId indexId = new IndexId(shardId.getIndex().getName(), shardId.getIndex().getUUID());
         final DiscoveryNode node = getFakeDiscoNode(shard.routingEntry().currentNodeId());
-        final RecoverySource.SnapshotRecoverySource recoverySource = new RecoverySource.SnapshotRecoverySource(snapshot, version, index);
+        final RecoverySource.SnapshotRecoverySource recoverySource =
+            new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, version, index);
         final ShardRouting shardRouting = newShardRouting(shardId, node.getId(), true, ShardRoutingState.INITIALIZING, recoverySource);
 
         shard.markAsRecovering("from snapshot", new RecoveryState(shardRouting, node, null));

+ 3 - 2
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java

@@ -158,12 +158,13 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
             public void onResponse(RestoreService.RestoreCompletionResponse restoreCompletionResponse) {
                 if (restoreCompletionResponse.getRestoreInfo() == null) {
                     final Snapshot snapshot = restoreCompletionResponse.getSnapshot();
+                    final String uuid = restoreCompletionResponse.getUuid();
 
                     ClusterStateListener clusterStateListener = new ClusterStateListener() {
                         @Override
                         public void clusterChanged(ClusterChangedEvent changedEvent) {
-                            final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), snapshot);
-                            final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), snapshot);
+                            final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
+                            final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
                             if (prevEntry == null) {
                                 // When there is a master failure after a restore has been started, this listener might not be registered
                                 // on the current master and as such it might miss some intermediary cluster states due to batching.

+ 3 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

@@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.settings.Settings;
@@ -204,7 +205,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
         shard.refresh("test");
         ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("index", "_na_", 0), randomAlphaOfLength(10), true,
             ShardRoutingState.INITIALIZING,
-            new RecoverySource.SnapshotRecoverySource(new Snapshot("src_only", snapshotId), Version.CURRENT, indexId.getId()));
+            new RecoverySource.SnapshotRecoverySource(
+                UUIDs.randomBase64UUID(), new Snapshot("src_only", snapshotId), Version.CURRENT, indexId.getId()));
         IndexMetaData metaData = runAsSnapshot(threadPool, () -> repository.getSnapshotIndexMetaData(snapshotId, indexId));
         IndexShard restoredShard = newShard(shardRouting, metaData, null, SourceOnlySnapshotRepository.getEngineFactory(), () -> {});
         restoredShard.mapperService().merge(shard.indexSettings().getIndexMetaData(), MapperService.MergeReason.MAPPING_RECOVERY);