Browse Source

Fix Snapshot Repository Corruption in Downgrade Scenarios (#50692)

This PR introduces test infrastructure for downgrading a cluster while interacting with a given repository.
It fixes a bug where repository metadata could be written in the new format while there are still older snapshots in the repository that require the old-format metadata to be restorable.
Armin Braun 5 years ago
parent
commit
ee6fbcc4a7

+ 122 - 0
qa/repository-multi-version/build.gradle

@@ -0,0 +1,122 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.elasticsearch.gradle.Version
+import org.elasticsearch.gradle.info.BuildParams
+import org.elasticsearch.gradle.testclusters.RestTestRunnerTask
+
+apply plugin: 'elasticsearch.testclusters'
+apply plugin: 'elasticsearch.standalone-test'
+
+// Aggregate verification task; the per-version "<baseName>#bwcTest" tasks registered further down in this script are
+// attached to it when bwc tests are enabled.
+tasks.register("bwcTest") {
+  description = 'Runs backwards compatibility tests.'
+  group = 'verification'
+}
+
+// The tests in this project talk to the clusters through the high-level REST client.
+dependencies {
+  testCompile project(':client:rest-high-level')
+}
+
+// For every index-compatible version: define one cluster on that version and one on the current version, both pointing
+// at the same repository directory, and chain four REST test tasks that alternate between the two clusters.
+for (Version bwcVersion : bwcVersions.indexCompatible) {
+  String baseName = "v${bwcVersion}"
+  String oldClusterName = "${baseName}-old"
+  String newClusterName = "${baseName}-new"
+  // Single definition of the repository directory shared by both clusters and handed to the tests
+  String repoPath = "${buildDir}/cluster/shared/repo/${baseName}"
+
+  // Returns the configuration closure for a two-node cluster running version v
+  def clusterSettings = { v ->
+    return {
+      version = v
+      numberOfNodes = 2
+      setting 'path.repo', repoPath
+      javaHome = BuildParams.runtimeJavaHome
+    }
+  }
+
+  testClusters {
+    "${oldClusterName}" clusterSettings(bwcVersion.toString())
+    "${newClusterName}" clusterSettings(project.version)
+  }
+
+  tasks.register("${baseName}#Step1OldClusterTest", RestTestRunnerTask) {
+    useCluster testClusters."${oldClusterName}"
+    mustRunAfter(precommit)
+    doFirst {
+      // The first step always starts from an empty repository directory
+      project.delete(repoPath)
+    }
+    systemProperty 'tests.rest.suite', 'step1'
+  }
+
+  tasks.register("${baseName}#Step2NewClusterTest", RestTestRunnerTask) {
+    useCluster testClusters."${newClusterName}"
+    dependsOn "${baseName}#Step1OldClusterTest"
+    systemProperty 'tests.rest.suite', 'step2'
+  }
+
+  tasks.register("${baseName}#Step3OldClusterTest", RestTestRunnerTask) {
+    useCluster testClusters."${oldClusterName}"
+    dependsOn "${baseName}#Step2NewClusterTest"
+    systemProperty 'tests.rest.suite', 'step3'
+  }
+
+  tasks.register("${baseName}#Step4NewClusterTest", RestTestRunnerTask) {
+    useCluster testClusters."${newClusterName}"
+    dependsOn "${baseName}#Step3OldClusterTest"
+    systemProperty 'tests.rest.suite', 'step4'
+  }
+
+  // Settings common to the four step tasks of this version. The '#' separator is part of the prefix so that a version
+  // whose name merely extends this one (e.g. v6.8.10 vs. v6.8.1) is not matched as well.
+  tasks.matching { it.name.startsWith("${baseName}#") && it.name.endsWith("ClusterTest") }.configureEach {
+    it.systemProperty 'tests.old_cluster_version', bwcVersion.toString().minus("-SNAPSHOT")
+    it.systemProperty 'tests.path.repo', repoPath
+    def clusterName = it.name.contains("Step2") || it.name.contains("Step4") ? "${newClusterName}" : "${oldClusterName}"
+    // "${-> ...}" is a lazily evaluated GString: the cluster address and name are looked up when the value is read
+    it.nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${clusterName}".allHttpSocketURI.join(",")}")
+    it.nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${clusterName}".getName()}")
+  }
+
+  if (project.bwc_tests_enabled) {
+    // The per-version bwc task only needs the last step; steps 1-3 are pulled in through the dependsOn chain above
+    bwcTest.dependsOn(
+      tasks.register("${baseName}#bwcTest") {
+        dependsOn tasks.named("${baseName}#Step4NewClusterTest")
+      }
+    )
+  }
+}
+
+// Depends on the per-version bwc tasks of all unreleased index-compatible versions (only when bwc tests are enabled).
+task bwcTestSnapshots {
+  if (project.bwc_tests_enabled) {
+    for (final def version : bwcVersions.unreleasedIndexCompatible) {
+      dependsOn "v${version}#bwcTest"
+    }
+  }
+}
+
+check.dependsOn(bwcTestSnapshots)
+
+// Expose the compiled test classes of this project as a jar through the "testArtifacts" configuration.
+configurations {
+  testArtifacts.extendsFrom testRuntime
+}
+
+task testJar(type: Jar) {
+  appendix 'test'
+  from sourceSets.test.output
+}
+
+artifacts {
+  testArtifacts testJar
+}
+
+// The default test task is switched off; NOTE(review): presumably because the suite is only meant to run through the
+// step tasks registered above - confirm.
+test.enabled = false

+ 269 - 0
qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java

@@ -0,0 +1,269 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.upgrades;
+
+import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
+import org.elasticsearch.client.Node;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.DeprecationHandler;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.snapshots.RestoreInfo;
+import org.elasticsearch.test.rest.ESRestTestCase;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Tests that verify that a snapshot repository is not getting corrupted and continues to function properly when accessed by multiple
+ * clusters of different versions. Concretely this test suite is simulating the following scenario:
+ * <ul>
+ *     <li>Start and run against a cluster in an old version: {@link TestStep#STEP1_OLD_CLUSTER}</li>
+ *     <li>Start and run against a cluster running the current version: {@link TestStep#STEP2_NEW_CLUSTER}</li>
+ *     <li>Run against the old version cluster from the first step: {@link TestStep#STEP3_OLD_CLUSTER}</li>
+ *     <li>Run against the current version cluster from the second step: {@link TestStep#STEP4_NEW_CLUSTER}</li>
+ * </ul>
+ * TODO: Add two more steps: delete all old version snapshots from the repository, then downgrade again and verify that the repository
+ *       is not being corrupted. This requires first merging the logic for reading the min_version field in RepositoryData back to 7.6.
+ */
+public class MultiVersionRepositoryAccessIT extends ESRestTestCase {
+
+    private enum TestStep {
+        STEP1_OLD_CLUSTER("step1"),
+        STEP2_NEW_CLUSTER("step2"),
+        STEP3_OLD_CLUSTER("step3"),
+        STEP4_NEW_CLUSTER("step4");
+
+        private final String name;
+
+        TestStep(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        public static TestStep parse(String value) {
+            switch (value) {
+                case "step1":
+                    return STEP1_OLD_CLUSTER;
+                case "step2":
+                    return STEP2_NEW_CLUSTER;
+                case "step3":
+                    return STEP3_OLD_CLUSTER;
+                case "step4":
+                    return STEP4_NEW_CLUSTER;
+                default:
+                    throw new AssertionError("unknown test step: " + value);
+            }
+        }
+    }
+
+    // Step executed by this JVM, resolved once from the "tests.rest.suite" system property
+    protected static final TestStep TEST_STEP = TestStep.parse(System.getProperty("tests.rest.suite"));
+
+    // The tests below list and restore snapshots that were written in earlier steps.
+    // NOTE(review): presumably the base class skips its snapshot cleanup when this returns true - confirm.
+    @Override
+    protected boolean preserveSnapshotsUponCompletion() {
+        return true;
+    }
+
+    // Repositories are not wiped by the base class; the tests deregister them explicitly via deleteRepository where needed
+    @Override
+    protected boolean preserveReposUponCompletion() {
+        return true;
+    }
+
+    public void testCreateAndRestoreSnapshot() throws IOException {
+        final String repoName = getTestName();
+        try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) {
+            final int shards = 3;
+            createIndex(client, "test-index", shards);
+            createRepository(client, repoName, false);
+            createSnapshot(client, repoName, "snapshot-" + TEST_STEP);
+            final String snapshotToDeleteName = "snapshot-to-delete";
+            // Create a snapshot and delete it right away again to test the impact of each version's cleanup functionality that is run
+            // as part of the snapshot delete
+            createSnapshot(client, repoName, snapshotToDeleteName);
+            final List<Map<String, Object>> snapshotsIncludingToDelete = listSnapshots(repoName);
+            // Every step creates one snapshot and we have to add one more for the temporary snapshot
+            assertThat(snapshotsIncludingToDelete, hasSize(TEST_STEP.ordinal() + 1 + 1));
+            assertThat(snapshotsIncludingToDelete.stream().map(
+                sn -> (String) sn.get("snapshot")).collect(Collectors.toList()), hasItem(snapshotToDeleteName));
+            deleteSnapshot(client, repoName, snapshotToDeleteName);
+            final List<Map<String, Object>> snapshots = listSnapshots(repoName);
+            assertThat(snapshots, hasSize(TEST_STEP.ordinal() + 1));
+            assertSnapshotStatusSuccessful(client, repoName, snapshots);
+            if (TEST_STEP == TestStep.STEP3_OLD_CLUSTER) {
+                ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
+            } else if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) {
+                for (TestStep value : TestStep.values()) {
+                    ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + value, shards);
+                }
+            }
+        } finally {
+            deleteRepository(repoName);
+        }
+    }
+
+    /**
+     * Writes one snapshot per step while the repository is registered writable (steps 1 and 2) and afterwards only
+     * lists and restores through a read-only registration (steps 3 and 4).
+     */
+    public void testReadOnlyRepo() throws IOException {
+        final String repoName = getTestName();
+        try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) {
+            final int shards = 3;
+            // only restore from read-only repo in steps 3 and 4
+            final boolean readOnly = TEST_STEP == TestStep.STEP3_OLD_CLUSTER || TEST_STEP == TestStep.STEP4_NEW_CLUSTER;
+            createRepository(client, repoName, readOnly);
+            if (readOnly == false) {
+                createIndex(client, "test-index", shards);
+                createSnapshot(client, repoName, "snapshot-" + TEST_STEP);
+            }
+            final List<Map<String, Object>> snapshots = listSnapshots(repoName);
+            // Step 1 only sees its own snapshot; from step 2 on the repository holds the snapshots of steps 1 and 2
+            assertThat(snapshots, hasSize(TEST_STEP == TestStep.STEP1_OLD_CLUSTER ? 1 : 2));
+            assertSnapshotStatusSuccessful(client, repoName, snapshots);
+            if (TEST_STEP == TestStep.STEP3_OLD_CLUSTER) {
+                ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
+            } else if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) {
+                ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
+                ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP2_NEW_CLUSTER, shards);
+            }
+        } finally {
+            // Deregister the repository like the other tests in this class do, so that a writable registration does not
+            // linger on this cluster while the other cluster works on the same path in the next step
+            deleteRepository(repoName);
+        }
+    }
+
+    /**
+     * Runs in steps 1 and 2 only. Step 1 creates and restores a snapshot on the old cluster. Step 2 deletes the step 1
+     * snapshot on the new cluster and then keeps creating, restoring and deleting snapshots to check that the
+     * repository stays usable once no old-version snapshot is left in it.
+     */
+    public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
+        final boolean coveredStep = TEST_STEP.ordinal() <= 1;
+        if (coveredStep == false) {
+            // Only testing the first 2 steps here
+            return;
+        }
+        final String repository = getTestName();
+        try (RestHighLevelClient hlrc = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) {
+            final int shardCount = 3;
+            final String step1Snapshot = "snapshot-" + TestStep.STEP1_OLD_CLUSTER;
+            final String step2Snapshot = "snapshot-" + TestStep.STEP2_NEW_CLUSTER;
+            createIndex(hlrc, "test-index", shardCount);
+            createRepository(hlrc, repository, false);
+            createSnapshot(hlrc, repository, "snapshot-" + TEST_STEP);
+            final List<Map<String, Object>> existing = listSnapshots(repository);
+            // One snapshot per step that has run so far
+            assertThat(existing, hasSize(TEST_STEP.ordinal() + 1));
+            assertSnapshotStatusSuccessful(hlrc, repository, existing);
+            if (TEST_STEP != TestStep.STEP1_OLD_CLUSTER) {
+                deleteSnapshot(hlrc, repository, step1Snapshot);
+                ensureSnapshotRestoreWorks(hlrc, repository, step2Snapshot, shardCount);
+                createSnapshot(hlrc, repository, "snapshot-1");
+                ensureSnapshotRestoreWorks(hlrc, repository, "snapshot-1", shardCount);
+                deleteSnapshot(hlrc, repository, step2Snapshot);
+                createSnapshot(hlrc, repository, "snapshot-2");
+                ensureSnapshotRestoreWorks(hlrc, repository, "snapshot-2", shardCount);
+            } else {
+                ensureSnapshotRestoreWorks(hlrc, repository, step1Snapshot, shardCount);
+            }
+        } finally {
+            deleteRepository(repository);
+        }
+    }
+
+    /**
+     * Requests the status of the given snapshots and asserts that none of them reports a failed shard.
+     *
+     * @param snapshots snapshot entries as returned by {@code listSnapshots}; only their {@code "snapshot"} name is used
+     */
+    private static void assertSnapshotStatusSuccessful(RestHighLevelClient client, String repoName,
+                                                     List<Map<String, Object>> snapshots) throws IOException {
+        final String[] snapshotNames = snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new);
+        final SnapshotsStatusRequest statusRequest = new SnapshotsStatusRequest(repoName, snapshotNames);
+        final SnapshotsStatusResponse statusResponse = client.snapshot().status(statusRequest, RequestOptions.DEFAULT);
+        for (SnapshotStatus snapshotStatus : statusResponse.getSnapshots()) {
+            final int failedShards = snapshotStatus.getShardsStats().getFailedShards();
+            assertThat(failedShards, is(0));
+        }
+    }
+
+    private void deleteSnapshot(RestHighLevelClient client, String repoName, String name) throws IOException {
+        assertThat(client.snapshot().delete(new DeleteSnapshotRequest(repoName, name), RequestOptions.DEFAULT).isAcknowledged(), is(true));
+    }
+
+    /**
+     * Lists all snapshots of {@code repoName} through {@code GET /_snapshot/<repo>/_all}.
+     *
+     * @return the raw snapshot entries of the response, one map per snapshot
+     */
+    @SuppressWarnings("unchecked")
+    private List<Map<String, Object>> listSnapshots(String repoName) throws IOException {
+        final Request listRequest = new Request("GET", "/_snapshot/" + repoName + "/_all");
+        try (InputStream body = client().performRequest(listRequest).getEntity().getContent();
+             XContentParser parser = JsonXContent.jsonXContent.createParser(
+                 xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, body)) {
+            final Map<String, Object> response = parser.map();
+            // The response format differs between versions: the snapshots are either found at the top level or nested
+            // inside the first element of "responses"
+            if (response.containsKey("snapshots") == false) {
+                final List<Map<?, ?>> perRepository = (List<Map<?, ?>>) response.get("responses");
+                return (List<Map<String, Object>>) perRepository.get(0).get("snapshots");
+            }
+            return (List<Map<String, Object>>) response.get("snapshots");
+        }
+    }
+
+    /**
+     * Wipes all indices, restores snapshot {@code name} while waiting for completion and asserts that exactly
+     * {@code shards} shards were restored without any failure.
+     */
+    private static void ensureSnapshotRestoreWorks(RestHighLevelClient client, String repoName, String name,
+                                                   int shards) throws IOException {
+        wipeAllIndices();
+        final RestoreSnapshotRequest restoreRequest =
+            new RestoreSnapshotRequest().repository(repoName).snapshot(name).waitForCompletion(true);
+        final RestoreInfo restored = client.snapshot().restore(restoreRequest, RequestOptions.DEFAULT).getRestoreInfo();
+        assertThat(restored.failedShards(), is(0));
+        assertThat(restored.successfulShards(), equalTo(shards));
+    }
+
+    private static void createRepository(RestHighLevelClient client, String repoName, boolean readOnly) throws IOException {
+        assertThat(client.snapshot().createRepository(new PutRepositoryRequest(repoName).type("fs").settings(
+            Settings.builder().put("location", "./" + repoName).put("readonly", readOnly)), RequestOptions.DEFAULT).isAcknowledged(),
+            is(true));
+    }
+
+    private static void createSnapshot(RestHighLevelClient client, String repoName, String name) throws IOException {
+        client.snapshot().create(new CreateSnapshotRequest(repoName, name).waitForCompletion(true), RequestOptions.DEFAULT);
+    }
+
+    /**
+     * Creates index {@code name} with {@code shards} primary shards and no replicas via the low-level client and asserts
+     * an HTTP 200 response.
+     */
+    private void createIndex(RestHighLevelClient client, String name, int shards) throws IOException {
+        final String indexSettings =
+            "{\n    \"settings\" : {\n        \"index\" : {\n            \"number_of_shards\" : " + shards + ", \n"
+                + "            \"number_of_replicas\" : 0 \n        }\n    }\n}";
+        final Request createIndexRequest = new Request("PUT", "/" + name);
+        createIndexRequest.setJsonEntity(indexSettings);
+        final Response createIndexResponse = client.getLowLevelClient().performRequest(createIndexRequest);
+        final int statusCode = createIndexResponse.getStatusLine().getStatusCode();
+        assertThat(statusCode, is(HttpURLConnection.HTTP_OK));
+    }
+}

+ 9 - 3
server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

@@ -77,6 +77,8 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
 
     private final RepositoriesService repositoriesService;
 
+    private final SnapshotsService snapshotsService;
+
     @Override
     protected String executor() {
         return ThreadPool.Names.GENERIC;
@@ -84,11 +86,13 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
 
     @Inject
     public TransportCleanupRepositoryAction(TransportService transportService, ClusterService clusterService,
-                                            RepositoriesService repositoriesService, ThreadPool threadPool, ActionFilters actionFilters,
+                                            RepositoriesService repositoriesService, SnapshotsService snapshotsService,
+                                            ThreadPool threadPool, ActionFilters actionFilters,
                                             IndexNameExpressionResolver indexNameExpressionResolver) {
         super(CleanupRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters,
             CleanupRepositoryRequest::new, indexNameExpressionResolver);
         this.repositoriesService = repositoriesService;
+        this.snapshotsService = snapshotsService;
         // We add a state applier that will remove any dangling repository cleanup actions on master failover.
         // This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
         // operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
@@ -216,8 +220,10 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
                         threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
                             l -> blobStoreRepository.cleanup(
                                 repositoryStateId,
-                                newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
-                                ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
+                                newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)
+                                    && snapshotsService.hasOldVersionSnapshots(repositoryName, repositoryData, null) == false,
+                                ActionListener.wrap(result -> after(null, result), e -> after(e, null)))
+                        ));
                     }
 
                     private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {

+ 15 - 0
server/src/main/java/org/elasticsearch/repositories/RepositoryData.java

@@ -21,6 +21,7 @@ package org.elasticsearch.repositories;
 
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -28,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentParserUtils;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotState;
+import org.elasticsearch.snapshots.SnapshotsService;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -321,6 +323,7 @@ public final class RepositoryData {
     private static final String NAME = "name";
     private static final String UUID = "uuid";
     private static final String STATE = "state";
+    private static final String MIN_VERSION = "min_version";
 
     /**
      * Writes the snapshots metadata and the related indices metadata to x-content.
@@ -361,6 +364,12 @@ public final class RepositoryData {
             builder.endObject();
         }
         builder.endObject();
+        if (shouldWriteShardGens) {
+            // TODO: write this field once 7.6 is able to read it and add tests to :qa:repository-multi-version that make sure older
+            //       ES versions can't corrupt the repository by writing to it and all the snapshots in it are v7.6 or newer
+            // Add min version field to make it impossible for older ES versions to deserialize this object
+            // builder.field(MIN_VERSION, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.toString());
+        }
         builder.endObject();
         return builder;
     }
@@ -468,6 +477,12 @@ public final class RepositoryData {
                             shardGenerations.put(indexId, i, gens.get(i));
                         }
                     }
+                } else if (MIN_VERSION.equals(field)) {
+                    if (parser.nextToken() != XContentParser.Token.VALUE_STRING) {
+                        throw new ElasticsearchParseException("version string expected [min_version]");
+                    }
+                    final Version version = Version.fromString(parser.text());
+                    assert version.onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION);
                 } else {
                     throw new ElasticsearchParseException("unknown field name  [" + field + "]");
                 }

+ 34 - 3
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -80,6 +80,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -270,6 +271,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
         repositoriesService.repository(repositoryName).getRepositoryData(repositoryDataListener);
         repositoryDataListener.whenComplete(repositoryData -> {
+            final boolean hasOldFormatSnapshots = hasOldVersionSnapshots(repositoryName, repositoryData, null);
             clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {
 
                 private SnapshotsInProgress.Entry newSnapshot = null;
@@ -303,7 +305,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                             repositoryData.getGenId(),
                             null,
                             request.userMetadata(),
-                            clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION));
+                            hasOldFormatSnapshots == false &&
+                                clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION));
                         initializingSnapshots.add(newSnapshot.snapshot());
                         snapshots = new SnapshotsInProgress(newSnapshot);
                     } else {
@@ -351,6 +354,31 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         }, listener::onFailure);
     }
 
+    /**
+     * Checks whether the repository still contains snapshots written by a version before
+     * {@link #SHARD_GEN_IN_REPO_DATA_VERSION}.
+     * <p>
+     * An empty repository, or one whose {@link RepositoryData} already tracks shard generations, is never reported as
+     * containing old version snapshots. Otherwise the metadata of each snapshot is loaded from the repository to
+     * inspect its version. NOTE(review): that lookup presumably reads from the repository, so callers should not invoke
+     * this on a thread that must not block - confirm.
+     *
+     * @param repositoryName name of the repository to inspect
+     * @param repositoryData current repository data of that repository
+     * @param excluded       snapshot to ignore (e.g. the one about to be deleted), or {@code null} to consider all;
+     *                       its metadata is not loaded
+     * @return {@code true} if an old version snapshot was found or if the metadata of a snapshot that had to be
+     *         inspected is missing, {@code false} otherwise
+     */
+    public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repositoryData, @Nullable SnapshotId excluded) {
+        final Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
+        final boolean hasOldFormatSnapshots;
+        if (snapshotIds.isEmpty()) {
+            hasOldFormatSnapshots = false;
+        } else if (repositoryData.shardGenerations().totalShards() > 0) {
+            hasOldFormatSnapshots = false;
+        } else {
+            try {
+                final Repository repository = repositoriesService.repository(repositoryName);
+                // Drop the excluded snapshot before loading any metadata: its version is irrelevant, and if its metadata
+                // is missing the lookup must not make the whole repository look like it is in the old format
+                hasOldFormatSnapshots = snapshotIds.stream()
+                    .filter(snapshotId -> snapshotId.equals(excluded) == false)
+                    .anyMatch(snapshotId -> repository.getSnapshotInfo(snapshotId).version().before(SHARD_GEN_IN_REPO_DATA_VERSION));
+            } catch (SnapshotMissingException e) {
+                // Err on the side of the old format, which stays readable for every version
+                logger.warn("Failed to load snapshot metadata, assuming repository is in old format", e);
+                return true;
+            }
+        }
+        assert hasOldFormatSnapshots == false || repositoryData.shardGenerations().totalShards() == 0 :
+            "Found non-empty shard generations [" + repositoryData.shardGenerations() + "] but repository contained old version snapshots";
+        return hasOldFormatSnapshots;
+    }
+
     /**
      * Validates snapshot request
      *
@@ -1402,12 +1430,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                                               Version version) {
         threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
             Repository repository = repositoriesService.repository(snapshot.getRepository());
-            repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, version.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION),
+            repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshot(snapshot.getSnapshotId(),
+                repositoryStateId,
+                version.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION) &&
+                    hasOldVersionSnapshots(snapshot.getRepository(), repositoryData, snapshot.getSnapshotId()) == false,
                 ActionListener.wrap(v -> {
                         logger.info("snapshot [{}] deleted", snapshot);
                         removeSnapshotDeletionFromClusterState(snapshot, null, l);
                     }, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l)
-                ));
+                )), ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l)));
         }));
     }
 

+ 47 - 0
server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.snapshots;
 
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.Client;
@@ -33,9 +34,11 @@ import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Locale;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.containsString;
@@ -231,6 +234,50 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
             .addSnapshots(snapshot).get().getSnapshots(repoName));
     }
 
+    /**
+     * Creates one or two snapshots without indices, removes the root level metadata blob of a randomly chosen one and
+     * verifies that {@code SnapshotsService#hasOldVersionSnapshots} then reports {@code true}, that the damaged
+     * snapshot can still be deleted, and that the check reports {@code false} once it is gone.
+     */
+    public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception {
+        Path repo = randomRepoPath();
+        final String repoName = "test-repo";
+        logger.info("-->  creating repository at {}", repo.toAbsolutePath());
+        assertAcked(client().admin().cluster().preparePutRepository(repoName)
+            .setType("fs").setSettings(Settings.builder()
+                .put("location", repo)
+                .put("compress", false)
+                .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
+
+        final String snapshotPrefix = "test-snap-";
+        final int snapshots = randomIntBetween(1, 2);
+        logger.info("--> creating [{}] snapshots", snapshots);
+        for (int i = 0; i < snapshots; ++i) {
+            // Workaround to simulate BwC situation: taking a snapshot without indices here so that we don't create any new version shard
+            // generations (the existence of which would short-circuit checks for the repo containing old version snapshots)
+            CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, snapshotPrefix + i)
+                .setIndices().setWaitForCompletion(true).get();
+            assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), is(0));
+            assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
+                equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
+        }
+        final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
+        final RepositoryData repositoryData = getRepositoryData(repository);
+
+        final SnapshotId snapshotToCorrupt = randomFrom(repositoryData.getSnapshotIds());
+        logger.info("--> delete root level snapshot metadata blob for snapshot [{}]", snapshotToCorrupt);
+        Files.delete(repo.resolve(String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotToCorrupt.getUUID())));
+
+        logger.info("--> verify that repo is assumed in old metadata format");
+        final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class);
+        final ThreadPool threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class);
+        // The check is executed on the master node's generic thread pool and its result is handed back through the future
+        assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
+            ActionRunnable.supply(f, () -> snapshotsService.hasOldVersionSnapshots(repoName, repositoryData, null)))), is(true));
+
+        logger.info("--> verify that snapshot with missing root level metadata can be deleted");
+        assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotToCorrupt.getName()).get());
+
+        logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot");
+        // Repository data is re-read inside the supplier so that the check sees the state after the delete
+        assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
+            ActionRunnable.supply(f, () -> snapshotsService.hasOldVersionSnapshots(repoName, getRepositoryData(repository), null)))),
+            is(false));
+    }
+
     private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {
         logger.info("--> try to delete snapshot");
         final RepositoryException repositoryException3 = expectThrows(RepositoryException.class,

+ 1 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -1276,7 +1276,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                         actionFilters, indexNameExpressionResolver
                     ));
                 actions.put(CleanupRepositoryAction.INSTANCE, new TransportCleanupRepositoryAction(transportService, clusterService,
-                    repositoriesService, threadPool, actionFilters, indexNameExpressionResolver));
+                    repositoriesService, snapshotsService, threadPool, actionFilters, indexNameExpressionResolver));
                 actions.put(CreateSnapshotAction.INSTANCE,
                     new TransportCreateSnapshotAction(
                         transportService, clusterService, threadPool,

+ 6 - 2
test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

@@ -620,13 +620,17 @@ public abstract class ESRestTestCase extends ESTestCase {
                 }
             }
             if (preserveReposUponCompletion() == false) {
-                logger.debug("wiping snapshot repository [{}]", repoName);
-                adminClient().performRequest(new Request("DELETE", "_snapshot/" + repoName));
+                deleteRepository(repoName);
             }
         }
         return inProgressSnapshots;
     }
 
+    protected void deleteRepository(String repoName) throws IOException {
+        logger.debug("wiping snapshot repository [{}]", repoName);
+        adminClient().performRequest(new Request("DELETE", "_snapshot/" + repoName));
+    }
+
     /**
      * Remove any cluster settings.
      */