Browse Source

Track Shard-Snapshot Index Generation at Repository Root (#46250)

### Changes to Root-Level index-N (RepositoryData)

This change adds a new field `"shards"` to `RepositoryData` that contains a mapping of `IndexId` to a `String[]`. This string array can be accessed by shard id to get the generation of a shard's shard folder (i.e. the `N` in the name of the currently valid `/indices/${indexId}/${shardId}/index-${N}` for the shard in question).

### Benefits

This allows for creating a new snapshot in the shard without doing any LIST operations on the shard's folder. In the case of AWS S3, this saves about 1/3 of the cost for updating an empty shard (see #45736) and removes one out of two remaining potential issues with eventually consistent blob stores (see #38941 ... now only the root `index-${N}` is determined by listing).

Also and equally if not more important, a number of possible failure modes on eventually consistent blob stores like AWS S3 are eliminated by moving all delete operations to the `master` node and moving from incremental naming of shard level index-N to uuid suffixes for these blobs.

### Only Master Deletes Blobs

This change moves the deleting of the previous shard level `index-${uuid}` blob to the master node instead of the data node allowing for a safe and consistent update of the shard's generation in the `RepositoryData` by first updating `RepositoryData` and then deleting the now unreferenced `index-${newUUID}` blob.
__No deletes are executed on the data nodes at all for any operation with this change.__

Note also: Previous issues with hanging data nodes interfering with master nodes are completely impossible, even on S3 (see next section for details).

### Why Move from index-${N} to index-${uuid} at the Shard Level

This change changes the naming of the shard level `index-${N}` blobs to a uuid suffix `index-${UUID}`. The reason for this is the fact that writing a new shard-level `index-` generation blob is not atomic anymore in its effect. Not only does the blob have to be written to have an effect, it must also be referenced by the root level `index-N` (`RepositoryData`) to become an effective part of the snapshot repository.
This leads to a problem if we were to use incrementing names like we did before. If a blob `index-${N+1}` is written but due to the node/network/cluster/... crashes the root level `RepositoryData` has not been updated then a future operation will determine the shard's generation to be `N` and try to write a new `index-${N+1}` to the already existing path. Updates like that are problematic on S3 for consistency reasons, but also create numerous issues when thinking about stuck data nodes.
Previously stuck data nodes that were tasked to write `index-${N+1}` but got stuck and tried to do so after some other node had already written `index-${N+1}` were prevented form doing so (except for on S3) by us not allowing overwrites for that blob and thus no corruption could occur.
Were we to continue using incrementing names, we could not do this. The stuck node scenario would either allow for overwriting the `N+1` generation or force us to continue using a `LIST` operation to figure out the next `N` (which would make this change pointless).
With uuid naming and moving all deletes to `master` this becomes a non-issue. Data nodes write updated shard generation `index-${uuid}` and `master` makes those `index-${uuid}` part of the `RepositoryData` that it deems correct and cleans up all those `index-` that are unused.

Co-authored-by: Yannick Welsch <yannick@welsch.lu>
Co-authored-by: Tanguy Leroux <tlrx.dev@gmail.com>
Armin Braun 6 years ago
parent
commit
8c13cf704b
29 changed files with 989 additions and 340 deletions
  1. 4 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java
  2. 33 6
      server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
  3. 11 9
      server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
  4. 19 14
      server/src/main/java/org/elasticsearch/repositories/Repository.java
  5. 63 13
      server/src/main/java/org/elasticsearch/repositories/RepositoryData.java
  6. 215 0
      server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java
  7. 288 158
      server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
  8. 19 15
      server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java
  9. 5 3
      server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
  10. 71 37
      server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
  11. 2 1
      server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java
  12. 1 1
      server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java
  13. 1 1
      server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java
  14. 1 1
      server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java
  15. 5 4
      server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
  16. 28 11
      server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java
  17. 12 0
      server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
  18. 14 13
      server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
  19. 2 3
      server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java
  20. 96 6
      server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
  21. 1 1
      server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java
  22. 8 6
      server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java
  23. 3 2
      test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
  24. 9 9
      test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
  25. 47 4
      test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java
  26. 8 6
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
  27. 12 8
      x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java
  28. 9 6
      x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
  29. 2 1
      x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java

+ 4 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

@@ -42,6 +42,7 @@ import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryCleanupResult;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.snapshots.SnapshotsService;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -201,7 +202,9 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
                     logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
                     threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
                         l -> blobStoreRepository.cleanup(
-                            repositoryStateId, ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
+                            repositoryStateId,
+                            newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
+                            ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
                 }
 
                 private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {

+ 33 - 6
server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

@@ -91,12 +91,14 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
         private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
         private final long startTime;
         private final long repositoryStateId;
+        // see #useShardGenerations
+        private final boolean useShardGenerations;
         @Nullable private final Map<String, Object> userMetadata;
         @Nullable private final String failure;
 
         public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
                      long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
-                     String failure, Map<String, Object> userMetadata) {
+                     String failure, Map<String, Object> userMetadata, boolean useShardGenerations) {
             this.state = state;
             this.snapshot = snapshot;
             this.includeGlobalState = includeGlobalState;
@@ -114,6 +116,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             this.repositoryStateId = repositoryStateId;
             this.failure = failure;
             this.userMetadata = userMetadata;
+            this.useShardGenerations = useShardGenerations;
         }
 
         private static boolean assertShardsConsistent(State state, List<IndexId> indices,
@@ -128,20 +131,22 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
                 : "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]";
             return true;
         }
+
         public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
                      long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
-                     Map<String, Object> userMetadata) {
-            this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata);
+                     Map<String, Object> userMetadata, boolean useShardGenerations) {
+            this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata,
+                useShardGenerations);
         }
 
         public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
             this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
-                entry.repositoryStateId, shards, entry.failure, entry.userMetadata);
+                entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations);
         }
 
         public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure) {
             this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
-                 entry.repositoryStateId, shards, failure, entry.userMetadata);
+                 entry.repositoryStateId, shards, failure, entry.userMetadata, entry.useShardGenerations);
         }
 
         public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
@@ -192,6 +197,16 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             return failure;
         }
 
+        /**
+         * Whether to write to the repository in a format only understood by versions newer than
+         * {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}.
+         *
+         * @return true if writing to repository in new format
+         */
+        public boolean useShardGenerations() {
+            return useShardGenerations;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
@@ -207,6 +222,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             if (!snapshot.equals(entry.snapshot)) return false;
             if (state != entry.state) return false;
             if (repositoryStateId != entry.repositoryStateId) return false;
+            if (useShardGenerations != entry.useShardGenerations) return false;
 
             return true;
         }
@@ -221,6 +237,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             result = 31 * result + indices.hashCode();
             result = 31 * result + Long.hashCode(startTime);
             result = 31 * result + Long.hashCode(repositoryStateId);
+            result = 31 * result + (useShardGenerations ? 1 : 0);
             return result;
         }
 
@@ -503,6 +520,12 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
                 userMetadata = in.readMap();
             }
+            final boolean useShardGenerations;
+            if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
+                useShardGenerations = in.readBoolean();
+            } else {
+                useShardGenerations = false;
+            }
             entries[i] = new Entry(snapshot,
                                    includeGlobalState,
                                    partial,
@@ -512,7 +535,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
                                    repositoryStateId,
                                    builder.build(),
                                    failure,
-                                   userMetadata
+                                   userMetadata,
+                                   useShardGenerations
                 );
         }
         this.entries = Arrays.asList(entries);
@@ -541,6 +565,9 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
                 out.writeMap(entry.userMetadata);
             }
+            if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
+                out.writeBoolean(entry.useShardGenerations);
+            }
         }
     }
 

+ 11 - 9
server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

@@ -73,16 +73,17 @@ public class FilterRepository implements Repository {
     }
 
     @Override
-    public void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
-                                 List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
-                                 MetaData metaData, Map<String, Object> userMetadata, ActionListener<SnapshotInfo> listener) {
-        in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
-            includeGlobalState, metaData, userMetadata, listener);
+    public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
+                                 int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
+                                 boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
+                                 boolean writeShardGens, ActionListener<SnapshotInfo> listener) {
+        in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
+            includeGlobalState, metaData, userMetadata, writeShardGens, listener);
     }
 
     @Override
-    public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener) {
-        in.deleteSnapshot(snapshotId, repositoryStateId, listener);
+    public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
+        in.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
     }
 
     @Override
@@ -117,8 +118,9 @@ public class FilterRepository implements Repository {
 
     @Override
     public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
-        in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener);
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
+                              ActionListener<String> listener) {
+        in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, listener);
     }
     @Override
     public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {

+ 19 - 14
server/src/main/java/org/elasticsearch/repositories/Repository.java

@@ -112,28 +112,33 @@ public interface Repository extends LifecycleComponent {
      * <p>
      * This method is called on master after all shards are snapshotted.
      *
-     * @param snapshotId    snapshot id
-     * @param indices       list of indices in the snapshot
-     * @param startTime     start time of the snapshot
-     * @param failure       global failure reason or null
-     * @param totalShards   total number of shards
-     * @param shardFailures list of shard failures
-     * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
+     * @param snapshotId         snapshot id
+     * @param shardGenerations   updated shard generations
+     * @param startTime          start time of the snapshot
+     * @param failure            global failure reason or null
+     * @param totalShards        total number of shards
+     * @param shardFailures      list of shard failures
+     * @param repositoryStateId  the unique id identifying the state of the repository when the snapshot began
      * @param includeGlobalState include cluster global state
+     * @param clusterMetaData    cluster metadata
+     * @param userMetadata       user metadata
+     * @param writeShardGens     if shard generations should be written to the repository
      * @param listener listener to be called on completion of the snapshot
      */
-    void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
-                          List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
-                          MetaData clusterMetaData, Map<String, Object> userMetadata, ActionListener<SnapshotInfo> listener);
+    void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
+                          int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
+                          boolean includeGlobalState, MetaData clusterMetaData, Map<String, Object> userMetadata,
+                          boolean writeShardGens, ActionListener<SnapshotInfo> listener);
 
     /**
      * Deletes snapshot
      *
-     * @param snapshotId snapshot id
+     * @param snapshotId        snapshot id
      * @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
-     * @param listener completion listener
+     * @param writeShardGens    if shard generations should be written to the repository
+     * @param listener          completion listener
      */
-    void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener);
+    void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener);
 
     /**
      * Returns snapshot throttle time in nanoseconds
@@ -195,7 +200,7 @@ public interface Repository extends LifecycleComponent {
      * @param listener            listener invoked on completion
      */
     void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
-                       IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener);
+                       IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener<String> listener);
 
     /**
      * Restores snapshot of the shard.

+ 63 - 13
server/src/main/java/org/elasticsearch/repositories/RepositoryData.java

@@ -25,6 +25,7 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentParserUtils;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotState;
 
@@ -55,7 +56,7 @@ public final class RepositoryData {
      * An instance initialized for an empty repository.
      */
     public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN,
-        Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
+        Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
 
     /**
      * The generational id of the index file from which the repository data was read.
@@ -78,19 +79,30 @@ public final class RepositoryData {
      */
     private final Map<IndexId, Set<SnapshotId>> indexSnapshots;
 
+    /**
+     * Shard generations.
+     */
+    private final ShardGenerations shardGenerations;
 
     public RepositoryData(long genId, Map<String, SnapshotId> snapshotIds, Map<String, SnapshotState> snapshotStates,
-                          Map<IndexId, Set<SnapshotId>> indexSnapshots) {
+                          Map<IndexId, Set<SnapshotId>> indexSnapshots, ShardGenerations shardGenerations) {
         this.genId = genId;
         this.snapshotIds = Collections.unmodifiableMap(snapshotIds);
         this.snapshotStates = Collections.unmodifiableMap(snapshotStates);
         this.indices = Collections.unmodifiableMap(indexSnapshots.keySet().stream()
             .collect(Collectors.toMap(IndexId::getName, Function.identity())));
         this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots);
+        this.shardGenerations = Objects.requireNonNull(shardGenerations);
+        assert indices.values().containsAll(shardGenerations.indices()) : "ShardGenerations contained indices "
+            + shardGenerations.indices() + " but snapshots only reference indices " + indices.values();
     }
 
     protected RepositoryData copy() {
-        return new RepositoryData(genId, snapshotIds, snapshotStates, indexSnapshots);
+        return new RepositoryData(genId, snapshotIds, snapshotStates, indexSnapshots, shardGenerations);
+    }
+
+    public ShardGenerations shardGenerations() {
+        return shardGenerations;
     }
 
     /**
@@ -140,10 +152,15 @@ public final class RepositoryData {
     /**
      * Add a snapshot and its indices to the repository; returns a new instance.  If the snapshot
      * already exists in the repository data, this method throws an IllegalArgumentException.
+     *
+     * @param snapshotId       Id of the new snapshot
+     * @param snapshotState    State of the new snapshot
+     * @param shardGenerations Updated shard generations in the new snapshot. For each index contained in the snapshot an array of new
+     *                         generations indexed by the shard id they correspond to must be supplied.
      */
     public RepositoryData addSnapshot(final SnapshotId snapshotId,
                                       final SnapshotState snapshotState,
-                                      final List<IndexId> snapshottedIndices) {
+                                      final ShardGenerations shardGenerations) {
         if (snapshotIds.containsKey(snapshotId.getUUID())) {
             // if the snapshot id already exists in the repository data, it means an old master
             // that is blocked from the cluster is trying to finalize a snapshot concurrently with
@@ -155,10 +172,11 @@ public final class RepositoryData {
         Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
         newSnapshotStates.put(snapshotId.getUUID(), snapshotState);
         Map<IndexId, Set<SnapshotId>> allIndexSnapshots = new HashMap<>(indexSnapshots);
-        for (final IndexId indexId : snapshottedIndices) {
+        for (final IndexId indexId : shardGenerations.indices()) {
             allIndexSnapshots.computeIfAbsent(indexId, k -> new LinkedHashSet<>()).add(snapshotId);
         }
-        return new RepositoryData(genId, snapshots, newSnapshotStates, allIndexSnapshots);
+        return new RepositoryData(genId, snapshots, newSnapshotStates, allIndexSnapshots,
+            ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build());
     }
 
     /**
@@ -171,13 +189,18 @@ public final class RepositoryData {
         if (newGeneration == genId) {
             return this;
         }
-        return new RepositoryData(newGeneration, this.snapshotIds, this.snapshotStates, this.indexSnapshots);
+        return new RepositoryData(newGeneration, this.snapshotIds, this.snapshotStates, this.indexSnapshots, this.shardGenerations);
     }
 
     /**
      * Remove a snapshot and remove any indices that no longer exist in the repository due to the deletion of the snapshot.
+     *
+     * @param snapshotId              Snapshot Id
+     * @param updatedShardGenerations Shard generations that changed as a result of removing the snapshot.
+     *                                The {@code String[]} passed for each {@link IndexId} contains the new shard generation id for each
+     *                                changed shard indexed by its shardId
      */
-    public RepositoryData removeSnapshot(final SnapshotId snapshotId) {
+    public RepositoryData removeSnapshot(final SnapshotId snapshotId, final ShardGenerations updatedShardGenerations) {
         Map<String, SnapshotId> newSnapshotIds = snapshotIds.values().stream()
             .filter(id -> !snapshotId.equals(id))
             .collect(Collectors.toMap(SnapshotId::getUUID, Function.identity()));
@@ -205,7 +228,10 @@ public final class RepositoryData {
             indexSnapshots.put(indexId, set);
         }
 
-        return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, indexSnapshots);
+        return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, indexSnapshots,
+            ShardGenerations.builder().putAll(shardGenerations).putAll(updatedShardGenerations)
+                .retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build()
+        );
     }
 
     /**
@@ -231,12 +257,13 @@ public final class RepositoryData {
         return snapshotIds.equals(that.snapshotIds)
                    && snapshotStates.equals(that.snapshotStates)
                    && indices.equals(that.indices)
-                   && indexSnapshots.equals(that.indexSnapshots);
+                   && indexSnapshots.equals(that.indexSnapshots)
+                   && shardGenerations.equals(that.shardGenerations);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(snapshotIds, snapshotStates, indices, indexSnapshots);
+        return Objects.hash(snapshotIds, snapshotStates, indices, indexSnapshots, shardGenerations);
     }
 
     /**
@@ -276,6 +303,7 @@ public final class RepositoryData {
         return snapshotIndices;
     }
 
+    private static final String SHARD_GENERATIONS = "shard_generations";
     private static final String SNAPSHOTS = "snapshots";
     private static final String INDICES = "indices";
     private static final String INDEX_ID = "id";
@@ -286,7 +314,10 @@ public final class RepositoryData {
     /**
      * Writes the snapshots metadata and the related indices metadata to x-content.
      */
-    public XContentBuilder snapshotsToXContent(final XContentBuilder builder) throws IOException {
+    public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final boolean shouldWriteShardGens) throws IOException {
+        assert shouldWriteShardGens || shardGenerations.indices().isEmpty() :
+            "Should not build shard generations in BwC mode but saw generations [" + shardGenerations + "]";
+
         builder.startObject();
         // write the snapshots list
         builder.startArray(SNAPSHOTS);
@@ -312,6 +343,13 @@ public final class RepositoryData {
                 builder.value(snapshotId.getUUID());
             }
             builder.endArray();
+            if (shouldWriteShardGens) {
+                builder.startArray(SHARD_GENERATIONS);
+                for (String gen : shardGenerations.getGens(indexId)) {
+                    builder.value(gen);
+                }
+                builder.endArray();
+            }
             builder.endObject();
         }
         builder.endObject();
@@ -326,6 +364,7 @@ public final class RepositoryData {
         final Map<String, SnapshotId> snapshots = new HashMap<>();
         final Map<String, SnapshotState> snapshotStates = new HashMap<>();
         final Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
+        final ShardGenerations.Builder shardGenerations = ShardGenerations.builder();
 
         if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
             while (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
@@ -363,6 +402,7 @@ public final class RepositoryData {
                     while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
                         final String indexName = parser.currentName();
                         final Set<SnapshotId> snapshotIds = new LinkedHashSet<>();
+                        final List<String> gens = new ArrayList<>();
 
                         IndexId indexId = null;
                         if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
@@ -405,10 +445,20 @@ public final class RepositoryData {
                                             + " references an unknown snapshot uuid [" + uuid + "]");
                                     }
                                 }
+                            } else if (SHARD_GENERATIONS.equals(indexMetaFieldName)) {
+                                XContentParserUtils.ensureExpectedToken(
+                                    XContentParser.Token.START_ARRAY, parser.currentToken(), parser::getTokenLocation);
+                                while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
+                                    gens.add(parser.textOrNull());
+                                }
+
                             }
                         }
                         assert indexId != null;
                         indexSnapshots.put(indexId, snapshotIds);
+                        for (int i = 0; i < gens.size(); i++) {
+                            shardGenerations.put(indexId, i, gens.get(i));
+                        }
                     }
                 } else {
                     throw new ElasticsearchParseException("unknown field name  [" + field + "]");
@@ -417,7 +467,7 @@ public final class RepositoryData {
         } else {
             throw new ElasticsearchParseException("start object expected");
         }
-        return new RepositoryData(genId, snapshots, snapshotStates, indexSnapshots);
+        return new RepositoryData(genId, snapshots, snapshotStates, indexSnapshots, shardGenerations.build());
     }
 
 }

+ 215 - 0
server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java

@@ -0,0 +1,215 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.repositories;
+
+import org.elasticsearch.common.Nullable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public final class ShardGenerations {
+
+    public static final ShardGenerations EMPTY = new ShardGenerations(Collections.emptyMap());
+
+    /**
+     * Special generation that signifies that a shard is new and the repository does not yet contain a valid
+     * {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob for it.
+     */
+    public static final String NEW_SHARD_GEN = "_new";
+
+    /**
+     * Special generation that signifies that the shard has been deleted from the repository.
+     * This generation is only used during computations. It should never be written to disk.
+     */
+    public static final String DELETED_SHARD_GEN = "_deleted";
+
+    private final Map<IndexId, List<String>> shardGenerations;
+
+    private ShardGenerations(Map<IndexId, List<String>> shardGenerations) {
+        this.shardGenerations = shardGenerations;
+    }
+
+    /**
+     * Returns all indices for which shard generations are tracked.
+     *
+     * @return indices for which shard generations are tracked
+     */
+    public Collection<IndexId> indices() {
+        return Collections.unmodifiableSet(shardGenerations.keySet());
+    }
+
+    /**
+     * Computes the obsolete shard index generations that can be deleted once this instance was written to the repository.
+     * Note: This method should only be used when finalizing a snapshot and we can safely assume that data has only been added but not
+     *       removed from shard paths.
+     *
+     * @param previous Previous {@code ShardGenerations}
+     * @return Map of obsolete shard index generations in indices that are still tracked by this instance
+     */
+    public Map<IndexId, Map<Integer, String>> obsoleteShardGenerations(ShardGenerations previous) {
+        final Map<IndexId, Map<Integer, String>> result = new HashMap<>();
+        previous.shardGenerations.forEach(((indexId, oldGens) -> {
+            final List<String> updatedGenerations = shardGenerations.get(indexId);
+            final Map<Integer, String> obsoleteShardIndices = new HashMap<>();
+            assert updatedGenerations != null
+                : "Index [" + indexId + "] present in previous shard generations, but missing from updated generations";
+            for (int i = 0; i < Math.min(oldGens.size(), updatedGenerations.size()); i++) {
+                final String oldGeneration = oldGens.get(i);
+                final String updatedGeneration = updatedGenerations.get(i);
+                // If we had a previous generation that is different from an updated generation it's obsolete
+                // Since this method assumes only additions and no removals of shards, a null updated generation means no update
+                if (updatedGeneration != null && oldGeneration != null && oldGeneration.equals(updatedGeneration) == false) {
+                    obsoleteShardIndices.put(i, oldGeneration);
+                }
+            }
+            result.put(indexId, Collections.unmodifiableMap(obsoleteShardIndices));
+        }));
+        return Collections.unmodifiableMap(result);
+    }
+
+    /**
+     * Get the generation of the {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob for a given index
+     * and shard.
+     * There are three special kinds of generations that can be returned here.
+     * <ul>
+     *     <li>{@link #DELETED_SHARD_GEN} a deleted shard that isn't referenced by any snapshot in the repository any longer</li>
+     *     <li>{@link #NEW_SHARD_GEN} a new shard that we know doesn't hold any valid data yet in the repository</li>
+     *     <li>{@code null} unknown state. The shard either does not exist at all or it was created by a node older than
+     *     {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}. If a caller expects a shard to exist in the
+     *     repository but sees a {@code null} return, it should try to recover the generation by falling back to listing the contents
+     *     of the respective shard directory.</li>
+     * </ul>
+     *
+     * @param indexId IndexId
+     * @param shardId Shard Id
+     * @return generation of the {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob
+     */
+    @Nullable
+    public String getShardGen(IndexId indexId, int shardId) {
+        final List<String> generations = shardGenerations.get(indexId);
+        if (generations == null || generations.size() < shardId + 1) {
+            return null;
+        }
+        return generations.get(shardId);
+    }
+
+    public List<String> getGens(IndexId indexId) {
+        final List<String> existing = shardGenerations.get(indexId);
+        return existing == null ? Collections.emptyList() : Collections.unmodifiableList(existing);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final ShardGenerations that = (ShardGenerations) o;
+        return shardGenerations.equals(that.shardGenerations);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(shardGenerations);
+    }
+
+    @Override
+    public String toString() {
+        return "ShardGenerations{generations:" + this.shardGenerations + "}";
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static final class Builder {
+
+        private final Map<IndexId, Map<Integer, String>> generations = new HashMap<>();
+
+        /**
+         * Filters out all generations that don't belong to any of the supplied {@code indices} and prunes all {@link #DELETED_SHARD_GEN}
+         * entries from the builder.
+         *
+         * @param indices indices to filter for
+         * @return builder that contains only the given {@code indices} and no {@link #DELETED_SHARD_GEN} entries
+         */
+        public Builder retainIndicesAndPruneDeletes(Set<IndexId> indices) {
+            generations.keySet().retainAll(indices);
+            for (IndexId index : indices) {
+                final Map<Integer, String> shards = generations.getOrDefault(index, Collections.emptyMap());
+                final Iterator<Map.Entry<Integer, String>> iterator = shards.entrySet().iterator();
+                while (iterator.hasNext()) {
+                    Map.Entry<Integer, String> entry = iterator.next();
+                    final String generation = entry.getValue();
+                    if (generation.equals(DELETED_SHARD_GEN)) {
+                        iterator.remove();
+                    }
+                }
+                if (shards.isEmpty()) {
+                    generations.remove(index);
+                }
+            }
+            return this;
+        }
+
+        public Builder putAll(ShardGenerations shardGenerations) {
+            shardGenerations.shardGenerations.forEach((indexId, gens) -> {
+                for (int i = 0; i < gens.size(); i++) {
+                    final String gen = gens.get(i);
+                    if (gen != null) {
+                        put(indexId, i, gens.get(i));
+                    }
+                }
+            });
+            return this;
+        }
+
+        public Builder put(IndexId indexId, int shardId, String generation) {
+            generations.computeIfAbsent(indexId, i -> new HashMap<>()).put(shardId, generation);
+            return this;
+        }
+
+        public ShardGenerations build() {
+            return new ShardGenerations(generations.entrySet().stream().collect(Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> {
+                    final Set<Integer> shardIds = entry.getValue().keySet();
+                    assert shardIds.isEmpty() == false;
+                    final int size = shardIds.stream().mapToInt(i -> i).max().getAsInt() + 1;
+                    // Create a list that can hold the highest shard id as index and leave null values for shards that don't have
+                    // a map entry.
+                    final String[] gens = new String[size];
+                    entry.getValue().forEach((shardId, generation) -> gens[shardId] = generation);
+                    return Arrays.asList(gens);
+                }
+            )));
+        }
+    }
+}

+ 288 - 158
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -91,11 +91,13 @@ import org.elasticsearch.repositories.RepositoryCleanupResult;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.RepositoryVerificationException;
+import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.snapshots.SnapshotException;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotMissingException;
 import org.elasticsearch.snapshots.SnapshotShardFailure;
+import org.elasticsearch.snapshots.SnapshotsService;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.FilterInputStream;
@@ -359,7 +361,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }
 
     @Override
-    public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener) {
+    public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
         if (isReadOnly()) {
             listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"));
         } else {
@@ -369,7 +371,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
                 // delete an index that was created by another master node after writing this index-N blob.
                 final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
-                doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData, listener);
+                doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData, writeShardGens, listener);
             } catch (Exception ex) {
                 listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex));
             }
@@ -390,47 +392,170 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      * @param listener          Listener to invoke once finished
      */
     private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateId, Map<String, BlobContainer> foundIndices,
-                                        Map<String, BlobMetaData> rootBlobs, RepositoryData repositoryData,
+                                        Map<String, BlobMetaData> rootBlobs, RepositoryData repositoryData, boolean writeShardGens,
                                         ActionListener<Void> listener) throws IOException {
-        final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
-        writeIndexGen(updatedRepositoryData, repositoryStateId);
-        final ActionListener<Void> afterCleanupsListener =
-            new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
-
-        // Run unreferenced blobs cleanup in parallel to snapshot deletion
-        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(afterCleanupsListener,
-            l -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepositoryData, ActionListener.map(l, ignored -> null))));
-
-        deleteIndices(
-            updatedRepositoryData,
-            repositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotId),
-            snapshotId,
-            ActionListener.runAfter(
-                ActionListener.wrap(
-                    deleteResults -> {
-                        // Now that all metadata (RepositoryData at the repo root as well as index-N blobs in all shard paths)
-                        // has been updated we can execute the delete operations for all blobs that have become unreferenced as a result
-                        final String basePath = basePath().buildAsString();
-                        final int basePathLen = basePath.length();
-                        blobContainer().deleteBlobsIgnoringIfNotExists(
-                            Stream.concat(
-                                deleteResults.stream().flatMap(shardResult -> {
-                                    final String shardPath =
-                                        shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString();
-                                    return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob);
-                                }),
-                                deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().map(indexId ->
-                                    indexContainer(indexId).path().buildAsString() + globalMetaDataFormat.blobName(snapshotId.getUUID()))
-                            ).map(absolutePath -> {
-                                assert absolutePath.startsWith(basePath);
-                                return absolutePath.substring(basePathLen);
-                            }).collect(Collectors.toList()));
-                    },
-                    // Any exceptions after we have updated the root level RepositoryData are only logged but won't fail the delete request
-                    e -> logger.warn(
-                        () -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e)),
-                () -> afterCleanupsListener.onResponse(null))
-        );
+
+        if (writeShardGens) {
+            // First write the new shard state metadata (with the removed snapshot) and compute deletion targets
+            final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeShardMetaDataAndComputeDeletesStep = new StepListener<>();
+            writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, true, writeShardMetaDataAndComputeDeletesStep);
+            // Once we have put the new shard-level metadata into place, we can update the repository metadata as follows:
+            // 1. Remove the snapshot from the list of existing snapshots
+            // 2. Update the index shard generations of all updated shard folders
+            //
+            // Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created
+            //       index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only
+            //       written if all shard paths have been successfully updated.
+            final StepListener<RepositoryData> writeUpdatedRepoDataStep = new StepListener<>();
+            writeShardMetaDataAndComputeDeletesStep.whenComplete(deleteResults -> {
+                    final ShardGenerations.Builder builder = ShardGenerations.builder();
+                    for (ShardSnapshotMetaDeleteResult newGen : deleteResults) {
+                        builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
+                    }
+                    final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build());
+                    writeIndexGen(updatedRepoData, repositoryStateId, true);
+                    writeUpdatedRepoDataStep.onResponse(updatedRepoData);
+                }, listener::onFailure);
+            // Once we have updated the repository, run the clean-ups
+            writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> {
+                // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
+                final ActionListener<Void> afterCleanupsListener =
+                    new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
+                asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
+                asyncCleanupUnlinkedShardLevelBlobs(snapshotId, writeShardMetaDataAndComputeDeletesStep.result(), afterCleanupsListener);
+            }, listener::onFailure);
+        } else {
+            // Write the new repository data first (with the removed snapshot), using no shard generations
+            final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, ShardGenerations.EMPTY);
+            writeIndexGen(updatedRepoData, repositoryStateId, false);
+            // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
+            final ActionListener<Void> afterCleanupsListener =
+                new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
+            asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
+            final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
+            writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, false, writeMetaAndComputeDeletesStep);
+            writeMetaAndComputeDeletesStep.whenComplete(deleteResults ->
+                asyncCleanupUnlinkedShardLevelBlobs(snapshotId, deleteResults, afterCleanupsListener), afterCleanupsListener::onFailure);
+        }
+    }
+
+    private void asyncCleanupUnlinkedRootAndIndicesBlobs(Map<String, BlobContainer> foundIndices, Map<String, BlobMetaData> rootBlobs,
+                                                         RepositoryData updatedRepoData, ActionListener<Void> listener) {
+        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(
+            listener,
+            l -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepoData, ActionListener.map(l, ignored -> null))));
+    }
+
+    private void asyncCleanupUnlinkedShardLevelBlobs(SnapshotId snapshotId, Collection<ShardSnapshotMetaDeleteResult> deleteResults,
+                                                     ActionListener<Void> listener) {
+        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(
+            listener,
+            l -> {
+                try {
+                    blobContainer().deleteBlobsIgnoringIfNotExists(resolveFilesToDelete(snapshotId, deleteResults));
+                    l.onResponse(null);
+                } catch (Exception e) {
+                    logger.warn(
+                        () -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId),
+                        e);
+                    throw e;
+                }
+            }));
+    }
+
+    // updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.
+    private void writeUpdatedShardMetaDataAndComputeDeletes(SnapshotId snapshotId, RepositoryData oldRepositoryData,
+            boolean useUUIDs, ActionListener<Collection<ShardSnapshotMetaDeleteResult>> onAllShardsCompleted) {
+
+        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
+        final List<IndexId> indices = oldRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotId);
+
+        if (indices.isEmpty()) {
+            onAllShardsCompleted.onResponse(Collections.emptyList());
+            return;
+        }
+
+        // Listener that flattens out the delete results for each index
+        final ActionListener<Collection<ShardSnapshotMetaDeleteResult>> deleteIndexMetaDataListener = new GroupedActionListener<>(
+            ActionListener.map(onAllShardsCompleted,
+                res -> res.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size());
+
+        for (IndexId indexId : indices) {
+            final Set<SnapshotId> survivingSnapshots = oldRepositoryData.getSnapshots(indexId).stream()
+                .filter(id -> id.equals(snapshotId) == false).collect(Collectors.toSet());
+            executor.execute(ActionRunnable.wrap(deleteIndexMetaDataListener, deleteIdxMetaListener -> {
+                final IndexMetaData indexMetaData;
+                try {
+                    indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId);
+                } catch (Exception ex) {
+                    logger.warn(() ->
+                        new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex);
+                    // Just invoke the listener without any shard generations to count it down, this index will be cleaned up
+                    // by the stale data cleanup in the end.
+                    // TODO: Getting here means repository corruption. We should find a way of dealing with this instead of just ignoring
+                    //       it and letting the cleanup deal with it.
+                    deleteIdxMetaListener.onResponse(null);
+                    return;
+                }
+                final int shardCount = indexMetaData.getNumberOfShards();
+                assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]";
+                // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
+                final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener =
+                    new GroupedActionListener<>(deleteIdxMetaListener, shardCount);
+                final Index index = indexMetaData.getIndex();
+                for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
+                    final ShardId shard = new ShardId(index, shardId);
+                    executor.execute(new AbstractRunnable() {
+                        @Override
+                        protected void doRun() throws Exception {
+                            final BlobContainer shardContainer = shardContainer(indexId, shard);
+                            final Set<String> blobs = getShardBlobs(shard, shardContainer);
+                            final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots;
+                            final String newGen;
+                            if (useUUIDs) {
+                                newGen = UUIDs.randomBase64UUID();
+                                blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots(blobs, shardContainer,
+                                    oldRepositoryData.shardGenerations().getShardGen(indexId, shard.getId())).v1();
+                            } else {
+                                Tuple<BlobStoreIndexShardSnapshots, Long> tuple =
+                                    buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
+                                newGen = Long.toString(tuple.v2() + 1);
+                                blobStoreIndexShardSnapshots = tuple.v1();
+                            }
+                            allShardsListener.onResponse(deleteFromShardSnapshotMeta(survivingSnapshots, indexId, shard, snapshotId,
+                                shardContainer, blobs, blobStoreIndexShardSnapshots, newGen));
+                        }
+
+                        @Override
+                        public void onFailure(Exception ex) {
+                            logger.warn(
+                                () -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
+                                    snapshotId, indexId.getName(), shard.id()), ex);
+                            // Just passing null here to count down the listener instead of failing it, the stale data left behind
+                            // here will be retried in the next delete or repository cleanup
+                            allShardsListener.onResponse(null);
+                        }
+                    });
+                }
+            }));
+        }
+    }
+
+    private List<String> resolveFilesToDelete(SnapshotId snapshotId, Collection<ShardSnapshotMetaDeleteResult> deleteResults) {
+        final String basePath = basePath().buildAsString();
+        final int basePathLen = basePath.length();
+        return Stream.concat(
+            deleteResults.stream().flatMap(shardResult -> {
+                final String shardPath =
+                    shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString();
+                return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob);
+            }),
+            deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().map(indexId ->
+                indexContainer(indexId).path().buildAsString() + globalMetaDataFormat.blobName(snapshotId.getUUID()))
+        ).map(absolutePath -> {
+            assert absolutePath.startsWith(basePath);
+            return absolutePath.substring(basePathLen);
+        }).collect(Collectors.toList());
     }
 
     /**
@@ -472,9 +597,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      *     <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
      * </ul>
      * @param repositoryStateId Current repository state id
+     * @param writeShardGens    If shard generations should be written to the repository
      * @param listener          Listener to complete when done
      */
-    public void cleanup(long repositoryStateId, ActionListener<RepositoryCleanupResult> listener) {
+    public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListener<RepositoryCleanupResult> listener) {
         try {
             if (isReadOnly()) {
                 throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
@@ -496,7 +622,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO));
             } else {
                 // write new index-N blob to ensure concurrent operations will fail
-                writeIndexGen(repositoryData, repositoryStateId);
+                writeIndexGen(repositoryData, repositoryStateId, writeShardGens);
                 cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData, ActionListener.map(listener, RepositoryCleanupResult::new));
             }
         } catch (Exception e) {
@@ -580,71 +706,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         return deleteResult;
     }
 
-    /**
-     * @param repositoryData RepositoryData with the snapshot removed
-     * @param indices        Indices to remove the snapshot from (should not contain indices that become completely unreferenced with the
-     *                       removal of this snapshot as those are cleaned up afterwards by {@link #cleanupStaleBlobs})
-     * @param snapshotId     SnapshotId to remove from all the given indices
-     * @param listener       Listener to invoke when finished
-     */
-    private void deleteIndices(RepositoryData repositoryData, List<IndexId> indices, SnapshotId snapshotId,
-                               ActionListener<Collection<ShardSnapshotMetaDeleteResult>> listener) {
-
-        if (indices.isEmpty()) {
-            listener.onResponse(Collections.emptyList());
-            return;
-        }
-
-        // Listener that flattens out the delete results for each index
-        final ActionListener<Collection<ShardSnapshotMetaDeleteResult>> deleteIndexMetaDataListener = new GroupedActionListener<>(
-            ActionListener.map(listener, res -> res.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size());
-        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
-        for (IndexId indexId : indices) {
-            executor.execute(ActionRunnable.wrap(deleteIndexMetaDataListener,
-                deleteIdxMetaListener -> {
-                    final IndexMetaData indexMetaData;
-                    try {
-                        indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId);
-                    } catch (Exception ex) {
-                        logger.warn(() ->
-                            new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex);
-                        // Just invoke the listener without any shard generations to count it down, this index will be cleaned up
-                        // by the stale data cleanup in the end.
-                        deleteIdxMetaListener.onResponse(null);
-                        return;
-                    }
-                    final int shardCount = indexMetaData.getNumberOfShards();
-                    assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]";
-                    // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
-                    final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener =
-                        new GroupedActionListener<>(deleteIdxMetaListener, shardCount);
-                    final Index index = indexMetaData.getIndex();
-                    for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
-                        final ShardId shard = new ShardId(index, shardId);
-                        executor.execute(new AbstractRunnable() {
-                            @Override
-                            protected void doRun() throws Exception {
-                                allShardsListener.onResponse(
-                                    deleteShardSnapshot(repositoryData, indexId, shard, snapshotId));
-                            }
-
-                            @Override
-                            public void onFailure(Exception ex) {
-                                logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
-                                    snapshotId, indexId.getName(), shard.id()), ex);
-                                // Just passing null here to count down the listener instead of failing it, the stale data left behind
-                                // here will be retried in the next delete or repository cleanup
-                                allShardsListener.onResponse(null);
-                            }
-                        });
-                    }
-                }));
-        }
-    }
-
     @Override
     public void finalizeSnapshot(final SnapshotId snapshotId,
-                                 final List<IndexId> indices,
+                                 final ShardGenerations shardGenerations,
                                  final long startTime,
                                  final String failure,
                                  final int totalShards,
@@ -653,15 +717,25 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                                  final boolean includeGlobalState,
                                  final MetaData clusterMetaData,
                                  final Map<String, Object> userMetadata,
+                                 boolean writeShardGens,
                                  final ActionListener<SnapshotInfo> listener) {
 
-        // We upload one meta blob for each index, one for the cluster-state and one snap-${uuid}.dat blob
-        // Once we're done writing all metadata, we update the index-N blob to finalize the snapshot
+        final Collection<IndexId> indices = shardGenerations.indices();
+        // Once we are done writing the updated index-N blob we remove the now unreferenced index-${uuid} blobs in each shard
+        // directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION
+        // If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened
+        // when writing the index-${N} to each shard directory.
         final ActionListener<SnapshotInfo> allMetaListener = new GroupedActionListener<>(
             ActionListener.wrap(snapshotInfos -> {
                     assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos;
                     final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
-                    writeIndexGen(getRepositoryData().addSnapshot(snapshotId, snapshotInfo.state(), indices), repositoryStateId);
+                    final RepositoryData existingRepositoryData = getRepositoryData();
+                    final RepositoryData updatedRepositoryData =
+                        existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), shardGenerations);
+                    writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens);
+                    if (writeShardGens) {
+                        cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
+                    }
                     listener.onResponse(snapshotInfo);
                 },
                 e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e))),
@@ -694,6 +768,20 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }));
     }
 
+    // Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data
+    private void cleanupOldShardGens(RepositoryData existingRepositoryData, RepositoryData updatedRepositoryData) {
+        final List<String> toDelete = new ArrayList<>();
+        final int prefixPathLen = basePath().buildAsString().length();
+        updatedRepositoryData.shardGenerations().obsoleteShardGenerations(existingRepositoryData.shardGenerations()).forEach(
+            (indexId, gens) -> gens.forEach((shardId, oldGen) -> toDelete.add(
+                shardContainer(indexId, shardId).path().buildAsString().substring(prefixPathLen) + INDEX_FILE_PREFIX + oldGen)));
+        try {
+            blobContainer().deleteBlobsIgnoringIfNotExists(toDelete);
+        } catch (Exception e) {
+            logger.warn("Failed to clean up old shard generation blobs", e);
+        }
+    }
+
     @Override
     public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
         try {
@@ -854,7 +942,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         return readOnly;
     }
 
-    protected void writeIndexGen(final RepositoryData repositoryData, final long expectedGen) throws IOException {
+    protected void writeIndexGen(final RepositoryData repositoryData, final long expectedGen,
+                                 final boolean writeShardGens) throws IOException {
         assert isReadOnly() == false; // can not write to a read only repository
         final long currentGen = repositoryData.getGenId();
         if (currentGen != expectedGen) {
@@ -868,7 +957,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         // write the index file
         final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
         logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
-        writeAtomic(indexBlob, BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder())), true);
+        writeAtomic(indexBlob,
+            BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
         // write the current generation to the index-latest file
         final BytesReference genBytes;
         try (BytesStreamOutput bStream = new BytesStreamOutput()) {
@@ -956,7 +1046,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     @Override
     public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
+                              ActionListener<String> listener) {
         final ShardId shardId = store.shardId();
         final long startTime = threadPool.absoluteTimeInMillis();
         final ActionListener<String> snapshotDoneListener = ActionListener.wrap(listener::onResponse, e -> {
@@ -964,19 +1055,23 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             listener.onFailure(e instanceof IndexShardSnapshotFailedException ? e : new IndexShardSnapshotFailedException(shardId, e));
         });
         try {
-            logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name());
-
+            final String generation = snapshotStatus.generation();
+            logger.debug("[{}] [{}] snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation);
             final BlobContainer shardContainer = shardContainer(indexId, shardId);
             final Set<String> blobs;
-            try {
-                blobs = shardContainer.listBlobsByPrefix(INDEX_FILE_PREFIX).keySet();
-            } catch (IOException e) {
-                throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
+            if (generation == null) {
+                try {
+                    blobs = shardContainer.listBlobsByPrefix(INDEX_FILE_PREFIX).keySet();
+                } catch (IOException e) {
+                    throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
+                }
+            } else {
+                blobs = Collections.singleton(INDEX_FILE_PREFIX + generation);
             }
 
-            Tuple<BlobStoreIndexShardSnapshots, Long> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
+            Tuple<BlobStoreIndexShardSnapshots, String> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer, generation);
             BlobStoreIndexShardSnapshots snapshots = tuple.v1();
-            long fileListGeneration = tuple.v2();
+            String fileListGeneration = tuple.v2();
 
             if (snapshots.snapshots().stream().anyMatch(sf -> sf.snapshot().equals(snapshotId.getName()))) {
                 throw new IndexShardSnapshotFailedException(shardId,
@@ -1074,27 +1169,34 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 for (SnapshotFiles point : snapshots) {
                     newSnapshotsList.add(point);
                 }
-                final String indexGeneration = Long.toString(fileListGeneration + 1);
                 final List<String> blobsToDelete;
-                try {
-                    final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
-                    indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
+                final String indexGeneration;
+                if (writeShardGens) {
+                    indexGeneration = UUIDs.randomBase64UUID();
+                    blobsToDelete = Collections.emptyList();
+                } else {
+                    indexGeneration = Long.toString(Long.parseLong(fileListGeneration) + 1);
                     // Delete all previous index-N blobs
                     blobsToDelete = blobs.stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
                     assert blobsToDelete.stream().mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, "")))
                         .max().orElse(-1L) < Long.parseLong(indexGeneration)
                         : "Tried to delete an index-N blob newer than the current generation [" + indexGeneration
                         + "] when deleting index-N blobs " + blobsToDelete;
+                }
+                try {
+                    writeShardIndexBlob(shardContainer, indexGeneration, new BlobStoreIndexShardSnapshots(newSnapshotsList));
                 } catch (IOException e) {
                     throw new IndexShardSnapshotFailedException(shardId,
                         "Failed to finalize snapshot creation [" + snapshotId + "] with shard index ["
                             + indexShardSnapshotsFormat.blobName(indexGeneration) + "]", e);
                 }
-                try {
-                    shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
-                } catch (IOException e) {
-                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization",
-                        snapshotId, shardId), e);
+                if (writeShardGens == false) {
+                    try {
+                        shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
+                    } catch (IOException e) {
+                        logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization",
+                            snapshotId, shardId), e);
+                    }
                 }
                 snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), indexGeneration);
                 snapshotDoneListener.onResponse(indexGeneration);
@@ -1251,45 +1353,30 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }
 
     /**
-     * Delete shard snapshot
+     * Delete snapshot from shard level metadata.
      */
-    private ShardSnapshotMetaDeleteResult deleteShardSnapshot(RepositoryData repositoryData, IndexId indexId, ShardId snapshotShardId,
-                                                              SnapshotId snapshotId) throws IOException {
-        final BlobContainer shardContainer = shardContainer(indexId, snapshotShardId);
-        final Set<String> blobs;
-        try {
-            blobs = shardContainer.listBlobs().keySet();
-        } catch (IOException e) {
-            throw new IndexShardSnapshotException(snapshotShardId, "Failed to list content of shard directory", e);
-        }
-
-        Tuple<BlobStoreIndexShardSnapshots, Long> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
-        BlobStoreIndexShardSnapshots snapshots = tuple.v1();
-        long fileListGeneration = tuple.v2();
-
+    private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta(Set<SnapshotId> survivingSnapshots, IndexId indexId,
+                                                                      ShardId snapshotShardId, SnapshotId snapshotId,
+                                                                      BlobContainer shardContainer, Set<String> blobs,
+                                                                      BlobStoreIndexShardSnapshots snapshots, String indexGeneration) {
         // Build a list of snapshots that should be preserved
         List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
-        final Set<String> survivingSnapshotNames =
-            repositoryData.getSnapshots(indexId).stream().map(SnapshotId::getName).collect(Collectors.toSet());
+        final Set<String> survivingSnapshotNames = survivingSnapshots.stream().map(SnapshotId::getName).collect(Collectors.toSet());
         for (SnapshotFiles point : snapshots) {
             if (survivingSnapshotNames.contains(point.snapshot())) {
                 newSnapshotsList.add(point);
             }
         }
-        final String indexGeneration = Long.toString(fileListGeneration + 1);
         try {
-            final List<String> blobsToDelete;
             if (newSnapshotsList.isEmpty()) {
-                // If we deleted all snapshots, we don't need to create a new index file and simply delete all the blobs we found
-                blobsToDelete = List.copyOf(blobs);
+                return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId.id(), ShardGenerations.DELETED_SHARD_GEN, blobs);
             } else {
-                final Set<String> survivingSnapshotUUIDs = repositoryData.getSnapshots(indexId).stream().map(SnapshotId::getUUID)
-                    .collect(Collectors.toSet());
                 final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
-                indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
-                blobsToDelete = unusedBlobs(blobs, survivingSnapshotUUIDs, updatedSnapshots);
+                writeShardIndexBlob(shardContainer, indexGeneration, updatedSnapshots);
+                final Set<String> survivingSnapshotUUIDs = survivingSnapshots.stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
+                return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId.id(), indexGeneration,
+                    unusedBlobs(blobs, survivingSnapshotUUIDs, updatedSnapshots));
             }
-            return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId.id(), blobsToDelete);
         } catch (IOException e) {
             throw new IndexShardSnapshotFailedException(snapshotShardId,
                 "Failed to finalize snapshot deletion [" + snapshotId + "] with shard index ["
@@ -1297,6 +1384,23 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }
 
+    private void writeShardIndexBlob(BlobContainer shardContainer, String indexGeneration,
+                                     BlobStoreIndexShardSnapshots updatedSnapshots) throws IOException {
+        assert ShardGenerations.NEW_SHARD_GEN.equals(indexGeneration) == false;
+        assert ShardGenerations.DELETED_SHARD_GEN.equals(indexGeneration) == false;
+        indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
+    }
+
+    private static Set<String> getShardBlobs(final ShardId snapshotShardId, final BlobContainer shardContainer) {
+        final Set<String> blobs;
+        try {
+            blobs = shardContainer.listBlobs().keySet();
+        } catch (IOException e) {
+            throw new IndexShardSnapshotException(snapshotShardId, "Failed to list content of shard directory", e);
+        }
+        return blobs;
+    }
+
     // Unused blobs are all previous index-, data- and meta-blobs and that are not referenced by the new index- as well as all
     // temporary blobs
     private static List<String> unusedBlobs(Set<String> blobs, Set<String> survivingSnapshotUUIDs,
@@ -1310,7 +1414,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 || FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
     }
 
-
     /**
      * Loads information about shard snapshot
      */
@@ -1325,6 +1428,29 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }
 
+    /**
+     * Loads all available snapshots in the repository using the given {@code generation} or falling back to trying to determine it from
+     * the given list of blobs in the shard container.
+     *
+     * @param blobs      list of blobs in repository
+     * @param generation shard generation or {@code null} in case there was no shard generation tracked in the {@link RepositoryData} for
+     *                   this shard because its snapshot was created in a version older than
+     *                   {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}.
+     * @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation
+     */
+    private Tuple<BlobStoreIndexShardSnapshots, String> buildBlobStoreIndexShardSnapshots(Set<String> blobs,
+                                                                                          BlobContainer shardContainer,
+                                                                                          @Nullable String generation) throws IOException {
+        if (generation != null) {
+            if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) {
+                return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN);
+            }
+            return new Tuple<>(indexShardSnapshotsFormat.read(shardContainer, generation), generation);
+        }
+        final Tuple<BlobStoreIndexShardSnapshots, Long> legacyIndex = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
+        return new Tuple<>(legacyIndex.v1(), String.valueOf(legacyIndex.v2()));
+    }
+
     /**
      * Loads all available snapshots in the repository
      *
@@ -1413,12 +1539,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         // Shard id that the snapshot was removed from
         private final int shardId;
 
+        // Id of the new index-${uuid} blob that does not include the snapshot any more
+        private final String newGeneration;
+
         // Blob names in the shard directory that have become unreferenced in the new shard generation
         private final Collection<String> blobsToDelete;
 
-        ShardSnapshotMetaDeleteResult(IndexId indexId, int shardId, Collection<String> blobsToDelete) {
+        ShardSnapshotMetaDeleteResult(IndexId indexId, int shardId, String newGeneration, Collection<String> blobsToDelete) {
             this.indexId = indexId;
             this.shardId = shardId;
+            this.newGeneration = newGeneration;
             this.blobsToDelete = blobsToDelete;
         }
     }

+ 19 - 15
server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java

@@ -70,10 +70,12 @@
  *      |  |  |- snap-20131011.dat - SMILE serialized {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot} for
  *      |  |  |                      snapshot "20131011"
  *      |  |  |- index-123         - SMILE serialized {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} for
- *      |  |  |                      the shard
+ *      |  |  |                      the shard (files with numeric suffixes were created by older versions, newer ES versions use a uuid
+ *      |  |  |                      suffix instead)
  *      |  |
  *      |  |- 1/ - data for shard "1" of index "foo"
  *      |  |  |- __1
+ *      |  |  |- index-Zc2SS8ZgR8JvZAHlSMyMXy - SMILE serialized {@code BlobStoreIndexShardSnapshots} for the shard
  *      |  |  .....
  *      |  |
  *      |  |-2/
@@ -132,8 +134,9 @@
  *
  * <ol>
  * <li>Create the {@link org.apache.lucene.index.IndexCommit} for the shard to snapshot.</li>
- * <li>List all blobs in the shard's path. Find the {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob
- * with name {@code index-${N}} for the highest possible value of {@code N} in the list to get the information of what segment files are
+ * <li>Get the {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob
+ * with name {@code index-${uuid}} with the {@code uuid} generation returned by
+ * {@link org.elasticsearch.repositories.ShardGenerations#getShardGen} to get the information of what segment files are
  * already available in the blobstore.</li>
  * <li>By comparing the files in the {@code IndexCommit} and the available file list from the previous step, determine the segment files
  * that need to be written to the blob store. For each segment that needs to be added to the blob store, generate a unique name by combining
@@ -143,7 +146,7 @@
  * the shard's path and contains a list of all the files referenced by the snapshot as well as some metadata about the snapshot. See the
  * documentation of {@code BlobStoreIndexShardSnapshot} for details on its contents.</li>
  * <li>Once all the segments and the {@code BlobStoreIndexShardSnapshot} blob have been written, an updated
- * {@code BlobStoreIndexShardSnapshots} blob is written to the shard's path with name {@code index-${N+1}}.</li>
+ * {@code BlobStoreIndexShardSnapshots} blob is written to the shard's path with name {@code index-${newUUID}}.</li>
  * </ol>
  *
  * <h3>Finalizing the Snapshot</h3>
@@ -171,11 +174,6 @@
  *
  * <ol>
  * <li>Get the current {@code RepositoryData} from the latest {@code index-N} blob at the repository root.</li>
- * <li>Write an updated {@code RepositoryData} blob with the deleted snapshot removed to key {@code /index-${N+1}} directly under the
- * repository root.</li>
- * <li>Write an updated {@code index.latest} blob containing {@code N + 1}.</li>
- * <li>Delete the global {@code MetaData} blob {@code meta-${snapshot-uuid}.dat} stored directly under the repository root for the snapshot
- * as well as the {@code SnapshotInfo} blob at {@code /snap-${snapshot-uuid}.dat}.</li>
  * <li>For each index referenced by the snapshot:
  * <ol>
  * <li>Delete the snapshot's {@code IndexMetaData} at {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}}.</li>
@@ -184,16 +182,22 @@
  * <li>Remove the {@code BlobStoreIndexShardSnapshot} blob at {@code /indices/${index-snapshot-uuid}/${i}/snap-${snapshot-uuid}.dat}.</li>
  * <li>List all blobs in the shard path {@code /indices/${index-snapshot-uuid}} and build a new {@code BlobStoreIndexShardSnapshots} from
  * the remaining {@code BlobStoreIndexShardSnapshot} blobs in the shard. Afterwards, write it to the next shard generation blob at
- * {@code /indices/${index-snapshot-uuid}/${i}/index-${N+1}} (The shard's generation is determined from the list of {@code index-N} blobs
- * in the shard directory).</li>
- * <li>Delete all segment blobs (identified by having the data blob prefix {@code __}) in the shard directory which are not referenced by
- * the new {@code BlobStoreIndexShardSnapshots} that has been written in the previous step.</li>
+ * {@code /indices/${index-snapshot-uuid}/${i}/index-${uuid}} (The shard's generation is determined from the map of shard generations in
+ * the {@link org.elasticsearch.repositories.RepositoryData} in the root {@code index-${N}} blob of the repository.</li>
+ * <li>Collect all segment blobs (identified by having the data blob prefix {@code __}) in the shard directory which are not referenced by
+ * the new {@code BlobStoreIndexShardSnapshots} that has been written in the previous step as well as the previous index-${uuid}
+ * blob so that it can be deleted at the end of the snapshot delete process.</li>
  * </ol>
  * </li>
+ * <li>Write an updated {@code RepositoryData} blob with the deleted snapshot removed to key {@code /index-${N+1}} directly under the
+ * repository root and the repository generations that were changed in the affected shards adjusted.</li>
+ * <li>Write an updated {@code index.latest} blob containing {@code N + 1}.</li>
+ * <li>Delete the global {@code MetaData} blob {@code meta-${snapshot-uuid}.dat} stored directly under the repository root for the snapshot
+ * as well as the {@code SnapshotInfo} blob at {@code /snap-${snapshot-uuid}.dat}.</li>
+ * <li>Delete all unreferenced blobs previously collected when updating the shard directories. Also, remove any index folders or blobs
+ * under the repository root that are not referenced by the new {@code RepositoryData} written in the previous step.</li>
  * </ol>
  * </li>
  * </ol>
- * TODO: The above sequence of actions can lead to leaking files when an index completely goes out of scope. Adjust this documentation once
- *       https://github.com/elastic/elasticsearch/issues/13159 is fixed.
  */
 package org.elasticsearch.repositories.blobstore;

+ 5 - 3
server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

@@ -281,7 +281,9 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
                 final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue();
                 final IndexId indexId = indicesMap.get(shardId.getIndexName());
                 assert indexId != null;
-                snapshot(shardId, snapshot, indexId, snapshotStatus, new ActionListener<>() {
+                assert entry.useShardGenerations() || snapshotStatus.generation() == null :
+                    "Found non-null shard generation [" + snapshotStatus.generation() + "] for snapshot with old-format compatibility";
+                snapshot(shardId, snapshot, indexId, snapshotStatus, entry.useShardGenerations(), new ActionListener<>() {
                     @Override
                     public void onResponse(String newGeneration) {
                         assert newGeneration != null;
@@ -311,7 +313,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
      * @param snapshotStatus snapshot status
      */
     private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId,
-                          final IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
+                          final IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener<String> listener) {
         try {
             final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
             if (indexShard.routingEntry().primary() == false) {
@@ -334,7 +336,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
                 // we flush first to make sure we get the latest writes snapshotted
                 snapshotRef = indexShard.acquireLastIndexCommit(true);
                 repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId,
-                    snapshotRef.getIndexCommit(), snapshotStatus, ActionListener.runBefore(listener, snapshotRef::close));
+                    snapshotRef.getIndexCommit(), snapshotStatus, writeShardGens, ActionListener.runBefore(listener, snapshotRef::close));
             } catch (Exception e) {
                 IOUtils.close(snapshotRef);
                 throw e;

+ 71 - 37
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -71,6 +71,7 @@ import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.RepositoryMissingException;
+import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
@@ -285,15 +286,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                                                         request.indicesOptions(), request.indices()));
                     logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
                     List<IndexId> snapshotIndices = repositoryData.resolveNewIndices(indices);
-                    newSnapshot = new SnapshotsInProgress.Entry(new Snapshot(repositoryName, snapshotId),
-                                                                request.includeGlobalState(),
-                                                                request.partial(),
-                                                                State.INIT,
-                                                                snapshotIndices,
-                                                                threadPool.absoluteTimeInMillis(),
-                                                                repositoryData.getGenId(),
-                                                                null,
-                                                                request.userMetadata());
+                    newSnapshot = new SnapshotsInProgress.Entry(
+                        new Snapshot(repositoryName, snapshotId),
+                        request.includeGlobalState(), request.partial(),
+                        State.INIT,
+                        snapshotIndices,
+                        threadPool.absoluteTimeInMillis(),
+                        repositoryData.getGenId(),
+                        null,
+                        request.userMetadata(),
+                        clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION));
                     initializingSnapshots.add(newSnapshot.snapshot());
                     snapshots = new SnapshotsInProgress(newSnapshot);
                 } else {
@@ -443,8 +445,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                                 hadAbortedInitializations = true;
                             } else {
                                 // Replace the snapshot that was just initialized
-                                ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards =
-                                    shards(currentState, entry.indices());
+                                ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(currentState, entry, repositoryData);
                                 if (!partial) {
                                     Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
                                         currentState.metaData());
@@ -556,7 +557,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 if (snapshotCreated) {
                     repositoriesService.repository(snapshot.snapshot().getRepository())
                         .finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
-                            snapshot.indices(),
+                            buildGenerations(snapshot),
                             snapshot.startTime(),
                             ExceptionsHelper.stackTrace(exception),
                             0,
@@ -564,7 +565,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                             snapshot.getRepositoryStateId(),
                             snapshot.includeGlobalState(),
                             metaDataForSnapshot(snapshot, clusterService.state().metaData()),
-                            snapshot.userMetadata(), ActionListener.runAfter(ActionListener.wrap(ignored -> {
+                            snapshot.userMetadata(),
+                            snapshot.useShardGenerations(),
+                            ActionListener.runAfter(ActionListener.wrap(ignored -> {
                             }, inner -> {
                                 inner.addSuppressed(exception);
                                 logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot in repository",
@@ -577,6 +580,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         }
     }
 
+    private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot) {
+        ShardGenerations.Builder builder = ShardGenerations.builder();
+        final Map<String, IndexId> indexLookup = new HashMap<>();
+        snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
+        snapshot.shards().forEach(c -> builder.put(indexLookup.get(c.key.getIndexName()), c.key.id(), c.value.generation()));
+        return builder.build();
+    }
+
     private static MetaData metaDataForSnapshot(SnapshotsInProgress.Entry snapshot, MetaData metaData) {
         if (snapshot.includeGlobalState() == false) {
             // Remove global state from the cluster state
@@ -760,7 +771,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
             assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster";
             SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0);
-            deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId());
+            deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId(),
+                state.nodes().getMinNodeVersion());
         }
     }
 
@@ -1009,7 +1021,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 }
                 repository.finalizeSnapshot(
                     snapshot.getSnapshotId(),
-                    entry.indices(),
+                    buildGenerations(entry),
                     entry.startTime(),
                     failure,
                     entry.shards().size(),
@@ -1017,7 +1029,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     entry.getRepositoryStateId(),
                     entry.includeGlobalState(),
                     metaDataForSnapshot(entry, metaData),
-                    entry.userMetadata(), ActionListener.wrap(snapshotInfo -> {
+                    entry.userMetadata(),
+                    entry.useShardGenerations(),
+                    ActionListener.wrap(snapshotInfo -> {
                         removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
                         logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
                     }, this::onFailure));
@@ -1299,7 +1313,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     ));
                 } else {
                     logger.debug("deleted snapshot is not running - deleting files");
-                    deleteSnapshotFromRepository(snapshot, listener, repositoryStateId);
+                    deleteSnapshotFromRepository(snapshot, listener, repositoryStateId, newState.nodes().getMinNodeVersion());
                 }
             }
         });
@@ -1338,15 +1352,18 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      * @param snapshot   snapshot
      * @param listener   listener
      * @param repositoryStateId the unique id representing the state of the repository at the time the deletion began
+     * @param version minimum ES version the repository should be readable by
      */
-    private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener<Void> listener, long repositoryStateId) {
+    private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener<Void> listener, long repositoryStateId,
+                                              Version version) {
         threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
             Repository repository = repositoriesService.repository(snapshot.getRepository());
-            repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, ActionListener.wrap(v -> {
-                    logger.info("snapshot [{}] deleted", snapshot);
-                    removeSnapshotDeletionFromClusterState(snapshot, null, l);
-                }, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l)
-            ));
+            repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, version.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION),
+                ActionListener.wrap(v -> {
+                        logger.info("snapshot [{}] deleted", snapshot);
+                        removeSnapshotDeletionFromClusterState(snapshot, null, l);
+                    }, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l)
+                ));
         }));
     }
 
@@ -1399,38 +1416,59 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      * Calculates the list of shards that should be included into the current snapshot
      *
      * @param clusterState cluster state
-     * @param indices      list of indices to be snapshotted
+     * @param snapshot     SnapshotsInProgress Entry
      * @return list of shard to be included into current snapshot
      */
     private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState,
-                                                                                             List<IndexId> indices) {
+                                                                                             SnapshotsInProgress.Entry snapshot,
+                                                                                             RepositoryData repositoryData) {
         ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
         MetaData metaData = clusterState.metaData();
-        for (IndexId index : indices) {
+        final ShardGenerations shardGenerations = repositoryData.shardGenerations();
+        for (IndexId index : snapshot.indices()) {
             final String indexName = index.getName();
+            final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false;
             IndexMetaData indexMetaData = metaData.index(indexName);
             if (indexMetaData == null) {
                 // The index was deleted before we managed to start the snapshot - mark it as missing.
-                builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0), missingStatus(null, "missing index"));
+                builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0),
+                    new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "missing index", null));
             } else {
                 IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(indexName);
                 for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
                     ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
+                    final String shardRepoGeneration;
+                    if (snapshot.useShardGenerations()) {
+                        if (isNewIndex) {
+                            assert shardGenerations.getShardGen(index, shardId.getId()) == null
+                                : "Found shard generation for new index [" + index + "]";
+                            shardRepoGeneration = ShardGenerations.NEW_SHARD_GEN;
+                        } else {
+                            shardRepoGeneration = shardGenerations.getShardGen(index, shardId.getId());
+                        }
+                    } else {
+                        shardRepoGeneration = null;
+                    }
                     if (indexRoutingTable != null) {
                         ShardRouting primary = indexRoutingTable.shard(i).primaryShard();
                         if (primary == null || !primary.assignedToNode()) {
-                            builder.put(shardId, missingStatus(null, "primary shard is not allocated"));
+                            builder.put(shardId,
+                                new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "primary shard is not allocated",
+                                    shardRepoGeneration));
                         } else if (primary.relocating() || primary.initializing()) {
                             builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(
-                                primary.currentNodeId(), ShardState.WAITING, null));
+                                primary.currentNodeId(), ShardState.WAITING, shardRepoGeneration));
                         } else if (!primary.started()) {
-                            builder.put(shardId, missingStatus(primary.currentNodeId(), "primary shard hasn't been started yet"));
+                            builder.put(shardId,
+                                new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), ShardState.MISSING,
+                                    "primary shard hasn't been started yet", shardRepoGeneration));
                         } else {
-                            builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(
-                                primary.currentNodeId(), null));
+                            builder.put(shardId,
+                                new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), shardRepoGeneration));
                         }
                     } else {
-                        builder.put(shardId, missingStatus(null, "missing routing table"));
+                        builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING,
+                            "missing routing table", shardRepoGeneration));
                     }
                 }
             }
@@ -1439,10 +1477,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         return builder.build();
     }
 
-    private static ShardSnapshotStatus missingStatus(@Nullable String nodeId, String reason) {
-        return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, ShardState.MISSING, reason, null);
-    }
-
     /**
      * Returns the indices that are currently being snapshotted (with partial == false) and that are contained in the indices-to-check set.
      */

+ 2 - 1
server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java

@@ -721,7 +721,8 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
                                 Math.abs(randomLong()),
                                 (long) randomIntBetween(0, 1000),
                                 ImmutableOpenMap.of(),
-                                SnapshotInfoTests.randomUserMetadata()));
+                                SnapshotInfoTests.randomUserMetadata(),
+                                randomBoolean()));
                     case 1:
                         return new RestoreInProgress.Builder().add(
                             new RestoreInProgress.Entry(

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java

@@ -66,7 +66,7 @@ public class SnapshotsInProgressTests extends ESTestCase {
         // test no waiting shards in an index
         shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "", "1"));
         Entry entry = new Entry(snapshot, randomBoolean(), randomBoolean(), State.INIT,
-                                indices, System.currentTimeMillis(), randomLong(), shards.build(), SnapshotInfoTests.randomUserMetadata());
+            indices, System.currentTimeMillis(), randomLong(), shards.build(), SnapshotInfoTests.randomUserMetadata(), randomBoolean());
 
         ImmutableOpenMap<String, List<ShardId>> waitingIndices = entry.waitingIndices();
         assertEquals(2, waitingIndices.get(idx1Name).size());

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java

@@ -62,7 +62,7 @@ public class MetaDataDeleteIndexServiceTests extends ESTestCase {
         SnapshotsInProgress snaps = new SnapshotsInProgress(new SnapshotsInProgress.Entry(snapshot, true, false,
                 SnapshotsInProgress.State.INIT, singletonList(new IndexId(index, "doesn't matter")),
                 System.currentTimeMillis(), (long) randomIntBetween(0, 1000), ImmutableOpenMap.of(),
-                SnapshotInfoTests.randomUserMetadata()));
+                SnapshotInfoTests.randomUserMetadata(), randomBoolean()));
         ClusterState state = ClusterState.builder(clusterState(index))
                 .putCustom(SnapshotsInProgress.TYPE, snaps)
                 .build();

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java

@@ -472,7 +472,7 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
         final SnapshotsInProgress.Entry entry =
             new SnapshotsInProgress.Entry(snapshot, randomBoolean(), false, SnapshotsInProgress.State.INIT,
                 Collections.singletonList(new IndexId(index, index)), randomNonNegativeLong(), randomLong(), shardsBuilder.build(),
-                SnapshotInfoTests.randomUserMetadata());
+                SnapshotInfoTests.randomUserMetadata(), randomBoolean());
         return ClusterState.builder(newState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(entry)).build();
     }
 

+ 5 - 4
server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

@@ -154,15 +154,15 @@ public class RepositoriesServiceTests extends ESTestCase {
         }
 
         @Override
-        public void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
+        public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations indices, long startTime, String failure,
                                      int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
                                      boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
-                                     ActionListener<SnapshotInfo> listener) {
+                                     boolean writeShardGens, ActionListener<SnapshotInfo> listener) {
             listener.onResponse(null);
         }
 
         @Override
-        public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener) {
+        public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
             listener.onResponse(null);
         }
 
@@ -198,7 +198,8 @@ public class RepositoriesServiceTests extends ESTestCase {
 
         @Override
         public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit
-            snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
+                                  snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
+                                  ActionListener<String> listener) {
 
         }
 

+ 28 - 11
server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java

@@ -72,7 +72,7 @@ public class RepositoryDataTests extends ESTestCase {
     public void testXContent() throws IOException {
         RepositoryData repositoryData = generateRandomRepoData();
         XContentBuilder builder = JsonXContent.contentBuilder();
-        repositoryData.snapshotsToXContent(builder);
+        repositoryData.snapshotsToXContent(builder, true);
         try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
             long gen = (long) randomIntBetween(0, 500);
             RepositoryData fromXContent = RepositoryData.snapshotsFromXContent(parser, gen);
@@ -90,18 +90,22 @@ public class RepositoryDataTests extends ESTestCase {
         List<IndexId> indices = new ArrayList<>();
         Set<IndexId> newIndices = new HashSet<>();
         int numNew = randomIntBetween(1, 10);
+        final ShardGenerations.Builder builder = ShardGenerations.builder();
         for (int i = 0; i < numNew; i++) {
             IndexId indexId = new IndexId(randomAlphaOfLength(7), UUIDs.randomBase64UUID());
             newIndices.add(indexId);
             indices.add(indexId);
+            builder.put(indexId, 0, "1");
         }
         int numOld = randomIntBetween(1, indexIdMap.size());
         List<String> indexNames = new ArrayList<>(indexIdMap.keySet());
         for (int i = 0; i < numOld; i++) {
-            indices.add(indexIdMap.get(indexNames.get(i)));
+            final IndexId indexId = indexIdMap.get(indexNames.get(i));
+            indices.add(indexId);
+            builder.put(indexId, 0, "2");
         }
         RepositoryData newRepoData = repositoryData.addSnapshot(newSnapshot,
-            randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), indices);
+            randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), builder.build());
         // verify that the new repository data has the new snapshot and its indices
         assertTrue(newRepoData.getSnapshotIds().contains(newSnapshot));
         for (IndexId indexId : indices) {
@@ -124,10 +128,11 @@ public class RepositoryDataTests extends ESTestCase {
             snapshotStates.put(snapshotId.getUUID(), randomFrom(SnapshotState.values()));
         }
         RepositoryData repositoryData = new RepositoryData(EMPTY_REPO_GEN, snapshotIds,
-            Collections.emptyMap(), Collections.emptyMap());
+            Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
         // test that initializing indices works
         Map<IndexId, Set<SnapshotId>> indices = randomIndices(snapshotIds);
-        RepositoryData newRepoData =  new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, indices);
+        RepositoryData newRepoData =
+            new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, indices, ShardGenerations.EMPTY);
         List<SnapshotId> expected = new ArrayList<>(repositoryData.getSnapshotIds());
         Collections.sort(expected);
         List<SnapshotId> actual = new ArrayList<>(newRepoData.getSnapshotIds());
@@ -143,7 +148,7 @@ public class RepositoryDataTests extends ESTestCase {
         List<SnapshotId> snapshotIds = new ArrayList<>(repositoryData.getSnapshotIds());
         assertThat(snapshotIds.size(), greaterThan(0));
         SnapshotId removedSnapshotId = snapshotIds.remove(randomIntBetween(0, snapshotIds.size() - 1));
-        RepositoryData newRepositoryData = repositoryData.removeSnapshot(removedSnapshotId);
+        RepositoryData newRepositoryData = repositoryData.removeSnapshot(removedSnapshotId, ShardGenerations.EMPTY);
         // make sure the repository data's indices no longer contain the removed snapshot
         for (final IndexId indexId : newRepositoryData.getIndices().values()) {
             assertFalse(newRepositoryData.getSnapshots(indexId).contains(removedSnapshotId));
@@ -163,7 +168,7 @@ public class RepositoryDataTests extends ESTestCase {
     public void testGetSnapshotState() {
         final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID());
         final SnapshotState state = randomFrom(SnapshotState.values());
-        final RepositoryData repositoryData = RepositoryData.EMPTY.addSnapshot(snapshotId, state, Collections.emptyList());
+        final RepositoryData repositoryData = RepositoryData.EMPTY.addSnapshot(snapshotId, state, ShardGenerations.EMPTY);
         assertEquals(state, repositoryData.getSnapshotState(snapshotId));
         assertNull(repositoryData.getSnapshotState(new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID())));
     }
@@ -173,7 +178,7 @@ public class RepositoryDataTests extends ESTestCase {
         final RepositoryData repositoryData = generateRandomRepoData();
 
         XContentBuilder builder = XContentBuilder.builder(xContent);
-        repositoryData.snapshotsToXContent(builder);
+        repositoryData.snapshotsToXContent(builder, true);
         RepositoryData parsedRepositoryData;
         try (XContentParser xParser = createParser(builder)) {
             parsedRepositoryData = RepositoryData.snapshotsFromXContent(xParser, repositoryData.getGenId());
@@ -190,6 +195,7 @@ public class RepositoryDataTests extends ESTestCase {
         final IndexId corruptedIndexId = randomFrom(parsedRepositoryData.getIndices().values());
 
         Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
+        final ShardGenerations.Builder shardGenBuilder = ShardGenerations.builder();
         for (Map.Entry<String, IndexId> snapshottedIndex : parsedRepositoryData.getIndices().entrySet()) {
             IndexId indexId = snapshottedIndex.getValue();
             Set<SnapshotId> snapshotsIds = new LinkedHashSet<>(parsedRepositoryData.getSnapshots(indexId));
@@ -197,14 +203,18 @@ public class RepositoryDataTests extends ESTestCase {
                 snapshotsIds.add(new SnapshotId("_uuid", "_does_not_exist"));
             }
             indexSnapshots.put(indexId, snapshotsIds);
+            final int shardCount = randomIntBetween(1, 10);
+            for (int i = 0; i < shardCount; ++i) {
+                shardGenBuilder.put(indexId, i, UUIDs.randomBase64UUID(random()));
+            }
         }
         assertNotNull(corruptedIndexId);
 
         RepositoryData corruptedRepositoryData = new RepositoryData(parsedRepositoryData.getGenId(), snapshotIds, snapshotStates,
-            indexSnapshots);
+            indexSnapshots, shardGenBuilder.build());
 
         final XContentBuilder corruptedBuilder = XContentBuilder.builder(xContent);
-        corruptedRepositoryData.snapshotsToXContent(corruptedBuilder);
+        corruptedRepositoryData.snapshotsToXContent(corruptedBuilder, true);
 
         try (XContentParser xParser = createParser(corruptedBuilder)) {
             ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () ->
@@ -262,7 +272,14 @@ public class RepositoryDataTests extends ESTestCase {
         for (int i = 0; i < numSnapshots; i++) {
             final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID());
             final List<IndexId> someIndices = indices.subList(0, randomIntBetween(1, numIndices));
-            repositoryData = repositoryData.addSnapshot(snapshotId, randomFrom(SnapshotState.values()), someIndices);
+            final ShardGenerations.Builder builder = ShardGenerations.builder();
+            for (IndexId someIndex : someIndices) {
+                final int shardCount = randomIntBetween(1, 10);
+                for (int j = 0; j < shardCount; ++j) {
+                    builder.put(someIndex, 0, UUIDs.randomBase64UUID(random()));
+                }
+            }
+            repositoryData = repositoryData.addSnapshot(snapshotId, randomFrom(SnapshotState.values()), builder.build());
         }
         return repositoryData;
     }

+ 12 - 0
server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

@@ -21,6 +21,8 @@ package org.elasticsearch.repositories.blobstore;
 
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.TestUtil;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -41,14 +43,17 @@ import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.fs.FsRepository;
 import org.elasticsearch.snapshots.Snapshot;
 import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.snapshots.SnapshotInfo;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.hamcrest.Matchers.containsString;
@@ -163,6 +168,13 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
             assertNotNull(shardGen);
             final Snapshot snapshotWithSameName = new Snapshot(repository.getMetadata().name(), new SnapshotId(
                 snapshot.getSnapshotId().getName(), "_uuid2"));
+            final PlainActionFuture<SnapshotInfo> future = PlainActionFuture.newFuture();
+            repository.finalizeSnapshot(snapshot.getSnapshotId(),
+                ShardGenerations.builder().put(indexId, 0, shardGen).build(),
+                0L, null, 1, Collections.emptyList(), -1L, false,
+                MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap(), true,
+                future);
+            future.actionGet();
             IndexShardSnapshotFailedException isfe = expectThrows(IndexShardSnapshotFailedException.class,
                 () -> snapshotShard(shard, snapshotWithSameName, repository));
             assertThat(isfe.getMessage(), containsString("Duplicate snapshot name"));

+ 14 - 13
server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java

@@ -34,6 +34,7 @@ import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
+import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.fs.FsRepository;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotState;
@@ -43,7 +44,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -141,7 +141,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
         // write to and read from a index file with no entries
         assertThat(repository.getRepositoryData().getSnapshotIds().size(), equalTo(0));
         final RepositoryData emptyData = RepositoryData.EMPTY;
-        repository.writeIndexGen(emptyData, emptyData.getGenId());
+        repository.writeIndexGen(emptyData, emptyData.getGenId(), true);
         RepositoryData repoData = repository.getRepositoryData();
         assertEquals(repoData, emptyData);
         assertEquals(repoData.getIndices().size(), 0);
@@ -150,12 +150,12 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
 
         // write to and read from an index file with snapshots but no indices
         repoData = addRandomSnapshotsToRepoData(repoData, false);
-        repository.writeIndexGen(repoData, repoData.getGenId());
+        repository.writeIndexGen(repoData, repoData.getGenId(), true);
         assertEquals(repoData, repository.getRepositoryData());
 
         // write to and read from a index file with random repository data
         repoData = addRandomSnapshotsToRepoData(repository.getRepositoryData(), true);
-        repository.writeIndexGen(repoData, repoData.getGenId());
+        repository.writeIndexGen(repoData, repoData.getGenId(), true);
         assertEquals(repoData, repository.getRepositoryData());
     }
 
@@ -164,21 +164,22 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
 
         // write to index generational file
         RepositoryData repositoryData = generateRandomRepoData();
-        repository.writeIndexGen(repositoryData, repositoryData.getGenId());
+        repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true);
         assertThat(repository.getRepositoryData(), equalTo(repositoryData));
         assertThat(repository.latestIndexBlobId(), equalTo(0L));
         assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L));
 
         // adding more and writing to a new index generational file
         repositoryData = addRandomSnapshotsToRepoData(repository.getRepositoryData(), true);
-        repository.writeIndexGen(repositoryData, repositoryData.getGenId());
+        repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true);
         assertEquals(repository.getRepositoryData(), repositoryData);
         assertThat(repository.latestIndexBlobId(), equalTo(1L));
         assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L));
 
         // removing a snapshot and writing to a new index generational file
-        repositoryData = repository.getRepositoryData().removeSnapshot(repositoryData.getSnapshotIds().iterator().next());
-        repository.writeIndexGen(repositoryData, repositoryData.getGenId());
+        repositoryData = repository.getRepositoryData().removeSnapshot(
+            repositoryData.getSnapshotIds().iterator().next(), ShardGenerations.EMPTY);
+        repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true);
         assertEquals(repository.getRepositoryData(), repositoryData);
         assertThat(repository.latestIndexBlobId(), equalTo(2L));
         assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L));
@@ -190,12 +191,12 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
         // write to index generational file
         RepositoryData repositoryData = generateRandomRepoData();
         final long startingGeneration = repositoryData.getGenId();
-        repository.writeIndexGen(repositoryData, startingGeneration);
+        repository.writeIndexGen(repositoryData, startingGeneration, true);
 
         // write repo data again to index generational file, errors because we already wrote to the
         // N+1 generation from which this repository data instance was created
         expectThrows(RepositoryException.class, () -> repository.writeIndexGen(
-            repositoryData.withGenId(startingGeneration + 1), repositoryData.getGenId()));
+            repositoryData.withGenId(startingGeneration + 1), repositoryData.getGenId(), true));
     }
 
     public void testBadChunksize() throws Exception {
@@ -242,12 +243,12 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
         for (int i = 0; i < numSnapshots; i++) {
             SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID());
             int numIndices = inclIndices ? randomIntBetween(0, 20) : 0;
-            List<IndexId> indexIds = new ArrayList<>(numIndices);
+            final ShardGenerations.Builder builder = ShardGenerations.builder();
             for (int j = 0; j < numIndices; j++) {
-                indexIds.add(new IndexId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()));
+                builder.put(new IndexId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()), 0, "1");
             }
             repoData = repoData.addSnapshot(snapshotId,
-                randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), indexIds);
+                randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), builder.build());
         }
         return repoData;
     }

+ 2 - 3
server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java

@@ -103,8 +103,7 @@ public class FsRepositoryTests extends ESTestCase {
             final PlainActionFuture<String> future1 = PlainActionFuture.newFuture();
             runGeneric(threadPool, () -> {
                 IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
-                repository.snapshotShard(store, null, snapshotId, indexId, indexCommit,
-                    snapshotStatus, future1);
+                repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, snapshotStatus, true, future1);
                 future1.actionGet();
                 IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
                 assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
@@ -130,7 +129,7 @@ public class FsRepositoryTests extends ESTestCase {
             final PlainActionFuture<String> future2 = PlainActionFuture.newFuture();
             runGeneric(threadPool, () -> {
                 IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
-                repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, future2);
+                repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, true, future2);
                 future2.actionGet();
                 IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
                 assertEquals(2, copy.getIncrementalFileCount());

+ 96 - 6
server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

@@ -85,6 +85,7 @@ import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
+import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.script.MockScriptEngine;
 import org.elasticsearch.script.StoredScriptsIT;
@@ -1459,7 +1460,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         for (String index : indices) {
             Path shardZero = indicesPath.resolve(indexIds.get(index).getId()).resolve("0");
             if (randomBoolean()) {
-                Files.delete(shardZero.resolve("index-0"));
+                Files.delete(
+                    shardZero.resolve("index-" + getRepositoryData(repository).shardGenerations().getShardGen(indexIds.get(index), 0)));
             }
             Files.delete(shardZero.resolve("snap-" + snapshotInfo.snapshotId().getUUID() + ".dat"));
         }
@@ -1620,6 +1622,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
     }
 
     public void testSnapshotWithMissingShardLevelIndexFile() throws Exception {
+        disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks");
+
         Path repo = randomRepoPath();
         logger.info("-->  creating repository at {}", repo.toAbsolutePath());
         assertAcked(client().admin().cluster().preparePutRepository("test-repo").setType("fs").setSettings(
@@ -1639,7 +1643,13 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         try (Stream<Path> files = Files.list(repo.resolve("indices"))) {
             files.forEach(indexPath -> {
                 try {
-                    Files.delete(indexPath.resolve("0").resolve("index-0"));
+                    final Path shardGen;
+                    try (Stream<Path> shardFiles = Files.list(indexPath.resolve("0"))) {
+                        shardGen = shardFiles
+                            .filter(file -> file.getFileName().toString().startsWith(BlobStoreRepository.INDEX_FILE_PREFIX))
+                            .findFirst().orElseThrow(() -> new AssertionError("Failed to find shard index blob"));
+                    }
+                    Files.delete(shardGen);
                 } catch (IOException e) {
                     throw new RuntimeException("Failed to delete expected file", e);
                 }
@@ -1650,11 +1660,11 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         CreateSnapshotResponse createSnapshotResponse =
             client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2")
                 .setWaitForCompletion(true).setIndices("test-idx-1").get();
-        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
-        assertEquals(createSnapshotResponse.getSnapshotInfo().successfulShards(), createSnapshotResponse.getSnapshotInfo().totalShards());
+        assertEquals(createSnapshotResponse.getSnapshotInfo().successfulShards(),
+            createSnapshotResponse.getSnapshotInfo().totalShards() - 1);
 
         logger.info("--> restoring the first snapshot, the repository should not have lost any shard data despite deleting index-N, " +
-                        "because it should have iterated over the snap-*.data files as backup");
+                        "because it uses snap-*.data files and not the index-N to determine what files to restore");
         client().admin().indices().prepareDelete("test-idx-1", "test-idx-2").get();
         RestoreSnapshotResponse restoreSnapshotResponse =
             client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).get();
@@ -3000,7 +3010,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         final IndexId corruptedIndex = indexIds.get(indexName);
         final Path shardIndexFile = repo.resolve("indices")
             .resolve(corruptedIndex.getId()).resolve("0")
-            .resolve("index-0");
+            .resolve("index-" + repositoryData.shardGenerations().getShardGen(corruptedIndex, 0));
 
         logger.info("-->  truncating shard index file [{}]", shardIndexFile);
         try (SeekableByteChannel outChan = Files.newByteChannel(shardIndexFile, StandardOpenOption.WRITE)) {
@@ -3761,6 +3771,86 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         }
     }
 
+    public void testSnapshotDifferentIndicesBySameName() {
+        String indexName = "testindex";
+        String repoName = "test-repo";
+        String absolutePath = randomRepoPath().toAbsolutePath().toString();
+        logger.info("Path [{}]", absolutePath);
+
+        final int initialShardCount = randomIntBetween(1, 10);
+        createIndex(indexName, Settings.builder().put("index.number_of_shards", initialShardCount).build());
+        ensureGreen();
+
+        logger.info("--> indexing some documents");
+        final int docCount = initialShardCount * randomIntBetween(1, 10);
+        for (int i = 0; i < docCount; i++) {
+            index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i);
+        }
+
+        logger.info("-->  creating repository");
+        assertAcked(client().admin().cluster().preparePutRepository(repoName)
+            .setType("fs")
+            .setSettings(Settings.builder().put("location", absolutePath)));
+
+        logger.info("--> snapshot with [{}] shards", initialShardCount);
+        final SnapshotInfo snapshot1 =
+            client().admin().cluster().prepareCreateSnapshot(repoName, "snap-1").setWaitForCompletion(true).get().getSnapshotInfo();
+        assertThat(snapshot1.state(), is(SnapshotState.SUCCESS));
+        assertThat(snapshot1.successfulShards(), is(initialShardCount));
+
+        logger.info("--> delete index");
+        assertAcked(client().admin().indices().prepareDelete(indexName));
+
+        final int newShardCount = randomIntBetween(1, 10);
+        createIndex(indexName, Settings.builder().put("index.number_of_shards", newShardCount).build());
+        ensureGreen();
+
+        logger.info("--> indexing some documents");
+        final int newDocCount = newShardCount * randomIntBetween(1, 10);
+        for (int i = 0; i < newDocCount; i++) {
+            index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i);
+        }
+
+        logger.info("--> snapshot with [{}] shards", newShardCount);
+        final SnapshotInfo snapshot2 =
+            client().admin().cluster().prepareCreateSnapshot(repoName, "snap-2").setWaitForCompletion(true).get().getSnapshotInfo();
+        assertThat(snapshot2.state(), is(SnapshotState.SUCCESS));
+        assertThat(snapshot2.successfulShards(), is(newShardCount));
+
+        logger.info("--> restoring snapshot 1");
+        client().admin().cluster().prepareRestoreSnapshot(repoName, "snap-1").setIndices(indexName).setRenamePattern(indexName)
+            .setRenameReplacement("restored-1").setWaitForCompletion(true).get();
+
+        logger.info("--> restoring snapshot 2");
+        client().admin().cluster().prepareRestoreSnapshot(repoName, "snap-2").setIndices(indexName).setRenamePattern(indexName)
+            .setRenameReplacement("restored-2").setWaitForCompletion(true).get();
+
+        logger.info("--> verify doc counts");
+        assertHitCount(client().prepareSearch("restored-1").setSize(0).get(), docCount);
+        assertHitCount(client().prepareSearch("restored-2").setSize(0).get(), newDocCount);
+
+        final String snapshotToDelete;
+        final String snapshotToRestore;
+        final int expectedCount;
+        if (randomBoolean()) {
+            snapshotToDelete = "snap-1";
+            snapshotToRestore = "snap-2";
+            expectedCount = newDocCount;
+        } else {
+            snapshotToDelete = "snap-2";
+            snapshotToRestore = "snap-1";
+            expectedCount = docCount;
+        }
+        logger.info("--> deleting snapshot [{}]", snapshotToDelete);
+        assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotToDelete).get());
+        logger.info("--> restoring snapshot [{}]", snapshotToRestore);
+        client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotToRestore).setIndices(indexName).setRenamePattern(indexName)
+            .setRenameReplacement("restored-3").setWaitForCompletion(true).get();
+
+        logger.info("--> verify doc counts");
+        assertHitCount(client().prepareSearch("restored-3").setSize(0).get(), expectedCount);
+    }
+
     private void verifySnapshotInfo(final String repo, final GetSnapshotsResponse response,
                                     final Map<String, List<String>> indicesPerSnapshot) {
         for (SnapshotInfo snapshotInfo : response.getSnapshots("test-repo")) {

+ 1 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java

@@ -77,7 +77,7 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
         }
         ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = builder.build();
         return new Entry(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards,
-            SnapshotInfoTests.randomUserMetadata());
+            SnapshotInfoTests.randomUserMetadata(), randomBoolean());
     }
 
     @Override

+ 8 - 6
server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java

@@ -25,6 +25,7 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
@@ -148,8 +149,9 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
             // We create a snap- blob for snapshot "foo" in the first generation
             final PlainActionFuture<SnapshotInfo> future = PlainActionFuture.newFuture();
             final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID());
-            repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(),
-                -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), future);
+            // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
+            repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(),
+                -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), true, future);
             future.actionGet();
 
             // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
@@ -157,8 +159,8 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
                 () -> {
                     final PlainActionFuture<SnapshotInfo> fut = PlainActionFuture.newFuture();
                     repository.finalizeSnapshot(
-                        snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(),
-                        0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), fut);
+                        snapshotId, ShardGenerations.EMPTY, 1L, null, 6, Collections.emptyList(),
+                        0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), true, fut);
                     fut.actionGet();
                 });
             assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n     but: was <5>"));
@@ -166,8 +168,8 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
             // We try to write yet another snap- blob for "foo" in the next generation.
             // It passes cleanly because the content of the blob except for the timestamps.
             final PlainActionFuture<SnapshotInfo> future2 = PlainActionFuture.newFuture();
-            repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(),
-                0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), future2);
+            repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(),
+                0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(),true, future2);
             future2.actionGet();
         }
     }

+ 3 - 2
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -822,12 +822,13 @@ public abstract class IndexShardTestCase extends ESTestCase {
                                    final Repository repository) throws IOException {
         final Index index = shard.shardId().getIndex();
         final IndexId indexId = new IndexId(index.getName(), index.getUUID());
-        final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
+        final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(
+            repository.getRepositoryData().shardGenerations().getShardGen(indexId, shard.shardId().getId()));
         final PlainActionFuture<String> future = PlainActionFuture.newFuture();
         final String shardGen;
         try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) {
             repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId,
-                indexCommitRef.getIndexCommit(), snapshotStatus, future);
+                indexCommitRef.getIndexCommit(), snapshotStatus, true, future);
             shardGen = future.actionGet();
         }
 

+ 9 - 9
test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java

@@ -31,16 +31,15 @@ import org.elasticsearch.index.store.Store;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
+import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotShardFailure;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static java.util.Collections.emptySet;
 import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
@@ -87,21 +86,21 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
 
     @Override
     public RepositoryData getRepositoryData() {
-        Map<IndexId, Set<SnapshotId>> map = new HashMap<>();
-        map.put(new IndexId(indexName, "blah"), emptySet());
-        return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), map);
+        final IndexId indexId = new IndexId(indexName, "blah");
+        return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(),
+            Collections.singletonMap(indexId, emptySet()), ShardGenerations.EMPTY);
     }
 
     @Override
-    public void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
+    public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
                                  int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
-                                 boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
+                                 boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata, boolean writeShardGens,
                                  ActionListener<SnapshotInfo> listener) {
         listener.onResponse(null);
     }
 
     @Override
-    public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener) {
+    public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
         listener.onResponse(null);
     }
 
@@ -131,7 +130,8 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
 
     @Override
     public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
+                              ActionListener<String> listener) {
     }
 
     @Override

+ 47 - 4
test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java

@@ -20,6 +20,7 @@ package org.elasticsearch.repositories.blobstore;
 
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
@@ -33,6 +34,7 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.RepositoryData;
+import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.test.InternalTestCluster;
@@ -45,11 +47,12 @@ import java.io.InputStream;
 import java.nio.file.NoSuchFileException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.Locale;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -59,6 +62,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -106,6 +110,7 @@ public final class BlobStoreTestUtil {
             }
             assertIndexUUIDs(blobContainer, repositoryData);
             assertSnapshotUUIDs(repository, repositoryData);
+            assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations());
         }));
         listener.actionGet(TimeValue.timeValueMinutes(1L));
     }
@@ -118,6 +123,27 @@ public final class BlobStoreTestUtil {
         assertTrue(indexGenerations.length <= 2);
     }
 
+    private static void assertShardIndexGenerations(BlobContainer repoRoot, ShardGenerations shardGenerations) throws IOException {
+        final BlobContainer indicesContainer = repoRoot.children().get("indices");
+        for (IndexId index : shardGenerations.indices()) {
+            final List<String> gens = shardGenerations.getGens(index);
+            if (gens.isEmpty() == false) {
+                final BlobContainer indexContainer = indicesContainer.children().get(index.getId());
+                final Map<String, BlobContainer> shardContainers = indexContainer.children();
+                for (int i = 0; i < gens.size(); i++) {
+                    final String generation = gens.get(i);
+                    assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN));
+                    if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) {
+                        final String shardId = Integer.toString(i);
+                        assertThat(shardContainers, hasKey(shardId));
+                        assertThat(shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX),
+                            hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation));
+                    }
+                }
+            }
+        }
+    }
+
     private static void assertIndexUUIDs(BlobContainer repoRoot, RepositoryData repositoryData) throws IOException {
         final List<String> expectedIndexUUIDs =
             repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toList());
@@ -151,6 +177,8 @@ public final class BlobStoreTestUtil {
         } else {
             indices = indicesContainer.children();
         }
+        final Map<IndexId, Integer> maxShardCountsExpected = new HashMap<>();
+        final Map<IndexId, Integer> maxShardCountsSeen = new HashMap<>();
         // Assert that for each snapshot, the relevant metadata was written to index and shard folders
         for (SnapshotId snapshotId: snapshotIds) {
             final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
@@ -160,14 +188,27 @@ public final class BlobStoreTestUtil {
                 final BlobContainer indexContainer = indices.get(indexId.getId());
                 assertThat(indexContainer.listBlobs(),
                     hasKey(String.format(Locale.ROOT, BlobStoreRepository.METADATA_NAME_FORMAT, snapshotId.getUUID())));
+                final IndexMetaData indexMetaData = repository.getSnapshotIndexMetaData(snapshotId, indexId);
                 for (Map.Entry<String, BlobContainer> entry : indexContainer.children().entrySet()) {
                     // Skip Lucene MockFS extraN directory
                     if (entry.getKey().startsWith("extra")) {
                         continue;
                     }
-                    if (snapshotInfo.shardFailures().stream().noneMatch(shardFailure ->
-                        shardFailure.index().equals(index) && shardFailure.shardId() == Integer.parseInt(entry.getKey()))) {
-                        final Map<String, BlobMetaData> shardPathContents = entry.getValue().listBlobs();
+                    final int shardId = Integer.parseInt(entry.getKey());
+                    final int shardCount = indexMetaData.getNumberOfShards();
+                    maxShardCountsExpected.compute(
+                        indexId, (i, existing) -> existing == null || existing < shardCount ? shardCount : existing);
+                    final BlobContainer shardContainer = entry.getValue();
+                    // TODO: we shouldn't be leaking empty shard directories when a shard (but not all of the index it belongs to)
+                    //       becomes unreferenced. We should fix that and remove this conditional once its fixed.
+                    if (shardContainer.listBlobs().keySet().stream().anyMatch(blob -> blob.startsWith("extra") == false)) {
+                        final int impliedCount = shardId - 1;
+                        maxShardCountsSeen.compute(
+                            indexId, (i, existing) -> existing == null || existing < impliedCount ? impliedCount : existing);
+                    }
+                    if (shardId < shardCount && snapshotInfo.shardFailures().stream().noneMatch(
+                        shardFailure -> shardFailure.index().equals(index) && shardFailure.shardId() == shardId)) {
+                        final Map<String, BlobMetaData> shardPathContents = shardContainer.listBlobs();
                         assertThat(shardPathContents,
                             hasKey(String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotId.getUUID())));
                         assertThat(shardPathContents.keySet().stream()
@@ -176,6 +217,8 @@ public final class BlobStoreTestUtil {
                 }
             }
         }
+        maxShardCountsSeen.forEach(((indexId, count) -> assertThat("Found unreferenced shard paths for index [" + indexId + "]",
+            count, lessThanOrEqualTo(maxShardCountsExpected.get(indexId)))));
     }
 
     public static long createDanglingIndex(BlobStoreRepository repository, String name, Set<String> files)

+ 8 - 6
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -58,6 +58,7 @@ import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
+import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.blobstore.FileRestoreContext;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
@@ -241,19 +242,19 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
             Index index = remoteIndices.get(indexName).getIndex();
             indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singleton(snapshotId));
         }
-
-        return new RepositoryData(1, copiedSnapshotIds, snapshotStates, indexSnapshots);
+        return new RepositoryData(1, copiedSnapshotIds, snapshotStates, indexSnapshots, ShardGenerations.EMPTY);
     }
 
     @Override
-    public void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
+    public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
                                  List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
-                                 MetaData metaData, Map<String, Object> userMetadata, ActionListener<SnapshotInfo> listener) {
+                                 MetaData metaData, Map<String, Object> userMetadata, boolean writeShardGens,
+                                 ActionListener<SnapshotInfo> listener) {
         throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
     }
 
     @Override
-    public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener) {
+    public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
         throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
     }
 
@@ -288,7 +289,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
 
     @Override
     public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
+                              ActionListener<String> listener) {
         throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
     }
 

+ 12 - 8
x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java

@@ -36,12 +36,14 @@ import org.elasticsearch.index.translog.TranslogStats;
 import org.elasticsearch.repositories.FilterRepository;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.ShardGenerations;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -78,21 +80,22 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
     }
 
     @Override
-    public void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
-                                 List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
-                                 MetaData metaData, Map<String, Object> userMetadata, ActionListener<SnapshotInfo> listener) {
+    public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
+                                 int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
+                                 boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
+                                 boolean writeShardGens, ActionListener<SnapshotInfo> listener) {
         // we process the index metadata at snapshot time. This means if somebody tries to restore
         // a _source only snapshot with a plain repository it will be just fine since we already set the
         // required engine, that the index is read-only and the mapping to a default mapping
         try {
-            super.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
-                includeGlobalState, metadataToSnapshot(indices, metaData), userMetadata, listener);
+            super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
+                includeGlobalState, metadataToSnapshot(shardGenerations.indices(), metaData), userMetadata, writeShardGens, listener);
         } catch (IOException ex) {
             listener.onFailure(ex);
         }
     }
 
-    private static MetaData metadataToSnapshot(List<IndexId> indices, MetaData metaData) throws IOException {
+    private static MetaData metadataToSnapshot(Collection<IndexId> indices, MetaData metaData) throws IOException {
         MetaData.Builder builder = MetaData.builder(metaData);
         for (IndexId indexId : indices) {
             IndexMetaData index = metaData.index(indexId.getName());
@@ -121,7 +124,8 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
 
     @Override
     public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
+                              ActionListener<String> listener) {
         if (mapperService.documentMapper() != null // if there is no mapping this is null
             && mapperService.documentMapper().sourceMapper().isComplete() == false) {
             listener.onFailure(
@@ -160,7 +164,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
                 Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY, BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name()));
             toClose.add(reader);
             IndexCommit indexCommit = reader.getIndexCommit();
-            super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus,
+            super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus, writeShardGens,
                 ActionListener.runBefore(listener, () -> IOUtils.close(toClose)));
         } catch (IOException e) {
             try {

+ 9 - 6
x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

@@ -60,6 +60,7 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.fs.FsRepository;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.hamcrest.Matchers;
@@ -99,7 +100,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
             IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing("-1");
             final PlainActionFuture<String> future = PlainActionFuture.newFuture();
             runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
-                snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future));
+                snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future));
             IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, future::actionGet);
             assertEquals(
                 "Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source",
@@ -125,7 +126,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
             SnapshotId snapshotId = new SnapshotId("test", "test");
             final PlainActionFuture<String> future = PlainActionFuture.newFuture();
             runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
-                snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future));
+                snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future));
             shardGeneration = future.actionGet();
             IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
             assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
@@ -141,7 +142,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
             IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
             final PlainActionFuture<String> future = PlainActionFuture.newFuture();
             runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
-                snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future));
+                snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future));
             shardGeneration = future.actionGet();
             IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
             // we processed the segments_N file plus _1.si, _1.fdx, _1.fnm, _1.fdt
@@ -157,7 +158,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
             IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
             final PlainActionFuture<String> future = PlainActionFuture.newFuture();
             runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
-                snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future));
+                snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future));
             future.actionGet();
             IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
             // we processed the segments_N file plus _1_1.liv
@@ -205,13 +206,15 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
             final PlainActionFuture<String> future = PlainActionFuture.newFuture();
             runAsSnapshot(shard.getThreadPool(), () -> {
                 repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(),
-                    indexShardSnapshotStatus, future);
+                    indexShardSnapshotStatus, true, future);
                 future.actionGet();
                 final PlainActionFuture<SnapshotInfo> finFuture = PlainActionFuture.newFuture();
-                repository.finalizeSnapshot(snapshotId, Collections.singletonList(indexId),
+                repository.finalizeSnapshot(snapshotId,
+                    ShardGenerations.builder().put(indexId, 0, indexShardSnapshotStatus.generation()).build(),
                     indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(),
                     repository.getRepositoryData().getGenId(), true,
                     MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap(),
+                    true,
                     finFuture);
                 finFuture.actionGet();
             });

+ 2 - 1
x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java

@@ -334,7 +334,8 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
             new SnapshotsInProgress.Entry(
                 snapshot, true, false, SnapshotsInProgress.State.INIT,
                 Collections.singletonList(new IndexId("name", "id")), 0, 0,
-                ImmutableOpenMap.<ShardId, SnapshotsInProgress.ShardSnapshotStatus>builder().build(), Collections.emptyMap()));
+                ImmutableOpenMap.<ShardId, SnapshotsInProgress.ShardSnapshotStatus>builder().build(), Collections.emptyMap(),
+                randomBoolean()));
         ClusterState state = ClusterState.builder(new ClusterName("cluster"))
             .putCustom(SnapshotsInProgress.TYPE, inProgress)
             .build();