Browse Source

Refactor Snapshot Finalization Method (#76005)

This refactors the signature of snapshot finalization. For one it allows removing
a TODO about being dependent on mutable `SnapshotInfo` which was not great but
more importantly this sets up a follow-up where state can be shared between the
cluster state update at the end of finalization and subsequent old-shard-generation
cleanup so that we can resolve another open TODO about leaking shard generation files
in some cases.
Armin Braun 4 years ago
parent
commit
da668f9cb0
16 changed files with 230 additions and 203 deletions
  1. 12 12
      plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
  2. 3 25
      server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java
  3. 2 19
      server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
  4. 89 0
      server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java
  5. 2 17
      server/src/main/java/org/elasticsearch/repositories/Repository.java
  6. 13 23
      server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
  7. 2 2
      server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java
  8. 16 15
      server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
  9. 2 11
      server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
  10. 24 22
      server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
  11. 3 6
      test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
  12. 4 2
      test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
  13. 2 4
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
  14. 15 12
      x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java
  15. 7 5
      x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java
  16. 34 28
      x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java

+ 12 - 12
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

@@ -13,8 +13,6 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
@@ -28,12 +26,11 @@ import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.monitor.jvm.JvmInfo;
+import org.elasticsearch.repositories.FinalizeSnapshotContext;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
-import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
 import org.elasticsearch.snapshots.SnapshotId;
-import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotsService;
 import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -239,15 +236,18 @@ class S3Repository extends MeteredBlobStoreRepository {
     private final AtomicReference<Scheduler.Cancellable> finalizationFuture = new AtomicReference<>();
 
     @Override
-    public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata clusterMetadata,
-                                 SnapshotInfo snapshotInfo, Version repositoryMetaVersion,
-                                 Function<ClusterState, ClusterState> stateTransformer,
-                                 ActionListener<RepositoryData> listener) {
-        if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
-            listener = delayedListener(listener);
+    public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
+        if (SnapshotsService.useShardGenerations(finalizeSnapshotContext.repositoryMetaVersion()) == false) {
+            finalizeSnapshotContext = new FinalizeSnapshotContext(
+                finalizeSnapshotContext.updatedShardGenerations(),
+                finalizeSnapshotContext.repositoryStateId(),
+                finalizeSnapshotContext.clusterMetadata(),
+                finalizeSnapshotContext.snapshotInfo(),
+                finalizeSnapshotContext.repositoryMetaVersion(),
+                delayedListener(finalizeSnapshotContext)
+            );
         }
-        super.finalizeSnapshot(shardGenerations, repositoryStateId, clusterMetadata, snapshotInfo, repositoryMetaVersion,
-            stateTransformer, listener);
+        super.finalizeSnapshot(finalizeSnapshotContext);
     }
 
     @Override

+ 3 - 25
server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java

@@ -7,10 +7,6 @@
  */
 package org.elasticsearch.snapshots;
 
-import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
@@ -19,9 +15,8 @@ import org.elasticsearch.env.Environment;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.RepositoryPlugin;
+import org.elasticsearch.repositories.FinalizeSnapshotContext;
 import org.elasticsearch.repositories.Repository;
-import org.elasticsearch.repositories.RepositoryData;
-import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.repositories.fs.FsRepository;
 import org.elasticsearch.test.ESIntegTestCase;
@@ -29,7 +24,6 @@ import org.elasticsearch.test.ESIntegTestCase;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
-import java.util.function.Function;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.is;
@@ -89,24 +83,8 @@ public class RepositoryFilterUserMetadataIT extends ESIntegTestCase {
                     private final String initialMetaValue = metadata.settings().get(MASTER_SETTING_VALUE);
 
                     @Override
-                    public void finalizeSnapshot(
-                        ShardGenerations shardGenerations,
-                        long repositoryStateId,
-                        Metadata clusterMetadata,
-                        SnapshotInfo snapshotInfo,
-                        Version repositoryMetaVersion,
-                        Function<ClusterState, ClusterState> stateTransformer,
-                        ActionListener<RepositoryData> listener
-                    ) {
-                        super.finalizeSnapshot(
-                            shardGenerations,
-                            repositoryStateId,
-                            clusterMetadata,
-                            snapshotInfo,
-                            repositoryMetaVersion,
-                            stateTransformer,
-                            listener
-                        );
+                    public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
+                        super.finalizeSnapshot(finalizeSnapshotContext);
                     }
 
                     @Override

+ 2 - 19
server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

@@ -23,7 +23,6 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.snapshots.SnapshotId;
-import org.elasticsearch.snapshots.SnapshotInfo;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -69,24 +68,8 @@ public class FilterRepository implements Repository {
     }
 
     @Override
-    public void finalizeSnapshot(
-        ShardGenerations shardGenerations,
-        long repositoryStateId,
-        Metadata clusterMetadata,
-        SnapshotInfo snapshotInfo,
-        Version repositoryMetaVersion,
-        Function<ClusterState, ClusterState> stateTransformer,
-        ActionListener<RepositoryData> listener
-    ) {
-        in.finalizeSnapshot(
-            shardGenerations,
-            repositoryStateId,
-            clusterMetadata,
-            snapshotInfo,
-            repositoryMetaVersion,
-            stateTransformer,
-            listener
-        );
+    public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
+        in.finalizeSnapshot(finalizeSnapshotContext);
     }
 
     @Override

+ 89 - 0
server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java

@@ -0,0 +1,89 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.repositories;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.snapshots.SnapshotInfo;
+import org.elasticsearch.snapshots.SnapshotsService;
+
+/**
+ * Context for finalizing a snapshot.
+ */
+public final class FinalizeSnapshotContext extends ActionListener.Delegating<
+    Tuple<RepositoryData, SnapshotInfo>,
+    Tuple<RepositoryData, SnapshotInfo>> {
+
+    private final ShardGenerations updatedShardGenerations;
+
+    private final long repositoryStateId;
+
+    private final Metadata clusterMetadata;
+
+    private final SnapshotInfo snapshotInfo;
+
+    private final Version repositoryMetaVersion;
+
+    /**
+     * @param updatedShardGenerations updated shard generations
+     * @param repositoryStateId       the unique id identifying the state of the repository when the snapshot began
+     * @param clusterMetadata         cluster metadata
+     * @param snapshotInfo            SnapshotInfo instance to write for this snapshot
+     * @param repositoryMetaVersion   version of the updated repository metadata to write
+     * @param listener                listener to be invoked with the new {@link RepositoryData} and {@link SnapshotInfo} after completing
+     *                                the snapshot
+     */
+    public FinalizeSnapshotContext(
+        ShardGenerations updatedShardGenerations,
+        long repositoryStateId,
+        Metadata clusterMetadata,
+        SnapshotInfo snapshotInfo,
+        Version repositoryMetaVersion,
+        ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener
+    ) {
+        super(listener);
+        this.updatedShardGenerations = updatedShardGenerations;
+        this.repositoryStateId = repositoryStateId;
+        this.clusterMetadata = clusterMetadata;
+        this.snapshotInfo = snapshotInfo;
+        this.repositoryMetaVersion = repositoryMetaVersion;
+    }
+
+    public long repositoryStateId() {
+        return repositoryStateId;
+    }
+
+    public ShardGenerations updatedShardGenerations() {
+        return updatedShardGenerations;
+    }
+
+    public SnapshotInfo snapshotInfo() {
+        return snapshotInfo;
+    }
+
+    public Version repositoryMetaVersion() {
+        return repositoryMetaVersion;
+    }
+
+    public Metadata clusterMetadata() {
+        return clusterMetadata;
+    }
+
+    public ClusterState updatedClusterState(ClusterState state) {
+        return SnapshotsService.stateWithoutSuccessfulSnapshot(state, snapshotInfo.snapshot());
+    }
+
+    @Override
+    public void onResponse(Tuple<RepositoryData, SnapshotInfo> repositoryData) {
+        delegate.onResponse(repositoryData);
+    }
+}

+ 2 - 17
server/src/main/java/org/elasticsearch/repositories/Repository.java

@@ -133,24 +133,9 @@ public interface Repository extends LifecycleComponent {
      * <p>
      * This method is called on master after all shards are snapshotted.
      *
-     * @param shardGenerations      updated shard generations
-     * @param repositoryStateId     the unique id identifying the state of the repository when the snapshot began
-     * @param clusterMetadata       cluster metadata
-     * @param snapshotInfo     SnapshotInfo instance to write for this snapshot
-     * @param repositoryMetaVersion version of the updated repository metadata to write
-     * @param stateTransformer      a function that filters the last cluster state update that the snapshot finalization will execute and
-     *                              is used to remove any state tracked for the in-progress snapshot from the cluster state
-     * @param listener              listener to be invoked with the new {@link RepositoryData} after completing the snapshot
+     * @param finalizeSnapshotContext finalization context
      */
-    void finalizeSnapshot(
-        ShardGenerations shardGenerations,
-        long repositoryStateId,
-        Metadata clusterMetadata,
-        SnapshotInfo snapshotInfo,
-        Version repositoryMetaVersion,
-        Function<ClusterState, ClusterState> stateTransformer,
-        ActionListener<RepositoryData> listener
-    );
+    void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext);
 
     /**
      * Deletes snapshots

+ 13 - 23
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -91,6 +91,7 @@ import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetadata;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.repositories.FinalizeSnapshotContext;
 import org.elasticsearch.repositories.GetSnapshotInfoContext;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.IndexMetaDataGenerations;
@@ -1336,15 +1337,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }
 
     @Override
-    public void finalizeSnapshot(
-        final ShardGenerations shardGenerations,
-        final long repositoryStateId,
-        final Metadata clusterMetadata,
-        SnapshotInfo snapshotInfo,
-        Version repositoryMetaVersion,
-        Function<ClusterState, ClusterState> stateTransformer,
-        final ActionListener<RepositoryData> listener
-    ) {
+    public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
+        final long repositoryStateId = finalizeSnapshotContext.repositoryStateId();
+        final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations();
+        final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo();
         assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN
             : "Must finalize based on a valid repository generation but received [" + repositoryStateId + "]";
         final Collection<IndexId> indices = shardGenerations.indices();
@@ -1353,8 +1349,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         // directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION
         // If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened
         // when writing the index-${N} to each shard directory.
+        final Version repositoryMetaVersion = finalizeSnapshotContext.repositoryMetaVersion();
         final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion);
-        final Consumer<Exception> onUpdateFailure = e -> listener.onFailure(
+        final Consumer<Exception> onUpdateFailure = e -> finalizeSnapshotContext.onFailure(
             new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e)
         );
 
@@ -1367,7 +1364,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         repoDataListener.whenComplete(existingRepositoryData -> {
             final int existingSnapshotCount = existingRepositoryData.getSnapshotIds().size();
             if (existingSnapshotCount >= maxSnapshotCount) {
-                listener.onFailure(
+                finalizeSnapshotContext.onFailure(
                     new RepositoryException(
                         metadata.name(),
                         "Cannot add another snapshot to this repository as it "
@@ -1398,23 +1395,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     snapshotInfo.startTime(),
                     snapshotInfo.endTime()
                 );
-                final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot(
-                    snapshotId,
-                    snapshotDetails,
-                    shardGenerations,
-                    indexMetas,
-                    indexMetaIdentifiers
-                );
                 writeIndexGen(
-                    updatedRepositoryData,
+                    existingRepositoryData.addSnapshot(snapshotId, snapshotDetails, shardGenerations, indexMetas, indexMetaIdentifiers),
                     repositoryStateId,
                     repositoryMetaVersion,
-                    stateTransformer,
+                    finalizeSnapshotContext::updatedClusterState,
                     ActionListener.wrap(newRepoData -> {
                         if (writeShardGens) {
-                            cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
+                            cleanupOldShardGens(existingRepositoryData, newRepoData);
                         }
-                        listener.onResponse(newRepoData);
+                        finalizeSnapshotContext.onResponse(Tuple.tuple(newRepoData, snapshotInfo));
                     }, onUpdateFailure)
                 );
             }, onUpdateFailure), 2 + indices.size());
@@ -1424,7 +1414,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             // index or global metadata will be compatible with the segments written in this snapshot as well.
             // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way
             // that decrements the generation it points at
-
+            final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata();
             // Write Global MetaData
             executor.execute(
                 ActionRunnable.run(

+ 2 - 2
server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java

@@ -422,7 +422,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContentF
         );
     }
 
-    SnapshotInfo(
+    public SnapshotInfo(
         Snapshot snapshot,
         List<String> indices,
         List<String> dataStreams,
@@ -452,7 +452,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContentF
         this.successfulShards = successfulShards;
         this.shardFailures = List.copyOf(shardFailures);
         this.includeGlobalState = includeGlobalState;
-        this.userMetadata = userMetadata;
+        this.userMetadata = userMetadata == null ? null : Map.copyOf(userMetadata);
         this.indexSnapshotDetails = Map.copyOf(indexSnapshotDetails);
     }
 

+ 16 - 15
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -72,6 +72,7 @@ import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.AssociatedIndexDescriptor;
 import org.elasticsearch.indices.SystemIndices;
+import org.elasticsearch.repositories.FinalizeSnapshotContext;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
@@ -1533,24 +1534,24 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     entry.partial() ? shardGenerations.totalShards() : entry.shardsByRepoShardId().size(),
                     shardFailures,
                     entry.includeGlobalState(),
-                    // TODO: remove this hack making the metadata mutable once
-                    // https://github.com/elastic/elasticsearch/pull/72776 has been merged
-                    entry.userMetadata() == null ? null : new HashMap<>(entry.userMetadata()),
+                    entry.userMetadata(),
                     entry.startTime(),
                     indexSnapshotDetails
                 );
                 repo.finalizeSnapshot(
-                    shardGenerations,
-                    repositoryData.getGenId(),
-                    metaForSnapshot,
-                    snapshotInfo,
-                    entry.version(),
-                    state -> stateWithoutSuccessfulSnapshot(state, snapshot),
-                    ActionListener.wrap(newRepoData -> {
-                        completeListenersIgnoringException(endAndGetListenersToResolve(snapshot), Tuple.tuple(newRepoData, snapshotInfo));
-                        logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
-                        runNextQueuedOperation(newRepoData, repository, true);
-                    }, e -> handleFinalizationFailure(e, snapshot, repositoryData))
+                    new FinalizeSnapshotContext(
+                        shardGenerations,
+                        repositoryData.getGenId(),
+                        metaForSnapshot,
+                        snapshotInfo,
+                        entry.version(),
+                        ActionListener.wrap(result -> {
+                            final SnapshotInfo writtenSnapshotInfo = result.v2();
+                            completeListenersIgnoringException(endAndGetListenersToResolve(writtenSnapshotInfo.snapshot()), result);
+                            logger.info("snapshot [{}] completed with state [{}]", snapshot, writtenSnapshotInfo.state());
+                            runNextQueuedOperation(result.v1(), repository, true);
+                        }, e -> handleFinalizationFailure(e, snapshot, repositoryData))
+                    )
                 );
             }, e -> handleFinalizationFailure(e, snapshot, repositoryData));
         } catch (Exception e) {
@@ -1748,7 +1749,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      * @param snapshot snapshot for which to remove the snapshot operation
      * @return updated cluster state
      */
-    private static ClusterState stateWithoutSuccessfulSnapshot(ClusterState state, Snapshot snapshot) {
+    public static ClusterState stateWithoutSuccessfulSnapshot(ClusterState state, Snapshot snapshot) {
         // TODO: updating snapshots here leaks their outdated generation files, we should add logic to clean those up and enhance
         // BlobStoreTestUtil to catch this leak
         SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);

+ 2 - 11
server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

@@ -38,7 +38,6 @@ import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
 import org.elasticsearch.snapshots.SnapshotId;
-import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Transport;
@@ -222,16 +221,8 @@ public class RepositoriesServiceTests extends ESTestCase {
         }
 
         @Override
-        public void finalizeSnapshot(
-            ShardGenerations shardGenerations,
-            long repositoryStateId,
-            Metadata clusterMetadata,
-            SnapshotInfo snapshotInfo,
-            Version repositoryMetaVersion,
-            Function<ClusterState, ClusterState> stateTransformer,
-            ActionListener<RepositoryData> listener
-        ) {
-            listener.onResponse(null);
+        public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
+            finalizeSnapshotContext.onResponse(null);
         }
 
         @Override

+ 24 - 22
server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.MockBigArrays;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.TestEnvironment;
@@ -35,6 +36,7 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetadata;
 import org.elasticsearch.indices.recovery.RecoverySettings;
+import org.elasticsearch.repositories.FinalizeSnapshotContext;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
@@ -51,7 +53,6 @@ import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.containsString;
@@ -172,28 +173,29 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
                 new SnapshotId(snapshot.getSnapshotId().getName(), "_uuid2")
             );
             final ShardGenerations shardGenerations = ShardGenerations.builder().put(indexId, 0, shardGen).build();
-            PlainActionFuture.<RepositoryData, Exception>get(
+            PlainActionFuture.<Tuple<RepositoryData, SnapshotInfo>, Exception>get(
                 f -> repository.finalizeSnapshot(
-                    shardGenerations,
-                    RepositoryData.EMPTY_REPO_GEN,
-                    Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(),
-                    new SnapshotInfo(
-                        snapshot,
-                        shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()),
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        null,
-                        1L,
-                        6,
-                        Collections.emptyList(),
-                        true,
-                        Collections.emptyMap(),
-                        0L,
-                        Collections.emptyMap()
-                    ),
-                    Version.CURRENT,
-                    Function.identity(),
-                    f
+                    new FinalizeSnapshotContext(
+                        shardGenerations,
+                        RepositoryData.EMPTY_REPO_GEN,
+                        Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(),
+                        new SnapshotInfo(
+                            snapshot,
+                            shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()),
+                            Collections.emptyList(),
+                            Collections.emptyList(),
+                            null,
+                            1L,
+                            6,
+                            Collections.emptyList(),
+                            true,
+                            Collections.emptyMap(),
+                            0L,
+                            Collections.emptyMap()
+                        ),
+                        Version.CURRENT,
+                        f
+                    )
                 )
             );
             IndexShardSnapshotFailedException isfe = expectThrows(

+ 3 - 6
test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java

@@ -17,6 +17,7 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+import org.elasticsearch.repositories.FinalizeSnapshotContext;
 import org.elasticsearch.repositories.GetSnapshotInfoContext;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.IndexMetaDataGenerations;
@@ -28,7 +29,6 @@ import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.ShardSnapshotResult;
 import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.snapshots.SnapshotId;
-import org.elasticsearch.snapshots.SnapshotInfo;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -94,11 +94,8 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
     }
 
     @Override
-    public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId,
-                                 Metadata clusterMetadata, SnapshotInfo snapshotInfo, Version repositoryMetaVersion,
-                                 Function<ClusterState, ClusterState> stateTransformer,
-                                 ActionListener<RepositoryData> listener) {
-        listener.onResponse(null);
+    public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
+        finalizeSnapshotContext.onResponse(null);
     }
 
     @Override

+ 4 - 2
test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

@@ -42,8 +42,10 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.repositories.FinalizeSnapshotContext;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
@@ -484,9 +486,9 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
             SnapshotState.FAILED,
             Collections.emptyMap()
         );
-        PlainActionFuture.<RepositoryData, Exception>get(f -> repo.finalizeSnapshot(
+        PlainActionFuture.<Tuple<RepositoryData, SnapshotInfo>, Exception>get(f -> repo.finalizeSnapshot(new FinalizeSnapshotContext(
                 ShardGenerations.EMPTY, getRepositoryData(repoName).getGenId(), state.metadata(), snapshotInfo,
-                SnapshotsService.OLD_SNAPSHOT_FORMAT, Function.identity(), f));
+                SnapshotsService.OLD_SNAPSHOT_FORMAT, f)));
     }
 
     protected void awaitNDeletionsInProgress(int count) throws Exception {

+ 2 - 4
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -62,6 +62,7 @@ import org.elasticsearch.index.store.StoreFileMetadata;
 import org.elasticsearch.indices.recovery.MultiChunkTransfer;
 import org.elasticsearch.indices.recovery.MultiFileWriter;
 import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.repositories.FinalizeSnapshotContext;
 import org.elasticsearch.repositories.GetSnapshotInfoContext;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.IndexMetaDataGenerations;
@@ -288,10 +289,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
     }
 
     @Override
-    public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata metadata,
-                                 SnapshotInfo snapshotInfo, Version repositoryMetaVersion,
-                                 Function<ClusterState, ClusterState> stateTransformer,
-                                 ActionListener<RepositoryData> listener) {
+    public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
         throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
     }
 

+ 15 - 12
x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java

@@ -18,9 +18,6 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.NIOFSDirectory;
-import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
@@ -40,12 +37,10 @@ import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.TranslogStats;
 import org.elasticsearch.repositories.FilterRepository;
+import org.elasticsearch.repositories.FinalizeSnapshotContext;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
-import org.elasticsearch.repositories.RepositoryData;
-import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.SnapshotShardContext;
-import org.elasticsearch.snapshots.SnapshotInfo;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -90,15 +85,23 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
     }
 
     @Override
-    public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata metadata,
-                                 SnapshotInfo snapshotInfo, Version repositoryMetaVersion,
-                                 Function<ClusterState, ClusterState> stateTransformer,
-                                 ActionListener<RepositoryData> listener) {
+    public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
         // we process the index metadata at snapshot time. This means if somebody tries to restore
         // a _source only snapshot with a plain repository it will be just fine since we already set the
         // required engine, that the index is read-only and the mapping to a default mapping
-        super.finalizeSnapshot(shardGenerations, repositoryStateId, metadataToSnapshot(shardGenerations.indices(), metadata),
-            snapshotInfo, repositoryMetaVersion, stateTransformer, listener);
+        super.finalizeSnapshot(
+            new FinalizeSnapshotContext(
+                finalizeSnapshotContext.updatedShardGenerations(),
+                finalizeSnapshotContext.repositoryStateId(),
+                metadataToSnapshot(
+                    finalizeSnapshotContext.updatedShardGenerations().indices(),
+                    finalizeSnapshotContext.clusterMetadata()
+                ),
+                finalizeSnapshotContext.snapshotInfo(),
+                finalizeSnapshotContext.repositoryMetaVersion(),
+                finalizeSnapshotContext
+            )
+        );
     }
 
     private static Metadata metadataToSnapshot(Collection<IndexId> indices, Metadata metadata) {

+ 7 - 5
x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java

@@ -41,6 +41,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.MockBigArrays;
 import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.TestEnvironment;
@@ -61,13 +62,14 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.repositories.FinalizeSnapshotContext;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.ShardGeneration;
 import org.elasticsearch.repositories.ShardGenerations;
-import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.repositories.ShardSnapshotResult;
+import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
 import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
 import org.elasticsearch.repositories.fs.FsRepository;
@@ -83,7 +85,6 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
@@ -226,10 +227,10 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
                 repository.snapshotShard(new SnapshotShardContext(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef,
                     null, indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future));
                 future.actionGet();
-                final PlainActionFuture<RepositoryData> finFuture = PlainActionFuture.newFuture();
+                final PlainActionFuture<Tuple<RepositoryData, SnapshotInfo>> finFuture = PlainActionFuture.newFuture();
                 final ShardGenerations shardGenerations =
                     ShardGenerations.builder().put(indexId, 0, indexShardSnapshotStatus.generation()).build();
-                repository.finalizeSnapshot(
+                repository.finalizeSnapshot(new FinalizeSnapshotContext(
                     shardGenerations,
                     ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getGenId(),
                     Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(),
@@ -247,7 +248,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
                         Collections.emptyMap(),
                         0L,
                         Collections.emptyMap()),
-                    Version.CURRENT, Function.identity(), finFuture);
+                    Version.CURRENT, finFuture
+                ));
                 finFuture.actionGet();
             });
             IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();

+ 34 - 28
x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java

@@ -10,10 +10,6 @@ package org.elasticsearch.repositories.encrypted;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedSupplier;
@@ -39,10 +35,9 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.XPackLicenseState;
-import org.elasticsearch.repositories.RepositoryData;
+import org.elasticsearch.repositories.FinalizeSnapshotContext;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.RepositoryStats;
-import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.snapshots.SnapshotInfo;
@@ -203,34 +198,45 @@ public class EncryptedRepository extends BlobStoreRepository {
     }
 
     @Override
-    public void finalizeSnapshot(
-        ShardGenerations shardGenerations,
-        long repositoryStateId,
-        Metadata clusterMetadata,
-        SnapshotInfo snapshotInfo,
-        Version repositoryMetaVersion,
-        Function<ClusterState, ClusterState> stateTransformer,
-        ActionListener<RepositoryData> listener
-    ) {
+    public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
+        final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo();
         try {
             validateLocalRepositorySecret(snapshotInfo.userMetadata());
         } catch (RepositoryException passwordValidationException) {
-            listener.onFailure(passwordValidationException);
+            finalizeSnapshotContext.onFailure(passwordValidationException);
             return;
-        } finally {
-            // remove the repository password hash (and salt) from the snapshot metadata so that it is not displayed in the API response
-            // to the user
-            snapshotInfo.userMetadata().remove(PASSWORD_HASH_USER_METADATA_KEY);
-            snapshotInfo.userMetadata().remove(PASSWORD_SALT_USER_METADATA_KEY);
         }
+        // remove the repository password hash (and salt) from the snapshot metadata so that it is not displayed in the API response
+        // to the user
+        final Map<String, Object> updatedUserMetadata = new HashMap<>(snapshotInfo.userMetadata());
+        updatedUserMetadata.remove(PASSWORD_HASH_USER_METADATA_KEY);
+        updatedUserMetadata.remove(PASSWORD_SALT_USER_METADATA_KEY);
+        final SnapshotInfo updatedSnapshotInfo = new SnapshotInfo(
+            snapshotInfo.snapshot(),
+            snapshotInfo.indices(),
+            snapshotInfo.dataStreams(),
+            snapshotInfo.featureStates(),
+            snapshotInfo.reason(),
+            snapshotInfo.version(),
+            snapshotInfo.startTime(),
+            snapshotInfo.endTime(),
+            snapshotInfo.totalShards(),
+            snapshotInfo.successfulShards(),
+            snapshotInfo.shardFailures(),
+            snapshotInfo.includeGlobalState(),
+            updatedUserMetadata,
+            snapshotInfo.state(),
+            snapshotInfo.indexSnapshotDetails()
+        );
         super.finalizeSnapshot(
-            shardGenerations,
-            repositoryStateId,
-            clusterMetadata,
-            snapshotInfo,
-            repositoryMetaVersion,
-            stateTransformer,
-            listener
+            new FinalizeSnapshotContext(
+                finalizeSnapshotContext.updatedShardGenerations(),
+                finalizeSnapshotContext.repositoryStateId(),
+                finalizeSnapshotContext.clusterMetadata(),
+                updatedSnapshotInfo,
+                finalizeSnapshotContext.repositoryMetaVersion(),
+                finalizeSnapshotContext
+            )
         );
     }