ソースを参照

Make Snapshot Logic Write Metadata after Segments (#45689)

* Write metadata during snapshot finalization after segment files to prevent outdated metadata in case of dynamic mapping updates as explained in #41581
* Keep the old behavior of writing the metadata beforehand in the case of mixed version clusters for BwC reasons
   * Still overwrite the metadata in the end, so even a mixed version cluster is fixed by this change if a newer version master does the finalization
* Fixes #41581
Armin Braun 6 年 前
コミット
121576d567
17 ファイル変更266 行追加247 行削除
  1. 2 2
      server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
  2. 5 3
      server/src/main/java/org/elasticsearch/repositories/Repository.java
  3. 25 16
      server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
  4. 6 5
      server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java
  5. 9 4
      server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java
  6. 42 15
      server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
  7. 1 1
      server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
  8. 5 5
      server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java
  9. 29 144
      server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
  10. 81 0
      server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
  11. 4 2
      server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java
  12. 4 3
      server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java
  13. 1 16
      server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
  14. 1 1
      test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
  15. 1 2
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
  16. 44 23
      x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java
  17. 6 5
      x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

+ 2 - 2
server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

@@ -81,9 +81,9 @@ public class FilterRepository implements Repository {
     @Override
     public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
                                          List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
-                                         Map<String, Object> userMetadata) {
+                                         MetaData metaData, Map<String, Object> userMetadata) {
         return in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
-            includeGlobalState, userMetadata);
+            includeGlobalState, metaData, userMetadata);
     }
 
     @Override

+ 5 - 3
server/src/main/java/org/elasticsearch/repositories/Repository.java

@@ -48,8 +48,6 @@ import java.util.function.Function;
  * <p>
  * To perform a snapshot:
  * <ul>
- * <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
- * with list of indices that will be included into the snapshot</li>
  * <li>Data nodes call {@link Repository#snapshotShard}
  * for each shard</li>
  * <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
@@ -116,7 +114,11 @@ public interface Repository extends LifecycleComponent {
      * @param snapshotId snapshot id
      * @param indices    list of indices to be snapshotted
      * @param metaData   cluster metadata
+     *
+     * @deprecated this method is only used when taking snapshots in a mixed version cluster where a master node older than
+     *             {@link org.elasticsearch.snapshots.SnapshotsService#NO_REPO_INITIALIZE_VERSION} is present.
      */
+    @Deprecated
     void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData);
 
     /**
@@ -136,7 +138,7 @@ public interface Repository extends LifecycleComponent {
      */
     SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
                                   List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
-                                  Map<String, Object> userMetadata);
+                                  MetaData clusterMetaData, Map<String, Object> userMetadata);
 
     /**
      * Deletes snapshot

+ 25 - 16
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -87,7 +87,6 @@ import org.elasticsearch.repositories.RepositoryCleanupResult;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.RepositoryVerificationException;
-import org.elasticsearch.snapshots.InvalidSnapshotNameException;
 import org.elasticsearch.snapshots.SnapshotCreationException;
 import org.elasticsearch.snapshots.SnapshotException;
 import org.elasticsearch.snapshots.SnapshotId;
@@ -143,7 +142,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private static final String TESTS_FILE = "tests-";
 
-    private static final String METADATA_PREFIX = "meta-";
+    public static final String METADATA_PREFIX = "meta-";
 
     public static final String METADATA_NAME_FORMAT = METADATA_PREFIX + "%s.dat";
 
@@ -358,23 +357,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     @Override
     public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetaData) {
-        if (isReadOnly()) {
-            throw new RepositoryException(metadata.name(), "cannot create snapshot in a readonly repository");
-        }
         try {
-            final String snapshotName = snapshotId.getName();
-            // check if the snapshot name already exists in the repository
-            final RepositoryData repositoryData = getRepositoryData();
-            if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
-                throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists");
-            }
-
             // Write Global MetaData
-            globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID());
+            globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), true);
 
             // write the index metadata for each index in the snapshot
             for (IndexId index : indices) {
-                indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID());
+                indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), true);
             }
         } catch (IOException ex) {
             throw new SnapshotCreationException(metadata.name(), snapshotId, ex);
@@ -610,14 +599,34 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                                          final List<SnapshotShardFailure> shardFailures,
                                          final long repositoryStateId,
                                          final boolean includeGlobalState,
+                                         final MetaData clusterMetaData,
                                          final Map<String, Object> userMetadata) {
         SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
             indices.stream().map(IndexId::getName).collect(Collectors.toList()),
             startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
             includeGlobalState, userMetadata);
+
+        try {
+            // We ignore all FileAlreadyExistsException here since otherwise a master failover while in this method will
+            // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the
+            // index or global metadata will be compatible with the segments written in this snapshot as well.
+            // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way that
+            // decrements the generation it points at
+
+            // Write Global MetaData
+            globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false);
+
+            // write the index metadata for each index in the snapshot
+            for (IndexId index : indices) {
+                indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false);
+            }
+        } catch (IOException ex) {
+            throw new SnapshotException(metadata.name(), snapshotId, "failed to write metadata for snapshot", ex);
+        }
+
         try {
             final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices);
-            snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID());
+            snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID(), false);
             writeIndexGen(updatedRepositoryData, repositoryStateId);
         } catch (FileAlreadyExistsException ex) {
             // if another master was elected and took over finalizing the snapshot, it is possible
@@ -995,7 +1004,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
                 logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
                 try {
-                    indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID());
+                    indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID(), false);
                 } catch (IOException e) {
                     throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
                 }

+ 6 - 5
server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java

@@ -175,15 +175,16 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
      * <p>
      * The blob will be compressed and checksum will be written if required.
      *
-     * @param obj           object to be serialized
-     * @param blobContainer blob container
-     * @param name          blob name
+     * @param obj                 object to be serialized
+     * @param blobContainer       blob container
+     * @param name                blob name
+     * @param failIfAlreadyExists Whether to fail if the blob already exists
      */
-    public void write(T obj, BlobContainer blobContainer, String name) throws IOException {
+    public void write(T obj, BlobContainer blobContainer, String name, boolean failIfAlreadyExists) throws IOException {
         final String blobName = blobName(name);
         writeTo(obj, blobName, bytesArray -> {
             try (InputStream stream = bytesArray.streamInput()) {
-                blobContainer.writeBlob(blobName, stream, bytesArray.length(), true);
+                blobContainer.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists);
             }
         });
     }

+ 9 - 4
server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java

@@ -118,17 +118,19 @@
  *
  * <p>Creating a snapshot in the repository happens in the three steps described in detail below.</p>
  *
- * <h3>Initializing a Snapshot in the Repository</h3>
+ * <h3>Initializing a Snapshot in the Repository (Mixed Version Clusters only)</h3>
  *
- * <p>Creating a snapshot in the repository starts with a call to {@link org.elasticsearch.repositories.Repository#initializeSnapshot} which
- * the blob store repository implements via the following actions:</p>
+ * <p>In mixed version clusters that contain a node older than
+ * {@link org.elasticsearch.snapshots.SnapshotsService#NO_REPO_INITIALIZE_VERSION}, creating a snapshot in the repository starts with a
+ * call to {@link org.elasticsearch.repositories.Repository#initializeSnapshot} which the blob store repository implements via the
+ * following actions:</p>
  * <ol>
  * <li>Verify that no snapshot by the requested name exists.</li>
  * <li>Write a blob containing the cluster metadata to the root of the blob store repository at {@code /meta-${snapshot-uuid}.dat}</li>
  * <li>Write the metadata for each index to a blob in that index's directory at
  * {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat}</li>
  * </ol>
- * TODO: This behavior is problematic, adjust these docs once https://github.com/elastic/elasticsearch/issues/41581 is fixed
+ * TODO: Remove this section once BwC logic it references is removed
  *
  * <h3>Writing Shard Data (Segments)</h3>
  *
@@ -164,6 +166,9 @@
  * to finalizing the snapshot by invoking {@link org.elasticsearch.repositories.Repository#finalizeSnapshot}. This method executes the
  * following actions in order:</p>
  * <ol>
+ * <li>Write a blob containing the cluster metadata to the root of the blob store repository at {@code /meta-${snapshot-uuid}.dat}</li>
+ * <li>Write the metadata for each index to a blob in that index's directory at
+ * {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat}</li>
  * <li>Write the {@link org.elasticsearch.snapshots.SnapshotInfo} blob for the given snapshot to the key {@code /snap-${snapshot-uuid}.dat}
  * directly under the repository root.</li>
  * <li>Write an updated {@code RepositoryData} blob to the key {@code /index-${N+1}} using the {@code N} determined when initializing the

+ 42 - 15
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -26,6 +26,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.util.CollectionUtil;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
@@ -68,6 +69,7 @@ import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
+import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.RepositoryMissingException;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -108,12 +110,19 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
  * the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus)} method</li>
  * <li>When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot
  * as completed</li>
- * <li>After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository,
+ * <li>After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry, MetaData)} finalizes snapshot in the repository,
  * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls
  * {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state</li>
  * </ul>
  */
 public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier {
+
+    /**
+     * Minimum node version which does not use {@link Repository#initializeSnapshot(SnapshotId, List, MetaData)} to write snapshot metadata
+     * when starting a snapshot.
+     */
+    public static final Version NO_REPO_INITIALIZE_VERSION = Version.V_8_0_0;
+
     private static final Logger logger = LogManager.getLogger(SnapshotsService.class);
 
     private final ClusterService clusterService;
@@ -398,24 +407,29 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                 assert initializingSnapshots.contains(snapshot.snapshot());
                 Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository());
 
-                MetaData metaData = clusterState.metaData();
-                if (!snapshot.includeGlobalState()) {
-                    // Remove global state from the cluster state
-                    MetaData.Builder builder = MetaData.builder();
-                    for (IndexId index : snapshot.indices()) {
-                        builder.put(metaData.index(index.getName()), false);
-                    }
-                    metaData = builder.build();
+                if (repository.isReadOnly()) {
+                    throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository");
+                }
+                final String snapshotName = snapshot.snapshot().getSnapshotId().getName();
+                // check if the snapshot name already exists in the repository
+                if (repository.getRepositoryData().getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
+                    throw new InvalidSnapshotNameException(
+                        repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
+                }
+                if (clusterState.nodes().getMinNodeVersion().onOrAfter(NO_REPO_INITIALIZE_VERSION) == false) {
+                    // In mixed version clusters we initialize the snapshot in the repository so that in case of a master failover to an
+                    // older version master node snapshot finalization (that assumes initializeSnapshot was called) produces a valid
+                    // snapshot.
+                    repository.initializeSnapshot(
+                        snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaDataForSnapshot(snapshot, clusterState.metaData()));
                 }
-
-                repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData);
                 snapshotCreated = true;
 
                 logger.info("snapshot [{}] started", snapshot.snapshot());
                 if (snapshot.indices().isEmpty()) {
                     // No indices in this snapshot - we are done
                     userCreateSnapshotListener.onResponse(snapshot.snapshot());
-                    endSnapshot(snapshot);
+                    endSnapshot(snapshot, clusterState.metaData());
                     return;
                 }
                 clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
@@ -498,7 +512,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                             assert snapshotsInProgress != null;
                             final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot());
                             assert entry != null;
-                            endSnapshot(entry);
+                            endSnapshot(entry, newState.metaData());
                         }
                     }
                 });
@@ -556,6 +570,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                                                          Collections.emptyList(),
                                                          snapshot.getRepositoryStateId(),
                                                          snapshot.includeGlobalState(),
+                                                         metaDataForSnapshot(snapshot, clusterService.state().metaData()),
                                                          snapshot.userMetadata());
                 } catch (Exception inner) {
                     inner.addSuppressed(exception);
@@ -565,7 +580,18 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             }
             userCreateSnapshotListener.onFailure(e);
         }
+    }
 
+    private static MetaData metaDataForSnapshot(SnapshotsInProgress.Entry snapshot, MetaData metaData) {
+        if (snapshot.includeGlobalState() == false) {
+            // Remove global state from the cluster state
+            MetaData.Builder builder = MetaData.builder();
+            for (IndexId index : snapshot.indices()) {
+                builder.put(metaData.index(index.getName()), false);
+            }
+            metaData = builder.build();
+        }
+        return metaData;
     }
 
     private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) {
@@ -713,7 +739,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                         entry -> entry.state().completed()
                             || initializingSnapshots.contains(entry.snapshot()) == false
                                && (entry.state() == State.INIT || completed(entry.shards().values()))
-                    ).forEach(this::endSnapshot);
+                    ).forEach(entry -> endSnapshot(entry, event.state().metaData()));
                 }
                 if (newMaster) {
                     finalizeSnapshotDeletionFromPreviousMaster(event);
@@ -960,7 +986,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      *
      * @param entry snapshot
      */
-    private void endSnapshot(final SnapshotsInProgress.Entry entry) {
+    private void endSnapshot(SnapshotsInProgress.Entry entry, MetaData metaData) {
         if (endingSnapshots.add(entry.snapshot()) == false) {
             return;
         }
@@ -988,6 +1014,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     unmodifiableList(shardFailures),
                     entry.getRepositoryStateId(),
                     entry.includeGlobalState(),
+                    metaDataForSnapshot(entry, metaData),
                     entry.userMetadata());
                 removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
                 logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());

+ 1 - 1
server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

@@ -161,7 +161,7 @@ public class RepositoriesServiceTests extends ESTestCase {
         @Override
         public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
                                              int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
-                                             boolean includeGlobalState, Map<String, Object> userMetadata) {
+                                             boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata) {
             return null;
         }
 

+ 5 - 5
server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java

@@ -116,8 +116,8 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase {
             xContentRegistry(), true);
 
         // Write blobs in different formats
-        checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile");
-        checksumSMILECompressed.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp");
+        checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", true);
+        checksumSMILECompressed.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp", true);
 
         // Assert that all checksum blobs can be read by all formats
         assertEquals(checksumSMILE.read(blobContainer, "check-smile").getText(), "checksum smile");
@@ -136,8 +136,8 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase {
         ChecksumBlobStoreFormat<BlobObj> checksumFormatComp = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent,
             xContentRegistry(), true);
         BlobObj blobObj = new BlobObj(veryRedundantText.toString());
-        checksumFormatComp.write(blobObj, blobContainer, "blob-comp");
-        checksumFormat.write(blobObj, blobContainer, "blob-not-comp");
+        checksumFormatComp.write(blobObj, blobContainer, "blob-comp", true);
+        checksumFormat.write(blobObj, blobContainer, "blob-not-comp", true);
         Map<String, BlobMetaData> blobs = blobContainer.listBlobsByPrefix("blob-");
         assertEquals(blobs.size(), 2);
         assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length()));
@@ -150,7 +150,7 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase {
         BlobObj blobObj = new BlobObj(testString);
         ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent,
             xContentRegistry(), randomBoolean());
-        checksumFormat.write(blobObj, blobContainer, "test-path");
+        checksumFormat.write(blobObj, blobContainer, "test-path", true);
         assertEquals(checksumFormat.read(blobContainer, "test-path").getText(), testString);
         randomCorruption(blobContainer, "test-path");
         try {

+ 29 - 144
server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

@@ -47,7 +47,6 @@ import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.RestoreInProgress;
 import org.elasticsearch.cluster.SnapshotsInProgress;
@@ -110,7 +109,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -2538,28 +2536,15 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
 
         Client client = client();
 
-        boolean allowPartial = randomBoolean();
         logger.info("-->  creating repository");
 
-        // only block on repo init if we have partial snapshot or we run into deadlock when acquiring shard locks for index deletion/closing
-        boolean initBlocking = allowPartial || randomBoolean();
-        if (initBlocking) {
-            assertAcked(client.admin().cluster().preparePutRepository("test-repo")
-                .setType("mock").setSettings(Settings.builder()
-                    .put("location", randomRepoPath())
-                    .put("compress", randomBoolean())
-                    .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
-                    .put("block_on_init", true)
-                ));
-        } else {
-            assertAcked(client.admin().cluster().preparePutRepository("test-repo")
-                .setType("mock").setSettings(Settings.builder()
-                    .put("location", randomRepoPath())
-                    .put("compress", randomBoolean())
-                    .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
-                    .put("block_on_data", true)
-                ));
-        }
+        assertAcked(client.admin().cluster().preparePutRepository("test-repo")
+            .setType("mock").setSettings(Settings.builder()
+                .put("location", randomRepoPath())
+                .put("compress", randomBoolean())
+                .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
+                .put("block_on_data", true)));
+
 
         createIndex("test-idx-1", "test-idx-2", "test-idx-3");
         ensureGreen();
@@ -2575,70 +2560,40 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
         assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
 
-        logger.info("--> snapshot allow partial {}", allowPartial);
+        logger.info("--> snapshot");
         ActionFuture<CreateSnapshotResponse> future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
-            .setIndices("test-idx-*").setWaitForCompletion(true).setPartial(allowPartial).execute();
+            .setIndices("test-idx-*").setWaitForCompletion(true).setPartial(false).execute();
         logger.info("--> wait for block to kick in");
-        if (initBlocking) {
-            waitForBlock(internalCluster().getMasterName(), "test-repo", TimeValue.timeValueMinutes(1));
-        } else {
-            waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
-        }
-        boolean closedOnPartial = false;
+        waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
+
         try {
-            if (allowPartial) {
-                // partial snapshots allow close / delete operations
-                if (randomBoolean()) {
-                    logger.info("--> delete index while partial snapshot is running");
+            // non-partial snapshots do not allow close / delete operations on indices where snapshot has not been completed
+            if (randomBoolean()) {
+                try {
+                    logger.info("--> delete index while non-partial snapshot is running");
                     client.admin().indices().prepareDelete("test-idx-1").get();
-                } else {
-                    logger.info("--> close index while partial snapshot is running");
-                    closedOnPartial = true;
-                    client.admin().indices().prepareClose("test-idx-1").get();
+                    fail("Expected deleting index to fail during snapshot");
+                } catch (SnapshotInProgressException e) {
+                    assertThat(e.getMessage(), containsString("Cannot delete indices that are being snapshotted: [[test-idx-1/"));
                 }
             } else {
-                // non-partial snapshots do not allow close / delete operations on indices where snapshot has not been completed
-                if (randomBoolean()) {
-                    try {
-                        logger.info("--> delete index while non-partial snapshot is running");
-                        client.admin().indices().prepareDelete("test-idx-1").get();
-                        fail("Expected deleting index to fail during snapshot");
-                    } catch (SnapshotInProgressException e) {
-                        assertThat(e.getMessage(), containsString("Cannot delete indices that are being snapshotted: [[test-idx-1/"));
-                    }
-                } else {
-                    try {
-                        logger.info("--> close index while non-partial snapshot is running");
-                        client.admin().indices().prepareClose("test-idx-1").get();
-                        fail("Expected closing index to fail during snapshot");
-                    } catch (SnapshotInProgressException e) {
-                        assertThat(e.getMessage(), containsString("Cannot close indices that are being snapshotted: [[test-idx-1/"));
-                    }
+                try {
+                    logger.info("--> close index while non-partial snapshot is running");
+                    client.admin().indices().prepareClose("test-idx-1").get();
+                    fail("Expected closing index to fail during snapshot");
+                } catch (SnapshotInProgressException e) {
+                    assertThat(e.getMessage(), containsString("Cannot close indices that are being snapshotted: [[test-idx-1/"));
                 }
             }
         } finally {
-            if (initBlocking) {
-                logger.info("--> unblock running master node");
-                unblockNode("test-repo", internalCluster().getMasterName());
-            } else {
-                logger.info("--> unblock all data nodes");
-                unblockAllDataNodes("test-repo");
-            }
+            logger.info("--> unblock all data nodes");
+            unblockAllDataNodes("test-repo");
         }
         logger.info("--> waiting for snapshot to finish");
         CreateSnapshotResponse createSnapshotResponse = future.get();
 
-        if (allowPartial && closedOnPartial == false) {
-            logger.info("Deleted/Closed index during snapshot, but allow partial");
-            assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.PARTIAL)));
-            assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
-            assertThat(createSnapshotResponse.getSnapshotInfo().failedShards(), greaterThan(0));
-            assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
-                lessThan(createSnapshotResponse.getSnapshotInfo().totalShards()));
-        } else {
-            logger.info("Snapshot successfully completed");
-            assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.SUCCESS)));
-        }
+        logger.info("Snapshot successfully completed");
+        assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.SUCCESS)));
     }
 
     public void testCloseIndexDuringRestore() throws Exception {
@@ -3493,7 +3448,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
                     assertThat(shardFailure.reason(), containsString("Random IOException"));
                 }
             }
-        } catch (SnapshotCreationException | RepositoryException ex) {
+        } catch (SnapshotException | RepositoryException ex) {
             // sometimes, the snapshot will fail with a top level I/O exception
             assertThat(ExceptionsHelper.stackTrace(ex), containsString("Random IOException"));
         }
@@ -3856,76 +3811,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         assertThat(client.prepareGet(restoredIndexName2, typeName, sameSourceIndex ? docId : docId2).get().isExists(), equalTo(true));
     }
 
-    public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
-        final Client client = client();
-
-        // Blocks on initialization
-        assertAcked(client.admin().cluster().preparePutRepository("repository")
-            .setType("mock").setSettings(Settings.builder()
-                .put("location", randomRepoPath())
-                .put("block_on_init", true)
-            ));
-
-        createIndex("test-idx");
-        final int nbDocs = scaledRandomIntBetween(100, 500);
-        for (int i = 0; i < nbDocs; i++) {
-            index("test-idx", "_doc", Integer.toString(i), "foo", "bar" + i);
-        }
-        flushAndRefresh("test-idx");
-        assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo((long) nbDocs));
-
-        // Create a snapshot
-        client.admin().cluster().prepareCreateSnapshot("repository", "snap").execute();
-        waitForBlock(internalCluster().getMasterName(), "repository", TimeValue.timeValueMinutes(1));
-        boolean blocked = true;
-
-        // Snapshot is initializing (and is blocked at this stage)
-        SnapshotsStatusResponse snapshotsStatus = client.admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get();
-        assertThat(snapshotsStatus.getSnapshots().iterator().next().getState(), equalTo(State.INIT));
-
-        final List<State> states = new CopyOnWriteArrayList<>();
-        final ClusterStateListener listener = event -> {
-            SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
-            for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
-                if ("snap".equals(entry.snapshot().getSnapshotId().getName())) {
-                    states.add(entry.state());
-                }
-            }
-        };
-
-        try {
-            // Record the upcoming states of the snapshot on all nodes
-            internalCluster().getInstances(ClusterService.class).forEach(clusterService -> clusterService.addListener(listener));
-
-            // Delete the snapshot while it is being initialized
-            ActionFuture<AcknowledgedResponse> delete = client.admin().cluster().prepareDeleteSnapshot("repository", "snap").execute();
-
-            // The deletion must set the snapshot in the ABORTED state
-            assertBusy(() -> {
-                SnapshotsStatusResponse status =
-                    client.admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get();
-                assertThat(status.getSnapshots().iterator().next().getState(), equalTo(State.ABORTED));
-            });
-
-            // Now unblock the repository
-            unblockNode("repository", internalCluster().getMasterName());
-            blocked = false;
-
-            assertAcked(delete.get());
-            expectThrows(SnapshotMissingException.class, () ->
-                    client.admin().cluster().prepareGetSnapshots("repository").setSnapshots("snap").get()
-                            .getSnapshots("repository"));
-
-            assertFalse("Expecting snapshot state to be updated", states.isEmpty());
-            assertFalse("Expecting snapshot to be aborted and not started at all", states.contains(State.STARTED));
-        } finally {
-            internalCluster().getInstances(ClusterService.class).forEach(clusterService -> clusterService.removeListener(listener));
-            if (blocked) {
-                unblockNode("repository", internalCluster().getMasterName());
-            }
-        }
-    }
-
     public void testRestoreIncreasesPrimaryTerms() {
         final String indexName = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT);
         createIndex(indexName, Settings.builder()

+ 81 - 0
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -196,12 +196,14 @@ import java.util.stream.Stream;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
+import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener;
 import static org.elasticsearch.env.Environment.PATH_HOME_SETTING;
 import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.mockito.Mockito.mock;
 
 public class SnapshotResiliencyTests extends ESTestCase {
@@ -493,6 +495,85 @@ public class SnapshotResiliencyTests extends ESTestCase {
         assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0)));
     }
 
+    public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() {
+        setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
+
+        String repoName = "repo";
+        String snapshotName = "snapshot";
+        final String index = "test";
+
+        final int shards = randomIntBetween(1, 10);
+        final int documents = randomIntBetween(2, 100);
+        TestClusterNode masterNode =
+            testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
+
+        final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
+
+        continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> {
+            final AtomicBoolean initiatedSnapshot = new AtomicBoolean(false);
+            for (int i = 0; i < documents; ++i) {
+                // Index a few documents with different field names so we trigger a dynamic mapping update for each of them
+                masterNode.client.bulk(
+                    new BulkRequest().add(new IndexRequest(index).source(Map.of("foo" + i, "bar")))
+                        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
+                    assertNoFailureListener(
+                        bulkResponse -> {
+                            assertFalse("Failures in bulkresponse: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
+                            if (initiatedSnapshot.compareAndSet(false, true)) {
+                                masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
+                                    .setWaitForCompletion(true).execute(createSnapshotResponseStepListener);
+                            }
+                        }));
+            }
+        });
+
+        final String restoredIndex = "restored";
+
+        final StepListener<RestoreSnapshotResponse> restoreSnapshotResponseStepListener = new StepListener<>();
+
+        continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().restoreSnapshot(
+            new RestoreSnapshotRequest(repoName, snapshotName)
+                .renamePattern(index).renameReplacement(restoredIndex).waitForCompletion(true), restoreSnapshotResponseStepListener));
+
+        final StepListener<SearchResponse> searchResponseStepListener = new StepListener<>();
+
+        continueOrDie(restoreSnapshotResponseStepListener, restoreSnapshotResponse -> {
+            assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
+            masterNode.client.search(
+                new SearchRequest(restoredIndex).source(new SearchSourceBuilder().size(documents).trackTotalHits(true)),
+                searchResponseStepListener);
+        });
+
+        final AtomicBoolean documentCountVerified = new AtomicBoolean();
+
+        continueOrDie(searchResponseStepListener, r -> {
+            final long hitCount = r.getHits().getTotalHits().value;
+            assertThat(
+                "Documents were restored but the restored index mapping was older than some documents and misses some of their fields",
+                (int) hitCount,
+                lessThanOrEqualTo(((Map<?, ?>) masterNode.clusterService.state().metaData().index(restoredIndex).mapping()
+                    .sourceAsMap().get("properties")).size())
+            );
+            documentCountVerified.set(true);
+        });
+
+        runUntil(documentCountVerified::get, TimeUnit.MINUTES.toMillis(5L));
+
+        assertNotNull(createSnapshotResponseStepListener.result());
+        assertNotNull(restoreSnapshotResponseStepListener.result());
+        SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
+        assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
+        final Repository repository = masterNode.repositoriesService.repository(repoName);
+        Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
+        assertThat(snapshotIds, hasSize(1));
+
+        final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
+        assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
+        assertThat(snapshotInfo.indices(), containsInAnyOrder(index));
+        assertEquals(shards, snapshotInfo.successfulShards());
+        assertEquals(0, snapshotInfo.failedShards());
+    }
+
     private StepListener<CreateIndexResponse> createRepoAndIndex(TestClusterNode masterNode, String repoName, String index, int shards) {
         final AdminClient adminClient = masterNode.client.admin();
 

+ 4 - 2
server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java

@@ -287,9 +287,11 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
                     // We do some checks in case there is a consistent state for a blob to prevent turning it inconsistent.
                     final boolean hasConsistentContent =
                         relevantActions.size() == 1 && relevantActions.get(0).operation == Operation.PUT;
-                    if (BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) {
+                    if (BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)
+                        || blobName.startsWith(BlobStoreRepository.METADATA_PREFIX)) {
                         // TODO: Ensure that it is impossible to ever decrement the generation id stored in index.latest then assert that
-                        //       it never decrements here
+                        //       it never decrements here. Same goes for the metadata, ensure that we never overwrite newer with older
+                        //       metadata.
                     } else if (blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)) {
                         if (hasConsistentContent) {
                                 if (basePath().buildAsString().equals(path().buildAsString())) {

+ 4 - 3
server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.snapshots.mockstore;
 
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -158,19 +159,19 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
             // We create a snap- blob for snapshot "foo" in the first generation
             final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID());
             repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(),
-                -1L, false, Collections.emptyMap());
+                -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap());
 
             // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
             final AssertionError assertionError = expectThrows(AssertionError.class,
                 () -> repository.finalizeSnapshot(
                     snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(),
-                 0, false, Collections.emptyMap()));
+                 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap()));
             assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n     but: was <5>"));
 
             // We try to write yet another snap- blob for "foo" in the next generation.
             // It passes cleanly because the content of the blob except for the timestamps.
             repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(),
-                0, false, Collections.emptyMap());
+                0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap());
         }
     }
 

+ 1 - 16
server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java

@@ -24,7 +24,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.CorruptIndexException;
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
@@ -39,10 +38,8 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.plugins.RepositoryPlugin;
-import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.fs.FsRepository;
-import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
@@ -102,8 +99,6 @@ public class MockRepository extends FsRepository {
 
     private final String randomPrefix;
 
-    private volatile boolean blockOnInitialization;
-
     private volatile boolean blockOnControlFiles;
 
     private volatile boolean blockOnDataFiles;
@@ -126,21 +121,12 @@ public class MockRepository extends FsRepository {
         maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L);
         blockOnControlFiles = metadata.settings().getAsBoolean("block_on_control", false);
         blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false);
-        blockOnInitialization = metadata.settings().getAsBoolean("block_on_init", false);
         blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false);
         randomPrefix = metadata.settings().get("random", "default");
         waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L);
         logger.info("starting mock repository with random prefix {}", randomPrefix);
     }
 
-    @Override
-    public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetadata) {
-        if (blockOnInitialization) {
-            blockExecution();
-        }
-        super.initializeSnapshot(snapshotId, indices, clusterMetadata);
-    }
-
     private static RepositoryMetaData overrideSettings(RepositoryMetaData metadata, Environment environment) {
         // TODO: use another method of testing not being able to read the test file written by the master...
         // this is super duper hacky
@@ -174,7 +160,6 @@ public class MockRepository extends FsRepository {
         // Clean blocking flags, so we wouldn't try to block again
         blockOnDataFiles = false;
         blockOnControlFiles = false;
-        blockOnInitialization = false;
         blockOnWriteIndexFile = false;
         blockAndFailOnWriteSnapFile = false;
         this.notifyAll();
@@ -200,7 +185,7 @@ public class MockRepository extends FsRepository {
         logger.debug("[{}] Blocking execution", metadata.name());
         boolean wasBlocked = false;
         try {
-            while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile ||
+            while (blockOnDataFiles || blockOnControlFiles || blockOnWriteIndexFile ||
                 blockAndFailOnWriteSnapFile) {
                 blocked = true;
                 this.wait();

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java

@@ -100,7 +100,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
     @Override
     public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
                                          int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
-                                         boolean includeGlobalState, Map<String, Object> userMetadata) {
+                                         boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata) {
         return null;
     }
 

+ 1 - 2
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -252,11 +252,10 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
     public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
         throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
     }
-
     @Override
     public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
                                          List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
-                                         Map<String, Object> userMetadata) {
+                                         MetaData metaData, Map<String, Object> userMetadata) {
         throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
     }
 

+ 44 - 23
x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java

@@ -45,6 +45,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -82,34 +83,54 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
         // a _source only snapshot with a plain repository it will be just fine since we already set the
         // required engine, that the index is read-only and the mapping to a default mapping
         try {
-            MetaData.Builder builder = MetaData.builder(metaData);
-            for (IndexId indexId : indices) {
-                IndexMetaData index = metaData.index(indexId.getName());
-                IndexMetaData.Builder indexMetadataBuilder = IndexMetaData.builder(index);
-                // for a minimal restore we basically disable indexing on all fields and only create an index
-                // that is valid from an operational perspective. ie. it will have all metadata fields like version/
-                // seqID etc. and an indexed ID field such that we can potentially perform updates on them or delete documents.
-                ImmutableOpenMap<String, MappingMetaData> mappings = index.getMappings();
-                Iterator<ObjectObjectCursor<String, MappingMetaData>> iterator = mappings.iterator();
-                while (iterator.hasNext()) {
-                    ObjectObjectCursor<String, MappingMetaData> next = iterator.next();
-                    // we don't need to obey any routing here stuff is read-only anyway and get is disabled
-                    final String mapping = "{ \"" + next.key + "\": { \"enabled\": false, \"_meta\": " + next.value.source().string()
-                        + " } }";
-                    indexMetadataBuilder.putMapping(next.key, mapping);
-                }
-                indexMetadataBuilder.settings(Settings.builder().put(index.getSettings())
-                    .put(SOURCE_ONLY.getKey(), true)
-                    .put("index.blocks.write", true)); // read-only!
-                indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion());
-                builder.put(indexMetadataBuilder);
-            }
-            super.initializeSnapshot(snapshotId, indices, builder.build());
+            super.initializeSnapshot(snapshotId, indices, metadataToSnapshot(indices, metaData));
         } catch (IOException ex) {
             throw new UncheckedIOException(ex);
         }
     }
 
+    @Override
+    public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
+        List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState, MetaData metaData,
+        Map<String, Object> userMetadata) {
+        // we process the index metadata at snapshot time. This means if somebody tries to restore
+        // a _source only snapshot with a plain repository it will be just fine since we already set the
+        // required engine, that the index is read-only and the mapping to a default mapping
+        try {
+            return super.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
+                includeGlobalState, metadataToSnapshot(indices, metaData), userMetadata);
+        } catch (IOException ex) {
+            throw new UncheckedIOException(ex);
+        }
+    }
+
+    private static MetaData metadataToSnapshot(List<IndexId> indices, MetaData metaData) throws IOException {
+        MetaData.Builder builder = MetaData.builder(metaData);
+        for (IndexId indexId : indices) {
+            IndexMetaData index = metaData.index(indexId.getName());
+            IndexMetaData.Builder indexMetadataBuilder = IndexMetaData.builder(index);
+            // for a minimal restore we basically disable indexing on all fields and only create an index
+            // that is valid from an operational perspective. ie. it will have all metadata fields like version/
+            // seqID etc. and an indexed ID field such that we can potentially perform updates on them or delete documents.
+            ImmutableOpenMap<String, MappingMetaData> mappings = index.getMappings();
+            Iterator<ObjectObjectCursor<String, MappingMetaData>> iterator = mappings.iterator();
+            while (iterator.hasNext()) {
+                ObjectObjectCursor<String, MappingMetaData> next = iterator.next();
+                // we don't need to obey any routing here stuff is read-only anyway and get is disabled
+                final String mapping = "{ \"" + next.key + "\": { \"enabled\": false, \"_meta\": " + next.value.source().string()
+                    + " } }";
+                indexMetadataBuilder.putMapping(next.key, mapping);
+            }
+            indexMetadataBuilder.settings(Settings.builder().put(index.getSettings())
+                .put(SOURCE_ONLY.getKey(), true)
+                .put("index.blocks.write", true)); // read-only!
+            indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion());
+            builder.put(indexMetadataBuilder);
+        }
+        return builder.build();
+    }
+
+
     @Override
     public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
                               IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {

+ 6 - 5
x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

@@ -65,7 +65,7 @@ import org.hamcrest.Matchers;
 
 import java.io.IOException;
 import java.nio.file.Path;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 
@@ -200,14 +200,15 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
         repository.start();
         try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
             IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
+            final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
             runAsSnapshot(shard.getThreadPool(), () -> {
-                repository.initializeSnapshot(snapshotId, Arrays.asList(indexId),
-                    MetaData.builder().put(shard.indexSettings()
-                    .getIndexMetaData(), false).build());
-                final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
                 repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(),
                     indexShardSnapshotStatus, future);
                 future.actionGet();
+                repository.finalizeSnapshot(snapshotId, Collections.singletonList(indexId),
+                    indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(),
+                    repository.getRepositoryData().getGenId(), true,
+                    MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap());
             });
             IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
             assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());