Browse Source

Track Shard Snapshot Generation in CS (#46864)

* Track Shard Snapshot Generation in CS

Adds communication of new shard generations from data nodes to the master
node and tracking of those generations in the cluster state (CS).
This is a preliminary step for #46250.
Armin Braun 6 years ago
parent
commit
f11a3c2229
18 changed files with 136 additions and 87 deletions
  1. 29 8
      server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
  2. 16 7
      server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java
  3. 1 1
      server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
  4. 1 1
      server/src/main/java/org/elasticsearch/repositories/Repository.java
  5. 6 6
      server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
  6. 13 9
      server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
  7. 22 15
      server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
  8. 6 6
      server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java
  9. 1 1
      server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java
  10. 1 1
      server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
  11. 2 1
      server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
  12. 5 4
      server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java
  13. 1 1
      server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java
  14. 15 10
      test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
  15. 1 1
      test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
  16. 1 1
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
  17. 1 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java
  18. 14 13
      x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

+ 29 - 8
server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

@@ -34,6 +34,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.snapshots.Snapshot;
+import org.elasticsearch.snapshots.SnapshotsService;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -249,20 +250,26 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
     public static class ShardSnapshotStatus {
         private final ShardState state;
         private final String nodeId;
+
+        @Nullable
+        private final String generation;
+
+        @Nullable
         private final String reason;
 
-        public ShardSnapshotStatus(String nodeId) {
-            this(nodeId, ShardState.INIT);
+        public ShardSnapshotStatus(String nodeId, String generation) {
+            this(nodeId, ShardState.INIT, generation);
         }
 
-        public ShardSnapshotStatus(String nodeId, ShardState state) {
-            this(nodeId, state, null);
+        public ShardSnapshotStatus(String nodeId, ShardState state, String generation) {
+            this(nodeId, state, null, generation);
         }
 
-        public ShardSnapshotStatus(String nodeId, ShardState state, String reason) {
+        public ShardSnapshotStatus(String nodeId, ShardState state, String reason, String generation) {
             this.nodeId = nodeId;
             this.state = state;
             this.reason = reason;
+            this.generation = generation;
             // If the state is failed we have to have a reason for this failure
             assert state.failed() == false || reason != null;
         }
@@ -270,6 +277,12 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
         public ShardSnapshotStatus(StreamInput in) throws IOException {
             nodeId = in.readOptionalString();
             state = ShardState.fromValue(in.readByte());
+            if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
+                generation = in.readOptionalString();
+                assert generation != null || state != ShardState.SUCCESS : "Received null generation for shard state [" + state + "]";
+            } else {
+                generation = null;
+            }
             reason = in.readOptionalString();
         }
 
@@ -281,6 +294,10 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             return nodeId;
         }
 
+        public String generation() {
+            return this.generation;
+        }
+
         public String reason() {
             return reason;
         }
@@ -288,6 +305,9 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
         public void writeTo(StreamOutput out) throws IOException {
             out.writeOptionalString(nodeId);
             out.writeByte(state.value);
+            if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
+                out.writeOptionalString(generation);
+            }
             out.writeOptionalString(reason);
         }
 
@@ -296,8 +316,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
             ShardSnapshotStatus status = (ShardSnapshotStatus) o;
-            return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) && state == status.state;
-
+            return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason)
+                && Objects.equals(generation, status.generation) && state == status.state;
         }
 
         @Override
@@ -305,12 +325,13 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             int result = state != null ? state.hashCode() : 0;
             result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
             result = 31 * result + (reason != null ? reason.hashCode() : 0);
+            result = 31 * result + (generation != null ? generation.hashCode() : 0);
             return result;
         }
 
         @Override
         public String toString() {
-            return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + "]";
+            return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + ", generation=" + generation + "]";
         }
     }
 

+ 16 - 7
server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java

@@ -58,6 +58,7 @@ public class IndexShardSnapshotStatus {
     }
 
     private final AtomicReference<Stage> stage;
+    private final AtomicReference<String> generation;
     private long startTime;
     private long totalTime;
     private int incrementalFileCount;
@@ -71,8 +72,10 @@ public class IndexShardSnapshotStatus {
 
     private IndexShardSnapshotStatus(final Stage stage, final long startTime, final long totalTime,
                                      final int incrementalFileCount, final int totalFileCount, final int processedFileCount,
-                                     final long incrementalSize, final long totalSize, final long processedSize, final String failure) {
+                                     final long incrementalSize, final long totalSize, final long processedSize, final String failure,
+                                     final String generation) {
         this.stage = new AtomicReference<>(Objects.requireNonNull(stage));
+        this.generation = new AtomicReference<>(generation);
         this.startTime = startTime;
         this.totalTime = totalTime;
         this.incrementalFileCount = incrementalFileCount;
@@ -109,9 +112,11 @@ public class IndexShardSnapshotStatus {
         return asCopy();
     }
 
-    public synchronized void moveToDone(final long endTime) {
+    public synchronized void moveToDone(final long endTime, final String newGeneration) {
+        assert newGeneration != null;
         if (stage.compareAndSet(Stage.FINALIZE, Stage.DONE)) {
             this.totalTime = Math.max(0L, endTime - startTime);
+            this.generation.set(newGeneration);
         } else {
             throw new IllegalStateException("Unable to move the shard snapshot status to [DONE]: " +
                 "expecting [FINALIZE] but got [" + stage.get() + "]");
@@ -131,6 +136,10 @@ public class IndexShardSnapshotStatus {
         }
     }
 
+    public String generation() {
+        return generation.get();
+    }
+
     public boolean isAborted() {
         return stage.get() == Stage.ABORTED;
     }
@@ -156,8 +165,8 @@ public class IndexShardSnapshotStatus {
             indexVersion, failure);
     }
 
-    public static IndexShardSnapshotStatus newInitializing() {
-        return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null);
+    public static IndexShardSnapshotStatus newInitializing(String generation) {
+        return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation);
     }
 
     public static IndexShardSnapshotStatus newFailed(final String failure) {
@@ -165,15 +174,15 @@ public class IndexShardSnapshotStatus {
         if (failure == null) {
             throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus");
         }
-        return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure);
+        return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null);
     }
 
     public static IndexShardSnapshotStatus newDone(final long startTime, final long totalTime,
                                                    final int incrementalFileCount, final int fileCount,
-                                                   final long incrementalSize, final long size) {
+                                                   final long incrementalSize, final long size, String generation) {
         // The snapshot is done which means the number of processed files is the same as total
         return new IndexShardSnapshotStatus(Stage.DONE, startTime, totalTime, incrementalFileCount, fileCount, incrementalFileCount,
-            incrementalSize, size, incrementalSize, null);
+            incrementalSize, size, incrementalSize, null, generation);
     }
 
     /**

+ 1 - 1
server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

@@ -123,7 +123,7 @@ public class FilterRepository implements Repository {
 
     @Override
     public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
         in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener);
     }
     @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/repositories/Repository.java

@@ -209,7 +209,7 @@ public interface Repository extends LifecycleComponent {
      * @param listener            listener invoked on completion
      */
     void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
-                       IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener);
+                       IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener);
 
     /**
      * Restores snapshot of the shard.

+ 6 - 6
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -477,7 +477,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      *     <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
      * </ul>
      * @param repositoryStateId Current repository state id
-     * @param listener Lister to complete when done
+     * @param listener          Listener to complete when done
      */
     public void cleanup(long repositoryStateId, ActionListener<RepositoryCleanupResult> listener) {
         try {
@@ -942,10 +942,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     @Override
     public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
         final ShardId shardId = store.shardId();
         final long startTime = threadPool.absoluteTimeInMillis();
-        final StepListener<Void> snapshotDoneListener = new StepListener<>();
+        final StepListener<String> snapshotDoneListener = new StepListener<>();
         snapshotDoneListener.whenComplete(listener::onResponse, e -> {
             snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.stackTrace(e));
             listener.onFailure(e instanceof IndexShardSnapshotFailedException ? (IndexShardSnapshotFailedException) e
@@ -1084,8 +1084,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization",
                         snapshotId, shardId), e);
                 }
-                snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis());
-                snapshotDoneListener.onResponse(null);
+                snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), indexGeneration);
+                snapshotDoneListener.onResponse(indexGeneration);
             }, snapshotDoneListener::onFailure);
             if (indexIncrementalFileCount == 0) {
                 allFilesUploadedListener.onResponse(Collections.emptyList());
@@ -1153,7 +1153,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(shardContainer(indexId, shardId), snapshotId);
         return IndexShardSnapshotStatus.newDone(snapshot.startTime(), snapshot.time(),
             snapshot.incrementalFileCount(), snapshot.totalFileCount(),
-            snapshot.incrementalSize(), snapshot.totalSize());
+            snapshot.incrementalSize(), snapshot.totalSize(), null); // Not adding a real generation here as it doesn't matter to callers
     }
 
     @Override

+ 13 - 9
server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

@@ -244,7 +244,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
                         if (startedShards == null) {
                              startedShards = new HashMap<>();
                         }
-                        startedShards.put(shardId, IndexShardSnapshotStatus.newInitializing());
+                        startedShards.put(shardId, IndexShardSnapshotStatus.newInitializing(shardSnapshotStatus.generation()));
                     }
                 }
                 if (startedShards != null && startedShards.isEmpty() == false) {
@@ -283,12 +283,15 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
                 assert indexId != null;
                 snapshot(shardId, snapshot, indexId, snapshotStatus, new ActionListener<>() {
                     @Override
-                    public void onResponse(final Void aVoid) {
+                    public void onResponse(String newGeneration) {
+                        assert newGeneration != null;
+                        assert newGeneration.equals(snapshotStatus.generation());
                         if (logger.isDebugEnabled()) {
                             final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
-                            logger.debug("snapshot ({}) completed to {} with {}", snapshot, snapshot.getRepository(), lastSnapshotStatus);
+                            logger.debug("snapshot [{}] completed to [{}] with [{}] at generation [{}]",
+                                snapshot, snapshot.getRepository(), lastSnapshotStatus, snapshotStatus.generation());
                         }
-                        notifySuccessfulSnapshotShard(snapshot, shardId);
+                        notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
                     }
 
                     @Override
@@ -308,7 +311,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
      * @param snapshotStatus snapshot status
      */
     private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId,
-                          final IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
+                          final IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
         try {
             final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
             if (indexShard.routingEntry().primary() == false) {
@@ -366,7 +369,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
                                 // but we think the shard is done - we need to make new master know that the shard is done
                                 logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " +
                                              "updating status on the master", snapshot.snapshot(), shardId);
-                                notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId);
+                                notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, localShard.getValue().generation());
 
                             } else if (stage == Stage.FAILURE) {
                                 // but we think the shard failed - we need to make new master know that the shard failed
@@ -436,15 +439,16 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
     }
 
     /** Notify the master node that the given shard has been successfully snapshotted **/
-    private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId) {
+    private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId, String generation) {
+        assert generation != null;
         sendSnapshotShardUpdate(snapshot, shardId,
-            new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.SUCCESS));
+            new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.SUCCESS, generation));
     }
 
     /** Notify the master node that the given shard failed to be snapshotted **/
     private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) {
         sendSnapshotShardUpdate(snapshot, shardId,
-            new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure));
+            new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure, null));
     }
 
     /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */

+ 22 - 15
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -123,6 +123,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      */
     public static final Version NO_REPO_INITIALIZE_VERSION = Version.V_7_5_0;
 
+    public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = Version.V_8_0_0;
+
     private static final Logger logger = LogManager.getLogger(SnapshotsService.class);
 
     private final ClusterService clusterService;
@@ -803,7 +805,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                                     logger.warn("failing snapshot of shard [{}] on closed node [{}]",
                                         shardEntry.key, shardStatus.nodeId());
                                     shards.put(shardEntry.key,
-                                        new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown"));
+                                        new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown",
+                                            shardStatus.generation()));
                                 }
                             }
                         }
@@ -908,7 +911,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                             // Shard that we were waiting for has started on a node, let's process it
                             snapshotChanged = true;
                             logger.trace("starting shard that we were waiting for [{}] on node [{}]", shardId, shardStatus.nodeId());
-                            shards.put(shardId, new ShardSnapshotStatus(shardRouting.primaryShard().currentNodeId()));
+                            shards.put(shardId,
+                                new ShardSnapshotStatus(shardRouting.primaryShard().currentNodeId(), shardStatus.generation()));
                             continue;
                         } else if (shardRouting.primaryShard().initializing() || shardRouting.primaryShard().relocating()) {
                             // Shard that we were waiting for hasn't started yet or still relocating - will continue to wait
@@ -920,7 +924,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 // Shard that we were waiting for went into unassigned state or disappeared - giving up
                 snapshotChanged = true;
                 logger.warn("failing snapshot of shard [{}] on unassigned shard [{}]", shardId, shardStatus.nodeId());
-                shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "shard is unassigned"));
+                shards.put(shardId, new ShardSnapshotStatus(
+                    shardStatus.nodeId(), ShardState.FAILED, "shard is unassigned", shardStatus.generation()));
             } else {
                 shards.put(shardId, shardStatus);
             }
@@ -1224,7 +1229,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                         for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotEntry.shards()) {
                             ShardSnapshotStatus status = shardEntry.value;
                             if (status.state().completed() == false) {
-                                status = new ShardSnapshotStatus(status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion");
+                                status = new ShardSnapshotStatus(
+                                    status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion", status.generation());
                             }
                             shardsBuilder.put(shardEntry.key, status);
                         }
@@ -1410,8 +1416,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             IndexMetaData indexMetaData = metaData.index(indexName);
             if (indexMetaData == null) {
                 // The index was deleted before we managed to start the snapshot - mark it as missing.
-                builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0),
-                    new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "missing index"));
+                builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0), missingStatus(null, "missing index"));
             } else {
                 IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(indexName);
                 for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
@@ -1419,20 +1424,18 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     if (indexRoutingTable != null) {
                         ShardRouting primary = indexRoutingTable.shard(i).primaryShard();
                         if (primary == null || !primary.assignedToNode()) {
-                            builder.put(shardId,
-                                new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "primary shard is not allocated"));
+                            builder.put(shardId, missingStatus(null, "primary shard is not allocated"));
                         } else if (primary.relocating() || primary.initializing()) {
-                            builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), ShardState.WAITING));
+                            builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(
+                                primary.currentNodeId(), ShardState.WAITING, null));
                         } else if (!primary.started()) {
-                            builder.put(shardId,
-                                new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), ShardState.MISSING,
-                                    "primary shard hasn't been started yet"));
+                            builder.put(shardId, missingStatus(primary.currentNodeId(), "primary shard hasn't been started yet"));
                         } else {
-                            builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId()));
+                            builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(
+                                primary.currentNodeId(), null));
                         }
                     } else {
-                        builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING,
-                            "missing routing table"));
+                        builder.put(shardId, missingStatus(null, "missing routing table"));
                     }
                 }
             }
@@ -1441,6 +1444,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         return builder.build();
     }
 
+    private static ShardSnapshotStatus missingStatus(@Nullable String nodeId, String reason) {
+        return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, ShardState.MISSING, reason, null);
+    }
+
     /**
      * Returns the indices that are currently being snapshotted (with partial == false) and that are contained in the indices-to-check set.
      */

+ 6 - 6
server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java

@@ -57,14 +57,14 @@ public class SnapshotsInProgressTests extends ESTestCase {
         ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
 
         // test more than one waiting shard in an index
-        shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING));
-        shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING));
-        shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
+        shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING, "1"));
+        shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING, "1"));
+        shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "", "1"));
         // test exactly one waiting shard in an index
-        shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING));
-        shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
+        shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING, "1"));
+        shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "", "1"));
         // test no waiting shards in an index
-        shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
+        shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "", "1"));
         Entry entry = new Entry(snapshot, randomBoolean(), randomBoolean(), State.INIT,
                                 indices, System.currentTimeMillis(), randomLong(), shards.build(), SnapshotInfoTests.randomUserMetadata());
 

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java

@@ -465,7 +465,7 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
 
         final ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
         for (ShardRouting shardRouting : newState.routingTable().index(index).randomAllActiveShardsIt()) {
-            shardsBuilder.put(shardRouting.shardId(), new SnapshotsInProgress.ShardSnapshotStatus(shardRouting.currentNodeId()));
+            shardsBuilder.put(shardRouting.shardId(), new SnapshotsInProgress.ShardSnapshotStatus(shardRouting.currentNodeId(), "1"));
         }
 
         final Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5)));

+ 1 - 1
server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

@@ -202,7 +202,7 @@ public class RepositoriesServiceTests extends ESTestCase {
 
         @Override
         public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit
-            snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
+            snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
 
         }
 

+ 2 - 1
server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

@@ -159,7 +159,8 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
             // snapshot the shard
             final Repository repository = createRepository();
             final Snapshot snapshot = new Snapshot(repository.getMetadata().name(), new SnapshotId(randomAlphaOfLength(10), "_uuid"));
-            snapshotShard(shard, snapshot, repository);
+            final String shardGen = snapshotShard(shard, snapshot, repository);
+            assertNotNull(shardGen);
             final Snapshot snapshotWithSameName = new Snapshot(repository.getMetadata().name(), new SnapshotId(
                 snapshot.getSnapshotId().getName(), "_uuid2"));
             IndexShardSnapshotFailedException isfe = expectThrows(IndexShardSnapshotFailedException.class,

+ 5 - 4
server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java

@@ -100,15 +100,16 @@ public class FsRepositoryTests extends ESTestCase {
             IndexId indexId = new IndexId(idxSettings.getIndex().getName(), idxSettings.getUUID());
 
             IndexCommit indexCommit = Lucene.getIndexCommit(Lucene.readSegmentInfos(store.directory()), store.directory());
-            final PlainActionFuture<Void> future1 = PlainActionFuture.newFuture();
+            final PlainActionFuture<String> future1 = PlainActionFuture.newFuture();
             runGeneric(threadPool, () -> {
-                IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
+                IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
                 repository.snapshotShard(store, null, snapshotId, indexId, indexCommit,
                     snapshotStatus, future1);
                 future1.actionGet();
                 IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
                 assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
             });
+            final String shardGeneration = future1.actionGet();
             Lucene.cleanLuceneIndex(directory);
             expectThrows(org.apache.lucene.index.IndexNotFoundException.class, () -> Lucene.readSegmentInfos(directory));
             DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
@@ -127,9 +128,9 @@ public class FsRepositoryTests extends ESTestCase {
             SnapshotId incSnapshotId = new SnapshotId("test1", "test1");
             IndexCommit incIndexCommit = Lucene.getIndexCommit(Lucene.readSegmentInfos(store.directory()), store.directory());
             Collection<String> commitFileNames = incIndexCommit.getFileNames();
-            final PlainActionFuture<Void> future2 = PlainActionFuture.newFuture();
+            final PlainActionFuture<String> future2 = PlainActionFuture.newFuture();
             runGeneric(threadPool, () -> {
-                IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
+                IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
                 repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, future2);
                 future2.actionGet();
                 IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();

+ 1 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java

@@ -68,7 +68,7 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
             String nodeId = randomAlphaOfLength(10);
             ShardState shardState = randomFrom(ShardState.values());
             builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState,
-                shardState.failed() ? randomAlphaOfLength(10) : null));
+                shardState.failed() ? randomAlphaOfLength(10) : null, "1"));
         }
         ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = builder.build();
         return new Entry(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards,

+ 15 - 10
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -827,25 +827,30 @@ public abstract class IndexShardTestCase extends ESTestCase {
             shard.recoveryState());
     }
 
-    /** Snapshot a shard using a given repository **/
-    protected void snapshotShard(final IndexShard shard,
-                                 final Snapshot snapshot,
-                                 final Repository repository) throws IOException {
-        final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
-        final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
+    /**
+     * Snapshot a shard using a given repository, then assert it reached stage DONE with no failure and the expected total file count.
+     *
+     * @return new shard generation: the String obtained from the future passed as listener to {@code repository.snapshotShard}
+     */
+    protected String snapshotShard(final IndexShard shard,
+                                   final Snapshot snapshot,
+                                   final Repository repository) throws IOException {
+        final Index index = shard.shardId().getIndex();
+        final IndexId indexId = new IndexId(index.getName(), index.getUUID());
+        final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
+        final PlainActionFuture<String> future = PlainActionFuture.newFuture();
+        final String shardGen;
         try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) {
-            Index index = shard.shardId().getIndex();
-            IndexId indexId = new IndexId(index.getName(), index.getUUID());
-
             repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId,
                 indexCommitRef.getIndexCommit(), snapshotStatus, future);
-            future.actionGet();
+            shardGen = future.actionGet();
         }
 
         final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
         assertEquals(IndexShardSnapshotStatus.Stage.DONE, lastSnapshotStatus.getStage());
         assertEquals(shard.snapshotStoreMetadata().size(), lastSnapshotStatus.getTotalFileCount());
         assertNull(lastSnapshotStatus.getFailure());
+        return shardGen;
     }
 
     /**

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java

@@ -135,7 +135,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
 
     @Override
     public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
     }
 
     @Override

+ 1 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -296,7 +296,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
 
     @Override
     public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
         throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
     }
 

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java

@@ -133,7 +133,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
 
     @Override
     public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
         if (mapperService.documentMapper() != null // if there is no mapping this is null
             && mapperService.documentMapper().sourceMapper().isComplete() == false) {
             listener.onFailure(

+ 14 - 13
x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

@@ -95,8 +95,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
         SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
         repository.start();
         try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
-            IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
-            final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
+            IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing("-1");
+            final PlainActionFuture<String> future = PlainActionFuture.newFuture();
             runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
                 snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future));
             IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, future::actionGet);
@@ -117,14 +117,15 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
         IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
         SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
         repository.start();
-        int totalFileCount = -1;
+        int totalFileCount;
+        String shardGeneration;
         try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
-            IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
+            IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
             SnapshotId snapshotId = new SnapshotId("test", "test");
-            final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
+            final PlainActionFuture<String> future = PlainActionFuture.newFuture();
             runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
                 snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future));
-            future.actionGet();
+            shardGeneration = future.actionGet();
             IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
             assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
             totalFileCount = copy.getTotalFileCount();
@@ -136,11 +137,11 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
         try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
             SnapshotId snapshotId = new SnapshotId("test_1", "test_1");
 
-            IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
-            final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
+            IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
+            final PlainActionFuture<String> future = PlainActionFuture.newFuture();
             runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
                 snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future));
-            future.actionGet();
+            shardGeneration = future.actionGet();
             IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
             // we processed the segments_N file plus _1.si, _1.fdx, _1.fnm, _1.fdt
             assertEquals(5, copy.getIncrementalFileCount());
@@ -152,8 +153,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
         try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
             SnapshotId snapshotId = new SnapshotId("test_2", "test_2");
 
-            IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
-            final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
+            IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
+            final PlainActionFuture<String> future = PlainActionFuture.newFuture();
             runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
                 snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future));
             future.actionGet();
@@ -199,8 +200,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
         SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
         repository.start();
         try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
-            IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
-            final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
+            IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
+            final PlainActionFuture<String> future = PlainActionFuture.newFuture();
             runAsSnapshot(shard.getThreadPool(), () -> {
                 repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(),
                     indexShardSnapshotStatus, future);