Browse Source

Track Snapshot Version in RepositoryData (#50930)

Add tracking of snapshot versions to RepositoryData to make BwC logic more efficient.
Follow up to #50853
Armin Braun 5 years ago
parent
commit
e349c5eec0

+ 2 - 1
server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

@@ -332,7 +332,8 @@ public class AllocationService {
      * @param <T>       The list element type.
      * @return A comma-separated string of the first few elements.
      */
-    static <T> String firstListElementsToCommaDelimitedString(List<T> elements, Function<T, String> formatter, boolean isDebugEnabled) {
+    public static <T> String firstListElementsToCommaDelimitedString(List<T> elements, Function<T, String> formatter,
+                                                                     boolean isDebugEnabled) {
         final int maxNumberOfElements = 10;
         if (isDebugEnabled || elements.size() <= maxNumberOfElements) {
             return elements.stream().map(formatter).collect(Collectors.joining(", "));

+ 51 - 8
server/src/main/java/org/elasticsearch/repositories/RepositoryData.java

@@ -69,7 +69,7 @@ public final class RepositoryData {
      * An instance initialized for an empty repository.
      */
     public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN,
-        Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
+        Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
 
     /**
      * The generational id of the index file from which the repository data was read.
@@ -92,13 +92,16 @@ public final class RepositoryData {
      */
     private final Map<IndexId, Set<SnapshotId>> indexSnapshots;
 
+    private final Map<String, Version> snapshotVersions;
+
     /**
      * Shard generations.
      */
     private final ShardGenerations shardGenerations;
 
     public RepositoryData(long genId, Map<String, SnapshotId> snapshotIds, Map<String, SnapshotState> snapshotStates,
-                          Map<IndexId, Set<SnapshotId>> indexSnapshots, ShardGenerations shardGenerations) {
+                          Map<String, Version> snapshotVersions, Map<IndexId, Set<SnapshotId>> indexSnapshots,
+                          ShardGenerations shardGenerations) {
         this.genId = genId;
         this.snapshotIds = Collections.unmodifiableMap(snapshotIds);
         this.snapshotStates = Collections.unmodifiableMap(snapshotStates);
@@ -106,12 +109,27 @@ public final class RepositoryData {
             .collect(Collectors.toMap(IndexId::getName, Function.identity())));
         this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots);
         this.shardGenerations = Objects.requireNonNull(shardGenerations);
+        this.snapshotVersions = snapshotVersions;
         assert indices.values().containsAll(shardGenerations.indices()) : "ShardGenerations contained indices "
             + shardGenerations.indices() + " but snapshots only reference indices " + indices.values();
     }
 
     protected RepositoryData copy() {
-        return new RepositoryData(genId, snapshotIds, snapshotStates, indexSnapshots, shardGenerations);
+        return new RepositoryData(genId, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations);
+    }
+
+    /**
+     * Creates a copy of this instance that contains updated version data.
+     * @param versions map of snapshot versions
+     * @return copy with updated version data
+     */
+    public RepositoryData withVersions(Map<SnapshotId, Version> versions) {
+        if (versions.isEmpty()) {
+            return this;
+        }
+        final Map<String, Version> newVersions = new HashMap<>(snapshotVersions);
+        versions.forEach((id, version) -> newVersions.put(id.getUUID(), version));
+        return new RepositoryData(genId, snapshotIds, snapshotStates, newVersions, indexSnapshots, shardGenerations);
     }
 
     public ShardGenerations shardGenerations() {
@@ -141,6 +159,14 @@ public final class RepositoryData {
         return snapshotStates.get(snapshotId.getUUID());
     }
 
+    /**
+     * Returns the {@link Version} for the given snapshot or {@code null} if unknown.
+     */
+    @Nullable
+    public Version getVersion(SnapshotId snapshotId) {
+        return snapshotVersions.get(snapshotId.getUUID());
+    }
+
     /**
      * Returns an unmodifiable map of the index names to {@link IndexId} in the repository.
      */
@@ -173,6 +199,7 @@ public final class RepositoryData {
      */
     public RepositoryData addSnapshot(final SnapshotId snapshotId,
                                       final SnapshotState snapshotState,
+                                      final Version version,
                                       final ShardGenerations shardGenerations) {
         if (snapshotIds.containsKey(snapshotId.getUUID())) {
             // if the snapshot id already exists in the repository data, it means an old master
@@ -184,11 +211,13 @@ public final class RepositoryData {
         snapshots.put(snapshotId.getUUID(), snapshotId);
         Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
         newSnapshotStates.put(snapshotId.getUUID(), snapshotState);
+        Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
+        newSnapshotVersions.put(snapshotId.getUUID(), version);
         Map<IndexId, Set<SnapshotId>> allIndexSnapshots = new HashMap<>(indexSnapshots);
         for (final IndexId indexId : shardGenerations.indices()) {
             allIndexSnapshots.computeIfAbsent(indexId, k -> new LinkedHashSet<>()).add(snapshotId);
         }
-        return new RepositoryData(genId, snapshots, newSnapshotStates, allIndexSnapshots,
+        return new RepositoryData(genId, snapshots, newSnapshotStates, newSnapshotVersions, allIndexSnapshots,
             ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build());
     }
 
@@ -202,7 +231,7 @@ public final class RepositoryData {
         if (newGeneration == genId) {
             return this;
         }
-        return new RepositoryData(newGeneration, this.snapshotIds, this.snapshotStates, this.indexSnapshots, this.shardGenerations);
+        return new RepositoryData(newGeneration, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations);
     }
 
     /**
@@ -222,6 +251,8 @@ public final class RepositoryData {
         }
         Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
         newSnapshotStates.remove(snapshotId.getUUID());
+        final Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
+        newSnapshotVersions.remove(snapshotId.getUUID());
         Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
         for (final IndexId indexId : indices.values()) {
             Set<SnapshotId> set;
@@ -241,7 +272,7 @@ public final class RepositoryData {
             indexSnapshots.put(indexId, set);
         }
 
-        return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, indexSnapshots,
+        return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, newSnapshotVersions, indexSnapshots,
             ShardGenerations.builder().putAll(shardGenerations).putAll(updatedShardGenerations)
                 .retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build()
         );
@@ -269,6 +300,7 @@ public final class RepositoryData {
         RepositoryData that = (RepositoryData) obj;
         return snapshotIds.equals(that.snapshotIds)
                    && snapshotStates.equals(that.snapshotStates)
+                   && snapshotVersions.equals(that.snapshotVersions)
                    && indices.equals(that.indices)
                    && indexSnapshots.equals(that.indexSnapshots)
                    && shardGenerations.equals(that.shardGenerations);
@@ -276,7 +308,7 @@ public final class RepositoryData {
 
     @Override
     public int hashCode() {
-        return Objects.hash(snapshotIds, snapshotStates, indices, indexSnapshots, shardGenerations);
+        return Objects.hash(snapshotIds, snapshotStates, snapshotVersions, indices, indexSnapshots, shardGenerations);
     }
 
     /**
@@ -323,6 +355,7 @@ public final class RepositoryData {
     private static final String NAME = "name";
     private static final String UUID = "uuid";
     private static final String STATE = "state";
+    private static final String VERSION = "version";
     private static final String MIN_VERSION = "min_version";
 
     /**
@@ -339,6 +372,9 @@ public final class RepositoryData {
             if (snapshotStates.containsKey(snapshot.getUUID())) {
                 builder.field(STATE, snapshotStates.get(snapshot.getUUID()).value());
             }
+            if (snapshotVersions.containsKey(snapshot.getUUID())) {
+                builder.field(VERSION, snapshotVersions.get(snapshot.getUUID()).toString());
+            }
             builder.endObject();
         }
         builder.endArray();
@@ -378,6 +414,7 @@ public final class RepositoryData {
     public static RepositoryData snapshotsFromXContent(final XContentParser parser, long genId) throws IOException {
         final Map<String, SnapshotId> snapshots = new HashMap<>();
         final Map<String, SnapshotState> snapshotStates = new HashMap<>();
+        final Map<String, Version> snapshotVersions = new HashMap<>();
         final Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
         final ShardGenerations.Builder shardGenerations = ShardGenerations.builder();
 
@@ -390,6 +427,7 @@ public final class RepositoryData {
                             String name = null;
                             String uuid = null;
                             SnapshotState state = null;
+                            Version version = null;
                             while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
                                 String currentFieldName = parser.currentName();
                                 parser.nextToken();
@@ -399,12 +437,17 @@ public final class RepositoryData {
                                     uuid = parser.text();
                                 } else if (STATE.equals(currentFieldName)) {
                                     state = SnapshotState.fromValue(parser.numberValue().byteValue());
+                                } else if (VERSION.equals(currentFieldName)) {
+                                    version = Version.fromString(parser.text());
                                 }
                             }
                             final SnapshotId snapshotId = new SnapshotId(name, uuid);
                             if (state != null) {
                                 snapshotStates.put(uuid, state);
                             }
+                            if (version != null) {
+                                snapshotVersions.put(uuid, version);
+                            }
                             snapshots.put(snapshotId.getUUID(), snapshotId);
                         }
                     } else {
@@ -488,7 +531,7 @@ public final class RepositoryData {
         } else {
             throw new ElasticsearchParseException("start object expected");
         }
-        return new RepositoryData(genId, snapshots, snapshotStates, indexSnapshots, shardGenerations.build());
+        return new RepositoryData(genId, snapshots, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations.build());
     }
 
 }

+ 42 - 5
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -32,6 +32,7 @@ import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.StepListener;
@@ -46,6 +47,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Numbers;
@@ -121,6 +123,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -873,7 +876,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
                 getRepositoryData(ActionListener.wrap(existingRepositoryData -> {
                     final RepositoryData updatedRepositoryData =
-                        existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), shardGenerations);
+                        existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations);
                     writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(v -> {
                         if (writeShardGens) {
                             cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
@@ -1252,8 +1255,42 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 }
             });
 
+        final StepListener<RepositoryData> filterRepositoryDataStep = new StepListener<>();
+
         // Step 2: Write new index-N blob to repository and update index.latest
         setPendingStep.whenComplete(newGen -> threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
+            // BwC logic: Load snapshot version information if any snapshot is missing a version in RepositoryData so that the new
+            // RepositoryData contains a version for every snapshot
+            final List<SnapshotId> snapshotIdsWithoutVersion = repositoryData.getSnapshotIds().stream().filter(
+                snapshotId -> repositoryData.getVersion(snapshotId) == null).collect(Collectors.toList());
+            if (snapshotIdsWithoutVersion.isEmpty() == false) {
+                final Map<SnapshotId, Version> updatedVersionMap = new ConcurrentHashMap<>();
+                final GroupedActionListener<Void> loadAllVersionsListener = new GroupedActionListener<>(
+                    ActionListener.runAfter(
+                        new ActionListener<>() {
+                            @Override
+                            public void onResponse(Collection<Void> voids) {
+                                logger.info("Successfully loaded all snapshot's version information for {} from snapshot metadata",
+                                    AllocationService.firstListElementsToCommaDelimitedString(
+                                        snapshotIdsWithoutVersion, SnapshotId::toString, logger.isDebugEnabled()));
+                            }
+
+                            @Override
+                            public void onFailure(Exception e) {
+                                logger.warn("Failure when trying to load missing version information from snapshot metadata", e);
+                            }
+                        }, () -> filterRepositoryDataStep.onResponse(repositoryData.withVersions(updatedVersionMap))),
+                    snapshotIdsWithoutVersion.size());
+                for (SnapshotId snapshotId : snapshotIdsWithoutVersion) {
+                    threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(loadAllVersionsListener, () ->
+                        updatedVersionMap.put(snapshotId, getSnapshotInfo(snapshotId).version())));
+                }
+            } else {
+                filterRepositoryDataStep.onResponse(repositoryData);
+            }
+        })), listener::onFailure);
+        filterRepositoryDataStep.whenComplete(filteredRepositoryData -> {
+            final long newGen = setPendingStep.result();
             if (latestKnownRepoGen.get() >= newGen) {
                 throw new IllegalArgumentException(
                     "Tried writing generation [" + newGen + "] but repository is at least at generation [" + latestKnownRepoGen.get()
@@ -1263,7 +1300,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
             logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
             writeAtomic(indexBlob,
-                BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
+                BytesReference.bytes(filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
             // write the current generation to the index-latest file
             final BytesReference genBytes;
             try (BytesStreamOutput bStream = new BytesStreamOutput()) {
@@ -1297,13 +1334,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
                     @Override
                     public void onFailure(String source, Exception e) {
-                        l.onFailure(
+                        listener.onFailure(
                             new RepositoryException(metadata.name(), "Failed to execute cluster state update [" + source + "]", e));
                     }
 
                     @Override
                     public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(l, () -> {
+                        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(listener, () -> {
                             // Delete all now outdated index files up to 1000 blobs back from the new generation.
                             // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
                             // Deleting one older than the current expectedGen is done for BwC reasons as older versions used to keep
@@ -1320,7 +1357,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                         }));
                     }
                 });
-        })), listener::onFailure);
+        }, listener::onFailure);
     }
 
     private RepositoryMetaData getRepoMetaData(ClusterState state) {

+ 6 - 3
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -365,9 +365,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             } else {
                 try {
                     final Repository repository = repositoriesService.repository(repositoryName);
-                    hasOldFormatSnapshots = snapshotIds.stream().map(repository::getSnapshotInfo).anyMatch(
-                        snapshotInfo -> (excluded == null || snapshotInfo.snapshotId().equals(excluded) == false)
-                            && snapshotInfo.version().before(SHARD_GEN_IN_REPO_DATA_VERSION));
+                    hasOldFormatSnapshots = snapshotIds.stream().filter(snapshotId -> snapshotId.equals(excluded) == false).anyMatch(
+                        snapshotId -> {
+                            final Version known = repositoryData.getVersion(snapshotId);
+                            return (known == null ? repository.getSnapshotInfo(snapshotId).version() : known)
+                                .before(SHARD_GEN_IN_REPO_DATA_VERSION);
+                        });
                 } catch (SnapshotMissingException e) {
                     logger.warn("Failed to load snapshot metadata, assuming repository is in old format", e);
                     return true;

+ 14 - 6
server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.repositories;
 
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.xcontent.XContent;
@@ -105,7 +106,8 @@ public class RepositoryDataTests extends ESTestCase {
             builder.put(indexId, 0, "2");
         }
         RepositoryData newRepoData = repositoryData.addSnapshot(newSnapshot,
-            randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), builder.build());
+            randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED),
+            randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), builder.build());
         // verify that the new repository data has the new snapshot and its indices
         assertTrue(newRepoData.getSnapshotIds().contains(newSnapshot));
         for (IndexId indexId : indices) {
@@ -122,17 +124,19 @@ public class RepositoryDataTests extends ESTestCase {
         final int numSnapshots = randomIntBetween(1, 30);
         final Map<String, SnapshotId> snapshotIds = new HashMap<>(numSnapshots);
         final Map<String, SnapshotState> snapshotStates = new HashMap<>(numSnapshots);
+        final Map<String, Version> snapshotVersions = new HashMap<>(numSnapshots);
         for (int i = 0; i < numSnapshots; i++) {
             final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID());
             snapshotIds.put(snapshotId.getUUID(), snapshotId);
             snapshotStates.put(snapshotId.getUUID(), randomFrom(SnapshotState.values()));
+            snapshotVersions.put(snapshotId.getUUID(), randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()));
         }
         RepositoryData repositoryData = new RepositoryData(EMPTY_REPO_GEN, snapshotIds,
-            Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
+            Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
         // test that initializing indices works
         Map<IndexId, Set<SnapshotId>> indices = randomIndices(snapshotIds);
         RepositoryData newRepoData =
-            new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, indices, ShardGenerations.EMPTY);
+            new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, snapshotVersions, indices, ShardGenerations.EMPTY);
         List<SnapshotId> expected = new ArrayList<>(repositoryData.getSnapshotIds());
         Collections.sort(expected);
         List<SnapshotId> actual = new ArrayList<>(newRepoData.getSnapshotIds());
@@ -168,7 +172,8 @@ public class RepositoryDataTests extends ESTestCase {
     public void testGetSnapshotState() {
         final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID());
         final SnapshotState state = randomFrom(SnapshotState.values());
-        final RepositoryData repositoryData = RepositoryData.EMPTY.addSnapshot(snapshotId, state, ShardGenerations.EMPTY);
+        final RepositoryData repositoryData = RepositoryData.EMPTY.addSnapshot(snapshotId, state,
+            randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), ShardGenerations.EMPTY);
         assertEquals(state, repositoryData.getSnapshotState(snapshotId));
         assertNull(repositoryData.getSnapshotState(new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID())));
     }
@@ -187,9 +192,11 @@ public class RepositoryDataTests extends ESTestCase {
 
         Map<String, SnapshotId> snapshotIds = new HashMap<>();
         Map<String, SnapshotState> snapshotStates = new HashMap<>();
+        Map<String, Version> snapshotVersions = new HashMap<>();
         for (SnapshotId snapshotId : parsedRepositoryData.getSnapshotIds()) {
             snapshotIds.put(snapshotId.getUUID(), snapshotId);
             snapshotStates.put(snapshotId.getUUID(), parsedRepositoryData.getSnapshotState(snapshotId));
+            snapshotVersions.put(snapshotId.getUUID(), parsedRepositoryData.getVersion(snapshotId));
         }
 
         final IndexId corruptedIndexId = randomFrom(parsedRepositoryData.getIndices().values());
@@ -211,7 +218,7 @@ public class RepositoryDataTests extends ESTestCase {
         assertNotNull(corruptedIndexId);
 
         RepositoryData corruptedRepositoryData = new RepositoryData(parsedRepositoryData.getGenId(), snapshotIds, snapshotStates,
-            indexSnapshots, shardGenBuilder.build());
+            snapshotVersions, indexSnapshots, shardGenBuilder.build());
 
         final XContentBuilder corruptedBuilder = XContentBuilder.builder(xContent);
         corruptedRepositoryData.snapshotsToXContent(corruptedBuilder, true);
@@ -280,7 +287,8 @@ public class RepositoryDataTests extends ESTestCase {
                     builder.put(someIndex, j, uuid);
                 }
             }
-            repositoryData = repositoryData.addSnapshot(snapshotId, randomFrom(SnapshotState.values()), builder.build());
+            repositoryData = repositoryData.addSnapshot(snapshotId, randomFrom(SnapshotState.values()),
+                randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), builder.build());
         }
         return repositoryData;
     }

+ 2 - 1
server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.repositories.blobstore;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -259,7 +260,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
                 builder.put(new IndexId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()), 0, "1");
             }
             repoData = repoData.addSnapshot(snapshotId,
-                randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), builder.build());
+                randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), Version.CURRENT, builder.build());
         }
         return repoData;
     }

+ 26 - 1
server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.snapshots;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.support.PlainActionFuture;
@@ -27,18 +28,25 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
+import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
 import java.util.Locale;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.containsString;
@@ -263,11 +271,24 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
         logger.info("--> delete root level snapshot metadata blob for snapshot [{}]", snapshotToCorrupt);
         Files.delete(repo.resolve(String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotToCorrupt.getUUID())));
 
+        logger.info("--> strip version information from index-N blob");
+        final RepositoryData withoutVersions = new RepositoryData(repositoryData.getGenId(),
+            repositoryData.getSnapshotIds().stream().collect(Collectors.toMap(
+                SnapshotId::getUUID, Function.identity())),
+            repositoryData.getSnapshotIds().stream().collect(Collectors.toMap(
+                SnapshotId::getUUID, repositoryData::getSnapshotState)),
+            Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
+
+        Files.write(repo.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + withoutVersions.getGenId()),
+            BytesReference.toBytes(BytesReference.bytes(withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(),
+                true))), StandardOpenOption.TRUNCATE_EXISTING);
+
         logger.info("--> verify that repo is assumed in old metadata format");
         final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class);
         final ThreadPool threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class);
         assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
-            ActionRunnable.supply(f, () -> snapshotsService.hasOldVersionSnapshots(repoName, repositoryData, null)))), is(true));
+            ActionRunnable.supply(f, () -> snapshotsService.hasOldVersionSnapshots(repoName, getRepositoryData(repository), null)))),
+            is(true));
 
         logger.info("--> verify that snapshot with missing root level metadata can be deleted");
         assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotToCorrupt.getName()).get());
@@ -276,6 +297,10 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
         assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
             ActionRunnable.supply(f, () -> snapshotsService.hasOldVersionSnapshots(repoName, getRepositoryData(repository), null)))),
             is(false));
+        final RepositoryData finalRepositoryData = getRepositoryData(repository);
+        for (SnapshotId snapshotId : finalRepositoryData.getSnapshotIds()) {
+            assertThat(finalRepositoryData.getVersion(snapshotId), is(Version.CURRENT));
+        }
     }
 
     private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java

@@ -88,7 +88,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
     @Override
     public void getRepositoryData(ActionListener<RepositoryData> listener) {
         final IndexId indexId = new IndexId(indexName, "blah");
-        listener.onResponse(new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(),
+        listener.onResponse(new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
             Collections.singletonMap(indexId, emptySet()), ShardGenerations.EMPTY));
     }
 

+ 4 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@@ -231,6 +232,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
 
             Map<String, SnapshotId> copiedSnapshotIds = new HashMap<>();
             Map<String, SnapshotState> snapshotStates = new HashMap<>(copiedSnapshotIds.size());
+            Map<String, Version> snapshotVersions = new HashMap<>(copiedSnapshotIds.size());
             Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>(copiedSnapshotIds.size());
 
             ImmutableOpenMap<String, IndexMetaData> remoteIndices = remoteMetaData.getIndices();
@@ -239,10 +241,11 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
                 SnapshotId snapshotId = new SnapshotId(LATEST, LATEST);
                 copiedSnapshotIds.put(indexName, snapshotId);
                 snapshotStates.put(indexName, SnapshotState.SUCCESS);
+                snapshotVersions.put(indexName, Version.CURRENT);
                 Index index = remoteIndices.get(indexName).getIndex();
                 indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singleton(snapshotId));
             }
-            return new RepositoryData(1, copiedSnapshotIds, snapshotStates, indexSnapshots, ShardGenerations.EMPTY);
+            return new RepositoryData(1, copiedSnapshotIds, snapshotStates, snapshotVersions, indexSnapshots, ShardGenerations.EMPTY);
         });
     }