Browse Source

Refactor Repository#snapshotShard (#72083)

Create a class for holding the large number of arguments to this method
and to dry up resource handling across snapshot shard service and the
source-only repository.
Armin Braun 4 years ago
parent
commit
ede947fdd8
16 changed files with 219 additions and 201 deletions
  1. 4 13
      server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java
  2. 2 9
      server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
  3. 8 31
      server/src/main/java/org/elasticsearch/repositories/Repository.java
  4. 120 0
      server/src/main/java/org/elasticsearch/repositories/SnapshotShardContext.java
  5. 16 14
      server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
  6. 4 4
      server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
  7. 1 6
      server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
  8. 8 4
      server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java
  9. 3 3
      test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
  10. 2 8
      test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
  11. 2 6
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
  12. 11 11
      x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java
  13. 15 14
      x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
  14. 5 33
      x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java
  15. 4 35
      x-pack/plugin/repository-encrypted/src/test/java/org/elasticsearch/repositories/encrypted/LocalStateEncryptedRepositoryPlugin.java
  16. 14 10
      x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java

+ 4 - 13
server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java

@@ -7,7 +7,6 @@
  */
 package org.elasticsearch.snapshots;
 
-import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
@@ -17,17 +16,13 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.env.Environment;
-import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
-import org.elasticsearch.index.store.Store;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.RepositoryPlugin;
-import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.ShardGenerations;
-import org.elasticsearch.repositories.ShardSnapshotResult;
+import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.repositories.fs.FsRepository;
 import org.elasticsearch.test.ESIntegTestCase;
 
@@ -88,13 +83,9 @@ public class RepositoryFilterUserMetadataIT extends ESIntegTestCase {
                     }
 
                     @Override
-                    public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                                              IndexCommit snapshotIndexCommit, String shardStateIdentifier,
-                                              IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
-                                              Map<String, Object> userMetadata, ActionListener<ShardSnapshotResult> listener) {
-                        assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue)));
-                        super.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, shardStateIdentifier,
-                            snapshotStatus, repositoryMetaVersion, userMetadata, listener);
+                    public void snapshotShard(SnapshotShardContext context) {
+                        assertThat(context.userMetadata(), is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue)));
+                        super.snapshotShard(context);
                     }
 
                     @Override

+ 2 - 9
server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

@@ -7,7 +7,6 @@
  */
 package org.elasticsearch.repositories;
 
-import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
@@ -18,7 +17,6 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.component.LifecycleListener;
-import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
@@ -28,7 +26,6 @@ import org.elasticsearch.snapshots.SnapshotInfo;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Map;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -114,12 +111,8 @@ public class FilterRepository implements Repository {
     }
 
     @Override
-    public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
-                              Version repositoryMetaVersion, Map<String, Object> userMetadata,
-                              ActionListener<ShardSnapshotResult> listener) {
-        in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, shardStateIdentifier, snapshotStatus,
-            repositoryMetaVersion, userMetadata, listener);
+    public void snapshotShard(SnapshotShardContext context) {
+        in.snapshotShard(context);
     }
     @Override
     public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,

+ 8 - 31
server/src/main/java/org/elasticsearch/repositories/Repository.java

@@ -7,7 +7,6 @@
  */
 package org.elasticsearch.repositories;
 
-import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
@@ -19,7 +18,6 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.component.LifecycleComponent;
-import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
@@ -182,37 +180,16 @@ public interface Repository extends LifecycleComponent {
     boolean isReadOnly();
 
     /**
-     * Creates a snapshot of the shard based on the index commit point.
+     * Creates a snapshot of the shard referenced by the given {@link SnapshotShardContext}.
      * <p>
-     * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method.
-     * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
-     * <p>
-     * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
-     * {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
-     * @param store                 store to be snapshotted
-     * @param mapperService         the shards mapper service
-     * @param snapshotId            snapshot id
-     * @param indexId               id for the index being snapshotted
-     * @param snapshotIndexCommit   commit point
-     * @param shardStateIdentifier  a unique identifier of the state of the shard that is stored with the shard's snapshot and used
-     *                              to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier
-     *                              snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit}
-     * @param snapshotStatus        snapshot status
-     * @param repositoryMetaVersion version of the updated repository metadata to write
-     * @param userMetadata          user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()}
-     * @param listener              listener invoked on completion
+     * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object returned by
+     * {@link SnapshotShardContext#status()} and check its {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process
+     * should be aborted.
+     *
+     * @param snapshotShardContext snapshot shard context that must be completed via {@link SnapshotShardContext#onResponse} or
+     *                             {@link SnapshotShardContext#onFailure}
      */
-    void snapshotShard(
-            Store store,
-            MapperService mapperService,
-            SnapshotId snapshotId,
-            IndexId indexId,
-            IndexCommit snapshotIndexCommit,
-            @Nullable String shardStateIdentifier,
-            IndexShardSnapshotStatus snapshotStatus,
-            Version repositoryMetaVersion,
-            Map<String, Object> userMetadata,
-            ActionListener<ShardSnapshotResult> listener);
+    void snapshotShard(SnapshotShardContext snapshotShardContext);
 
     /**
      * Restores snapshot of the shard.

+ 120 - 0
server/src/main/java/org/elasticsearch/repositories/SnapshotShardContext.java

@@ -0,0 +1,120 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.repositories;
+
+import org.apache.lucene.index.IndexCommit;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.snapshots.SnapshotId;
+
+import java.util.Map;
+
+/**
+ * Context holding the state for creating a shard snapshot via {@link Repository#snapshotShard(SnapshotShardContext)}.
+ * Wraps a {@link org.elasticsearch.index.engine.Engine.IndexCommitRef} that is released once this instances is completed by invoking
+ * either its {@link #onResponse(ShardSnapshotResult)} or {@link #onFailure(Exception)} callback.
+ */
+public final class SnapshotShardContext extends ActionListener.Delegating<ShardSnapshotResult, ShardSnapshotResult> {
+
+    private final Store store;
+    private final MapperService mapperService;
+    private final SnapshotId snapshotId;
+    private final IndexId indexId;
+    private final Engine.IndexCommitRef commitRef;
+    @Nullable
+    private final String shardStateIdentifier;
+    private final IndexShardSnapshotStatus snapshotStatus;
+    private final Version repositoryMetaVersion;
+    private final Map<String, Object> userMetadata;
+
+    /**
+     * @param store                 store to be snapshotted
+     * @param mapperService         the shards mapper service
+     * @param snapshotId            snapshot id
+     * @param indexId               id for the index being snapshotted
+     * @param commitRef             commit point reference
+     * @param shardStateIdentifier  a unique identifier of the state of the shard that is stored with the shard's snapshot and used
+     *                              to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier
+     *                              snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit}
+     * @param snapshotStatus        snapshot status
+     * @param repositoryMetaVersion version of the updated repository metadata to write
+     * @param userMetadata          user metadata of the snapshot found in
+     *                              {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry#userMetadata()}
+     * @param listener              listener invoked on completion
+     */
+    public SnapshotShardContext(
+            Store store,
+            MapperService mapperService,
+            SnapshotId snapshotId,
+            IndexId indexId,
+            Engine.IndexCommitRef commitRef,
+            @Nullable String shardStateIdentifier,
+            IndexShardSnapshotStatus snapshotStatus,
+            Version repositoryMetaVersion,
+            Map<String, Object> userMetadata,
+            ActionListener<ShardSnapshotResult> listener) {
+        super(ActionListener.runBefore(listener, commitRef::close));
+        this.store = store;
+        this.mapperService = mapperService;
+        this.snapshotId = snapshotId;
+        this.indexId = indexId;
+        this.commitRef = commitRef;
+        this.shardStateIdentifier = shardStateIdentifier;
+        this.snapshotStatus = snapshotStatus;
+        this.repositoryMetaVersion = repositoryMetaVersion;
+        this.userMetadata = userMetadata;
+    }
+
+    public Store store() {
+        return store;
+    }
+
+    public MapperService mapperService() {
+        return mapperService;
+    }
+
+    public SnapshotId snapshotId() {
+        return snapshotId;
+    }
+
+    public IndexId indexId() {
+        return indexId;
+    }
+
+    public IndexCommit indexCommit() {
+        return commitRef.getIndexCommit();
+    }
+
+    @Nullable
+    public String stateIdentifier() {
+        return shardStateIdentifier;
+    }
+
+    public IndexShardSnapshotStatus status() {
+        return snapshotStatus;
+    }
+
+    public Version getRepositoryMetaVersion() {
+        return repositoryMetaVersion;
+    }
+
+    public Map<String, Object> userMetadata() {
+        return userMetadata;
+    }
+
+    @Override
+    public void onResponse(ShardSnapshotResult result) {
+        delegate.onResponse(result);
+    }
+}

+ 16 - 14
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -77,7 +77,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
@@ -104,6 +103,7 @@ import org.elasticsearch.repositories.RepositoryShardId;
 import org.elasticsearch.repositories.RepositoryStats;
 import org.elasticsearch.repositories.RepositoryVerificationException;
 import org.elasticsearch.repositories.ShardGenerations;
+import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.repositories.ShardSnapshotResult;
 import org.elasticsearch.snapshots.AbortedSnapshotException;
 import org.elasticsearch.snapshots.SnapshotException;
@@ -2101,20 +2101,21 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }
 
     @Override
-    public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
-                              Version repositoryMetaVersion, Map<String, Object> userMetadata,
-                              ActionListener<ShardSnapshotResult> listener) {
+    public void snapshotShard(SnapshotShardContext context) {
         if (isReadOnly()) {
-            listener.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository"));
+            context.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository"));
             return;
         }
+        final Store store = context.store();
+        final IndexCommit snapshotIndexCommit = context.indexCommit();
         final ShardId shardId = store.shardId();
+        final SnapshotId snapshotId = context.snapshotId();
+        final IndexShardSnapshotStatus snapshotStatus = context.status();
         final long startTime = threadPool.absoluteTimeInMillis();
         try {
             final String generation = snapshotStatus.generation();
             logger.debug("[{}] [{}] snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation);
-            final BlobContainer shardContainer = shardContainer(indexId, shardId);
+            final BlobContainer shardContainer = shardContainer(context.indexId(), shardId);
             final Set<String> blobs;
             if (generation == null) {
                 try {
@@ -2135,7 +2136,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     "Duplicate snapshot name [" + snapshotId.getName() + "] detected, aborting");
             }
             // First inspect all known SegmentInfos instances to see if we already have an equivalent commit in the repository
-            final List<BlobStoreIndexShardSnapshot.FileInfo> filesFromSegmentInfos = Optional.ofNullable(shardStateIdentifier).map(id -> {
+            final List<BlobStoreIndexShardSnapshot.FileInfo> filesFromSegmentInfos =
+                    Optional.ofNullable(context.stateIdentifier()).map(id -> {
                 for (SnapshotFiles snapshotFileSet : snapshots.snapshots()) {
                     if (id.equals(snapshotFileSet.shardStateIdentifier())) {
                         return snapshotFileSet.indexFiles();
@@ -2223,10 +2225,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileSize);
 
             final String indexGeneration;
-            final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion);
+            final boolean writeShardGens = SnapshotsService.useShardGenerations(context.getRepositoryMetaVersion());
             // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
             List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
-            newSnapshotsList.add(new SnapshotFiles(snapshotId.getName(), indexCommitPointFiles, shardStateIdentifier));
+            newSnapshotsList.add(new SnapshotFiles(snapshotId.getName(), indexCommitPointFiles, context.stateIdentifier()));
             for (SnapshotFiles point : snapshots) {
                 newSnapshotsList.add(point);
             }
@@ -2304,8 +2306,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                         ByteSizeValue.ofBytes(blobStoreIndexShardSnapshot.totalSize()),
                         snapshotIndexCommit.getSegmentCount());
                 snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult);
-                listener.onResponse(shardSnapshotResult);
-            }, listener::onFailure);
+                context.onResponse(shardSnapshotResult);
+            }, context::onFailure);
             if (indexIncrementalFileCount == 0) {
                 allFilesUploadedListener.onResponse(Collections.emptyList());
                 return;
@@ -2315,10 +2317,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount);
             final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, workers, allFilesUploadedListener);
             for (int i = 0; i < workers; ++i) {
-                executeOneFileSnapshot(store, snapshotId, indexId, snapshotStatus, filesToSnapshot, executor, filesListener);
+                executeOneFileSnapshot(store, snapshotId, context.indexId(), snapshotStatus, filesToSnapshot, executor, filesListener);
             }
         } catch (Exception e) {
-            listener.onFailure(e);
+            context.onFailure(e);
         }
     }
 

+ 4 - 4
server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

@@ -44,6 +44,7 @@ import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.ShardGenerations;
+import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.repositories.ShardSnapshotResult;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportException;
@@ -331,10 +332,9 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
             Engine.IndexCommitRef snapshotRef = null;
             try {
                 snapshotRef = indexShard.acquireIndexCommitForSnapshot();
-                final IndexCommit snapshotIndexCommit = snapshotRef.getIndexCommit();
-                repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId,
-                    snapshotRef.getIndexCommit(), getShardStateId(indexShard, snapshotIndexCommit), snapshotStatus, version, userMetadata,
-                    ActionListener.runBefore(listener, snapshotRef::close));
+                repository.snapshotShard(new SnapshotShardContext(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(),
+                    indexId, snapshotRef, getShardStateId(indexShard, snapshotRef.getIndexCommit()), snapshotStatus, version, userMetadata,
+                    listener));
             } catch (Exception e) {
                 IOUtils.close(snapshotRef);
                 throw e;

+ 1 - 6
server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

@@ -8,7 +8,6 @@
 
 package org.elasticsearch.repositories;
 
-import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
@@ -32,7 +31,6 @@ import org.elasticsearch.common.component.LifecycleListener;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.MockBigArrays;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
@@ -251,10 +249,7 @@ public class RepositoriesServiceTests extends ESTestCase {
         }
 
         @Override
-        public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                                  IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
-                                  Version repositoryMetaVersion, Map<String, Object> userMetadata,
-                                  ActionListener<ShardSnapshotResult> listener) {
+        public void snapshotShard(SnapshotShardContext context) {
 
         }
 

+ 8 - 4
server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java

@@ -40,12 +40,14 @@ import org.elasticsearch.common.util.MockBigArrays;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.repositories.ShardSnapshotResult;
 import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
 import org.elasticsearch.snapshots.Snapshot;
@@ -99,8 +101,9 @@ public class FsRepositoryTests extends ESTestCase {
             final PlainActionFuture<ShardSnapshotResult> future1 = PlainActionFuture.newFuture();
             runGeneric(threadPool, () -> {
                 IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
-                repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, null,
-                    snapshotStatus, Version.CURRENT, Collections.emptyMap(), future1);
+                repository.snapshotShard(new SnapshotShardContext(store, null, snapshotId, indexId,
+                    new Engine.IndexCommitRef(indexCommit, () -> {}), null, snapshotStatus, Version.CURRENT, Collections.emptyMap(),
+                    future1));
                 future1.actionGet();
                 IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
                 assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
@@ -128,8 +131,9 @@ public class FsRepositoryTests extends ESTestCase {
             final PlainActionFuture<ShardSnapshotResult> future2 = PlainActionFuture.newFuture();
             runGeneric(threadPool, () -> {
                 IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
-                repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit,
-                    null, snapshotStatus, Version.CURRENT, Collections.emptyMap(), future2);
+                repository.snapshotShard(new SnapshotShardContext(store, null, incSnapshotId, indexId,
+                    new Engine.IndexCommitRef(incIndexCommit, () -> {}), null, snapshotStatus, Version.CURRENT,
+                    Collections.emptyMap(), future2));
                 future2.actionGet();
                 IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
                 assertEquals(2, copy.getIncrementalFileCount());

+ 3 - 3
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -69,6 +69,7 @@ import org.elasticsearch.indices.recovery.RecoveryTarget;
 import org.elasticsearch.indices.recovery.StartRecoveryRequest;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.repositories.ShardSnapshotResult;
 import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
 import org.elasticsearch.snapshots.Snapshot;
@@ -837,9 +838,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
         final PlainActionFuture<ShardSnapshotResult> future = PlainActionFuture.newFuture();
         final String shardGen;
         try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) {
-            repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId,
-                indexCommitRef.getIndexCommit(), null, snapshotStatus, Version.CURRENT,
-                Collections.emptyMap(), future);
+            repository.snapshotShard(new SnapshotShardContext(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId,
+                indexCommitRef, null, snapshotStatus, Version.CURRENT, Collections.emptyMap(), future));
             shardGen = future.actionGet().getGeneration();
         }
 

+ 2 - 8
test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java

@@ -7,7 +7,6 @@
  */
 package org.elasticsearch.index.shard;
 
-import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
@@ -17,22 +16,20 @@ import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
-import org.elasticsearch.index.store.Store;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.IndexMetaDataGenerations;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryShardId;
 import org.elasticsearch.repositories.ShardGenerations;
+import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.repositories.ShardSnapshotResult;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Map;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -133,10 +130,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
     }
 
     @Override
-    public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
-                              Version repositoryMetaVersion, Map<String, Object> userMetadata,
-                              ActionListener<ShardSnapshotResult> listener) {
+    public void snapshotShard(SnapshotShardContext context) {
     }
 
     @Override

+ 2 - 6
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -11,7 +11,6 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.ExceptionsHelper;
@@ -49,7 +48,6 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.engine.EngineException;
-import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
 import org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException;
 import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
@@ -70,6 +68,7 @@ import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryShardId;
 import org.elasticsearch.repositories.ShardGenerations;
+import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.repositories.ShardSnapshotResult;
 import org.elasticsearch.repositories.blobstore.FileRestoreContext;
 import org.elasticsearch.snapshots.SnapshotId;
@@ -314,10 +313,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
     }
 
     @Override
-    public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
-                              Version repositoryMetaVersion, Map<String, Object> userMetadata,
-                              ActionListener<ShardSnapshotResult> listener) {
+    public void snapshotShard(SnapshotShardContext context) {
         throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
     }
 

+ 11 - 11
x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java

@@ -32,11 +32,11 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.ShardLock;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.engine.ReadOnlyEngine;
 import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.TranslogStats;
 import org.elasticsearch.repositories.FilterRepository;
@@ -44,7 +44,7 @@ import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.ShardGenerations;
-import org.elasticsearch.repositories.ShardSnapshotResult;
+import org.elasticsearch.repositories.SnapshotShardContext;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -55,7 +55,6 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -126,17 +125,16 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
 
 
     @Override
-    public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
-                              Version repositoryMetaVersion, Map<String, Object> userMetadata,
-                              ActionListener<ShardSnapshotResult> listener) {
+    public void snapshotShard(SnapshotShardContext context) {
+        final MapperService mapperService = context.mapperService();
         if (mapperService.documentMapper() != null // if there is no mapping this is null
             && mapperService.documentMapper().sourceMapper().isComplete() == false) {
-            listener.onFailure(
+            context.onFailure(
                 new IllegalStateException("Can't snapshot _source only on an index that has incomplete source ie. has _source disabled " +
                     "or filters the source"));
             return;
         }
+        final Store store = context.store();
         Directory unwrap = FilterDirectory.unwrap(store.directory());
         if (unwrap instanceof FSDirectory == false) {
             throw new AssertionError("expected FSDirectory but got " + unwrap.toString());
@@ -159,6 +157,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
             // SourceOnlySnapshot will take care of soft- and hard-deletes no special casing needed here
             SourceOnlySnapshot snapshot;
             snapshot = new SourceOnlySnapshot(overlayDir, querySupplier);
+            final IndexCommit snapshotIndexCommit = context.indexCommit();
             try {
                 snapshot.syncSnapshot(snapshotIndexCommit);
             } catch (NoSuchFileException | CorruptIndexException | FileAlreadyExistsException e) {
@@ -176,15 +175,16 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
             DirectoryReader reader = DirectoryReader.open(tempStore.directory());
             toClose.add(reader);
             IndexCommit indexCommit = reader.getIndexCommit();
-            super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, shardStateIdentifier, snapshotStatus,
-                repositoryMetaVersion, userMetadata, ActionListener.runBefore(listener, () -> IOUtils.close(toClose)));
+            super.snapshotShard(new SnapshotShardContext(tempStore, mapperService, context.snapshotId(), context.indexId(),
+                    new Engine.IndexCommitRef(indexCommit, () -> IOUtils.close(toClose)), context.stateIdentifier(),
+                    context.status(), context.getRepositoryMetaVersion(), context.userMetadata(), context));
         } catch (IOException e) {
             try {
                 IOUtils.close(toClose);
             } catch (IOException ex) {
                 e.addSuppressed(ex);
             }
-            listener.onFailure(e);
+            context.onFailure(e);
         }
     }
 

+ 15 - 14
x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

@@ -65,6 +65,7 @@ import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.ShardGenerations;
+import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.repositories.ShardSnapshotResult;
 import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
 import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
@@ -108,9 +109,9 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
         try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
             IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing("-1");
             final PlainActionFuture<ShardSnapshotResult> future = PlainActionFuture.newFuture();
-            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
-                snapshotRef.getIndexCommit(), null, indexShardSnapshotStatus, Version.CURRENT,
-                Collections.emptyMap(), future));
+            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(new SnapshotShardContext(shard.store(),
+                shard.mapperService(), snapshotId, indexId, snapshotRef, null, indexShardSnapshotStatus, Version.CURRENT,
+                Collections.emptyMap(), future)));
             IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, future::actionGet);
             assertEquals(
                 "Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source",
@@ -135,9 +136,9 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
             IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
             SnapshotId snapshotId = new SnapshotId("test", "test");
             final PlainActionFuture<ShardSnapshotResult> future = PlainActionFuture.newFuture();
-            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
-                snapshotRef.getIndexCommit(), null, indexShardSnapshotStatus, Version.CURRENT,
-                Collections.emptyMap(), future));
+            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(new SnapshotShardContext(shard.store(),
+                shard.mapperService(), snapshotId, indexId, snapshotRef, null, indexShardSnapshotStatus, Version.CURRENT,
+                Collections.emptyMap(), future)));
             shardGeneration = future.actionGet().getGeneration();
             IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
             assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
@@ -152,9 +153,9 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
 
             IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
             final PlainActionFuture<ShardSnapshotResult> future = PlainActionFuture.newFuture();
-            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
-                snapshotRef.getIndexCommit(), null, indexShardSnapshotStatus, Version.CURRENT,
-                Collections.emptyMap(), future));
+            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(new SnapshotShardContext(shard.store(),
+                shard.mapperService(), snapshotId, indexId, snapshotRef, null, indexShardSnapshotStatus, Version.CURRENT,
+                Collections.emptyMap(), future)));
             shardGeneration = future.actionGet().getGeneration();
             IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
             // we processed the segments_N file plus _1.si, _1.fnm, _1.fdx, _1.fdt, _1.fdm
@@ -169,9 +170,9 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
 
             IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
             final PlainActionFuture<ShardSnapshotResult> future = PlainActionFuture.newFuture();
-            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
-                snapshotRef.getIndexCommit(), null, indexShardSnapshotStatus, Version.CURRENT,
-                Collections.emptyMap(), future));
+            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(new SnapshotShardContext(shard.store(),
+                shard.mapperService(), snapshotId, indexId, snapshotRef, null, indexShardSnapshotStatus, Version.CURRENT,
+                Collections.emptyMap(), future)));
             future.actionGet();
             IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
             // we processed the segments_N file plus _1_1.liv
@@ -218,8 +219,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
             IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
             final PlainActionFuture<ShardSnapshotResult> future = PlainActionFuture.newFuture();
             runAsSnapshot(shard.getThreadPool(), () -> {
-                repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(),
-                    null, indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future);
+                repository.snapshotShard(new SnapshotShardContext(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef,
+                    null, indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future));
                 future.actionGet();
                 final PlainActionFuture<RepositoryData> finFuture = PlainActionFuture.newFuture();
                 final ShardGenerations shardGenerations =

+ 5 - 33
x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java

@@ -9,7 +9,6 @@ package org.elasticsearch.repositories.encrypted;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
@@ -36,20 +35,15 @@ import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
-import org.elasticsearch.index.store.Store;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.XPackLicenseState;
-import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.RepositoryStats;
 import org.elasticsearch.repositories.ShardGenerations;
-import org.elasticsearch.repositories.ShardSnapshotResult;
+import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
-import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
 
 import javax.crypto.KeyGenerator;
@@ -240,36 +234,14 @@ public class EncryptedRepository extends BlobStoreRepository {
     }
 
     @Override
-    public void snapshotShard(
-        Store store,
-        MapperService mapperService,
-        SnapshotId snapshotId,
-        IndexId indexId,
-        IndexCommit snapshotIndexCommit,
-        String shardStateIdentifier,
-        IndexShardSnapshotStatus snapshotStatus,
-        Version repositoryMetaVersion,
-        Map<String, Object> userMetadata,
-        ActionListener<ShardSnapshotResult> listener
-    ) {
+    public void snapshotShard(SnapshotShardContext context) {
         try {
-            validateLocalRepositorySecret(userMetadata);
+            validateLocalRepositorySecret(context.userMetadata());
         } catch (RepositoryException passwordValidationException) {
-            listener.onFailure(passwordValidationException);
+            context.onFailure(passwordValidationException);
             return;
         }
-        super.snapshotShard(
-            store,
-            mapperService,
-            snapshotId,
-            indexId,
-            snapshotIndexCommit,
-            shardStateIdentifier,
-            snapshotStatus,
-            repositoryMetaVersion,
-            userMetadata,
-            listener
-        );
+        super.snapshotShard(context);
     }
 
     @Override

+ 4 - 35
x-pack/plugin/repository-encrypted/src/test/java/org/elasticsearch/repositories/encrypted/LocalStateEncryptedRepositoryPlugin.java

@@ -6,29 +6,20 @@
  */
 package org.elasticsearch.repositories.encrypted;
 
-import org.apache.lucene.index.IndexCommit;
-import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
-import org.elasticsearch.index.store.Store;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.license.XPackLicenseState;
-import org.elasticsearch.repositories.IndexId;
-import org.elasticsearch.repositories.ShardSnapshotResult;
+import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
-import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
 
 import java.nio.file.Path;
 import java.security.GeneralSecurityException;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -95,37 +86,15 @@ public final class LocalStateEncryptedRepositoryPlugin extends LocalStateComposi
         }
 
         @Override
-        public void snapshotShard(
-            Store store,
-            MapperService mapperService,
-            SnapshotId snapshotId,
-            IndexId indexId,
-            IndexCommit snapshotIndexCommit,
-            String shardStateIdentifier,
-            IndexShardSnapshotStatus snapshotStatus,
-            Version repositoryMetaVersion,
-            Map<String, Object> userMetadata,
-            ActionListener<ShardSnapshotResult> listener
-        ) {
+        public void snapshotShard(SnapshotShardContext context) {
             snapshotShardLock.lock();
             try {
                 while (snapshotShardBlock.get()) {
                     snapshotShardCondition.await();
                 }
-                super.snapshotShard(
-                    store,
-                    mapperService,
-                    snapshotId,
-                    indexId,
-                    snapshotIndexCommit,
-                    shardStateIdentifier,
-                    snapshotStatus,
-                    repositoryMetaVersion,
-                    userMetadata,
-                    listener
-                );
+                super.snapshotShard(context);
             } catch (InterruptedException e) {
-                listener.onFailure(e);
+                context.onFailure(e);
             } finally {
                 snapshotShardLock.unlock();
             }

+ 14 - 10
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java

@@ -68,6 +68,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardPath;
@@ -75,6 +76,7 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetadata;
+import org.elasticsearch.repositories.SnapshotShardContext;
 import org.elasticsearch.repositories.ShardSnapshotResult;
 import org.elasticsearch.xpack.searchablesnapshots.cache.common.TestUtils;
 import org.elasticsearch.xpack.searchablesnapshots.store.input.ChecksumBlobContainerIndexInput;
@@ -607,16 +609,18 @@ public class SearchableSnapshotDirectoryTests extends AbstractSearchableSnapshot
                 threadPool.generic().submit(() -> {
                     IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
                     repository.snapshotShard(
-                        store,
-                        null,
-                        snapshotId,
-                        indexId,
-                        indexCommit,
-                        null,
-                        snapshotStatus,
-                        Version.CURRENT,
-                        emptyMap(),
-                        future
+                        new SnapshotShardContext(
+                            store,
+                            null,
+                            snapshotId,
+                            indexId,
+                            new Engine.IndexCommitRef(indexCommit, () -> {}),
+                            null,
+                            snapshotStatus,
+                            Version.CURRENT,
+                            emptyMap(),
+                            future
+                        )
                     );
                     future.actionGet();
                 });