瀏覽代碼

Remove IndexShard dependency from Repository (#42213)

* Remove IndexShard dependency from Repository

In order to simplify repository testing especially for BlobStoreRepository
it's important to remove the dependency on IndexShard and reduce it to
Store and MapperService (in the snapshot case). This significantly reduces
the dependcy footprint for Repository and allows unittesting without starting
nodes or instantiate entire shard instances. This change deprecates the old
method signatures and adds a unittest for FileRepository to show the advantage
of this change.
In addition, the unittesting surfaced a bug where the internal file names that
are private to the repository were used in the recovery stats instead of the
target file names which makes it impossible to relate to the actual lucene files
in the recovery stats.

* don't delegate deprecated methods

* apply comments

* test
Simon Willnauer 6 年之前
父節點
當前提交
d22844208b
共有 16 個文件被更改,包括 353 次插入107 次删除
  1. 2 1
      server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
  2. 8 7
      server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
  3. 46 6
      server/src/main/java/org/elasticsearch/repositories/Repository.java
  4. 13 12
      server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
  5. 9 14
      server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java
  6. 1 2
      server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
  7. 2 2
      server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
  8. 5 5
      server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
  9. 201 0
      server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java
  10. 8 5
      test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
  11. 3 2
      test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
  12. 28 29
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
  13. 2 2
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
  14. 2 2
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java
  15. 14 10
      x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java
  16. 9 8
      x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

+ 2 - 1
server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

@@ -468,7 +468,8 @@ final class StoreRecovery {
                 snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
             }
             final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
-            repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(),
+            assert indexShard.getEngineOrNull() == null;
+            repository.restoreShard(indexShard, indexShard.store(), restoreSource.snapshot().getSnapshotId(),
                 restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
             final Store store = indexShard.store();
             store.bootstrapNewHistory();

+ 8 - 7
server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

@@ -27,7 +27,7 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.component.LifecycleListener;
-import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
@@ -119,16 +119,17 @@ public class FilterRepository implements Repository {
         return in.isReadOnly();
     }
 
+
     @Override
-    public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
-                              IndexShardSnapshotStatus snapshotStatus) {
-        in.snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
+    public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
+        in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
     }
 
     @Override
-    public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
-                             RecoveryState recoveryState) {
-        in.restoreShard(shard, snapshotId, version, indexId, snapshotShardId, recoveryState);
+    public void restoreShard(Store store, SnapshotId snapshotId,
+                             Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
+        in.restoreShard(store, snapshotId, version, indexId, snapshotShardId, recoveryState);
     }
 
     @Override

+ 46 - 6
server/src/main/java/org/elasticsearch/repositories/Repository.java

@@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.component.LifecycleComponent;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
@@ -49,7 +50,7 @@ import java.util.function.Function;
  * <ul>
  * <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
  * with list of indices that will be included into the snapshot</li>
- * <li>Data nodes call {@link Repository#snapshotShard(IndexShard, Store, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
+ * <li>Data nodes call {@link Repository#snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
  * for each shard</li>
  * <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
  * </ul>
@@ -196,30 +197,69 @@ public interface Repository extends LifecycleComponent {
      * <p>
      * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
      * {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
-     * @param shard               shard to be snapshotted
+     * @param indexShard          the shard to be snapshotted
+     * @param snapshotId          snapshot id
+     * @param indexId             id for the index being snapshotted
+     * @param snapshotIndexCommit commit point
+     * @param snapshotStatus      snapshot status
+     * @deprecated use {@link #snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)} instead
+     */
+    @Deprecated
+    default void snapshotShard(IndexShard indexShard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
+                       IndexShardSnapshotStatus snapshotStatus) {
+        snapshotShard(indexShard.store(), indexShard.mapperService(), snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
+    }
+
+    /**
+     * Creates a snapshot of the shard based on the index commit point.
+     * <p>
+     * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method.
+     * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
+     * <p>
+     * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
+     * {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
      * @param store               store to be snapshotted
+     * @param mapperService       the shards mapper service
      * @param snapshotId          snapshot id
      * @param indexId             id for the index being snapshotted
      * @param snapshotIndexCommit commit point
      * @param snapshotStatus      snapshot status
      */
-    void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
+    void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
                        IndexShardSnapshotStatus snapshotStatus);
 
     /**
      * Restores snapshot of the shard.
      * <p>
      * The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied.
-     *
      * @param shard           the shard to restore the index into
+     * @param store           the store to restore the index into
+     * @param snapshotId      snapshot id
+     * @param version         version of elasticsearch that created this snapshot
+     * @param indexId         id of the index in the repository from which the restore is occurring
+     * @param snapshotShardId shard id (in the snapshot)
+     * @param recoveryState   recovery state
+     * @deprecated use {@link #restoreShard(Store, SnapshotId, Version, IndexId, ShardId, RecoveryState)} instead
+     */
+    @Deprecated
+    default void restoreShard(IndexShard shard, Store store, SnapshotId snapshotId, Version version, IndexId indexId,
+                              ShardId snapshotShardId, RecoveryState recoveryState) {
+        restoreShard(store, snapshotId, version, indexId, snapshotShardId, recoveryState);
+    }
+
+    /**
+     * Restores snapshot of the shard.
+     * <p>
+     * The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied.
+     * @param store           the store to restore the index into
      * @param snapshotId      snapshot id
      * @param version         version of elasticsearch that created this snapshot
      * @param indexId         id of the index in the repository from which the restore is occurring
      * @param snapshotShardId shard id (in the snapshot)
      * @param recoveryState   recovery state
      */
-    void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId,
-                      ShardId snapshotShardId, RecoveryState recoveryState);
+    void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
+                      RecoveryState recoveryState);
 
     /**
      * Retrieve shard snapshot status for the stored snapshot

+ 13 - 12
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -71,7 +71,7 @@ import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.core.internal.io.Streams;
-import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotException;
@@ -793,8 +793,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }
 
     @Override
-    public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
-                              IndexShardSnapshotStatus snapshotStatus) {
+    public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
         SnapshotContext snapshotContext = new SnapshotContext(store, snapshotId, indexId, snapshotStatus, System.currentTimeMillis());
         try {
             snapshotContext.snapshot(snapshotIndexCommit);
@@ -809,18 +809,19 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }
 
     @Override
-    public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
-                             RecoveryState recoveryState) {
-        final Context context = new Context(snapshotId, indexId, shard.shardId(), snapshotShardId);
+    public void restoreShard(Store store, SnapshotId snapshotId,
+                             Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
+        ShardId shardId = store.shardId();
+        final Context context = new Context(snapshotId, indexId, shardId, snapshotShardId);
         BlobPath path = basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId()));
         BlobContainer blobContainer = blobStore().blobContainer(path);
-        final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, recoveryState, blobContainer);
+        final RestoreContext snapshotContext = new RestoreContext(shardId, snapshotId, recoveryState, blobContainer);
         try {
             BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
             SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
-            snapshotContext.restore(snapshotFiles);
+            snapshotContext.restore(snapshotFiles, store);
         } catch (Exception e) {
-            throw new IndexShardRestoreFailedException(shard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
+            throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
         }
     }
 
@@ -1366,13 +1367,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
         /**
          * Constructs new restore context
-         * @param indexShard    shard to restore into
+         * @param shardId    shard id to restore into
          * @param snapshotId    snapshot id
          * @param recoveryState recovery state to report progress
          * @param blobContainer the blob container to read the files from
          */
-        RestoreContext(IndexShard indexShard, SnapshotId snapshotId, RecoveryState recoveryState, BlobContainer blobContainer) {
-            super(metadata.name(), indexShard, snapshotId, recoveryState, BUFFER_SIZE);
+        RestoreContext(ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState, BlobContainer blobContainer) {
+            super(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE);
             this.blobContainer = blobContainer;
         }
 

+ 9 - 14
server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java

@@ -31,7 +31,6 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.util.iterable.Iterables;
-import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
 import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
@@ -64,7 +63,6 @@ public abstract class FileRestoreContext {
     protected static final Logger logger = LogManager.getLogger(FileRestoreContext.class);
 
     protected final String repositoryName;
-    protected final IndexShard indexShard;
     protected final RecoveryState recoveryState;
     protected final SnapshotId snapshotId;
     protected final ShardId shardId;
@@ -73,26 +71,24 @@ public abstract class FileRestoreContext {
     /**
      * Constructs new restore context
      *
-     * @param indexShard    shard to restore into
+     * @param shardId       shard id to restore into
      * @param snapshotId    snapshot id
      * @param recoveryState recovery state to report progress
      * @param bufferSize    buffer size for restore
      */
-    protected FileRestoreContext(String repositoryName, IndexShard indexShard, SnapshotId snapshotId, RecoveryState recoveryState,
+    protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState,
                                  int bufferSize) {
         this.repositoryName = repositoryName;
         this.recoveryState = recoveryState;
-        this.indexShard = indexShard;
         this.snapshotId = snapshotId;
-        this.shardId = indexShard.shardId();
+        this.shardId = shardId;
         this.bufferSize = bufferSize;
     }
 
     /**
      * Performs restore operation
      */
-    public void restore(SnapshotFiles snapshotFiles) throws IOException {
-        final Store store = indexShard.store();
+    public void restore(SnapshotFiles snapshotFiles, Store store) throws IOException {
         store.incRef();
         try {
             logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
@@ -108,7 +104,7 @@ public abstract class FileRestoreContext {
                 // version number and no checksum, even though the index itself is perfectly fine to restore, this
                 // empty shard would cause exceptions to be thrown.  Since there is no data to restore from an empty
                 // shard anyway, we just create the empty shard here and then exit.
-                store.createEmpty(indexShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion);
+                store.createEmpty(store.indexSettings().getIndexVersionCreated().luceneVersion);
                 return;
             }
 
@@ -117,7 +113,7 @@ public abstract class FileRestoreContext {
                 // this will throw an IOException if the store has no segments infos file. The
                 // store can still have existing files but they will be deleted just before being
                 // restored.
-                recoveryTargetMetadata = indexShard.snapshotStoreMetadata();
+                recoveryTargetMetadata = store.getMetadata(null, true);
             } catch (org.apache.lucene.index.IndexNotFoundException e) {
                 // happens when restore to an empty shard, not a big deal
                 logger.trace("[{}] [{}] restoring from to an empty shard", shardId, snapshotId);
@@ -127,7 +123,6 @@ public abstract class FileRestoreContext {
                     shardId, snapshotId), e);
                 recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
             }
-
             final List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover = new ArrayList<>();
             final Map<String, StoreFileMetaData> snapshotMetaData = new HashMap<>();
             final Map<String, BlobStoreIndexShardSnapshot.FileInfo> fileInfos = new HashMap<>();
@@ -157,7 +152,7 @@ public abstract class FileRestoreContext {
             final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata);
             for (StoreFileMetaData md : diff.identical) {
                 BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
-                recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true);
+                recoveryState.getIndex().addFileDetail(fileInfo.physicalName(), fileInfo.length(), true);
                 if (logger.isTraceEnabled()) {
                     logger.trace("[{}] [{}] not_recovering file [{}] from [{}], exists in local store and is same", shardId, snapshotId,
                         fileInfo.physicalName(), fileInfo.name());
@@ -167,7 +162,7 @@ public abstract class FileRestoreContext {
             for (StoreFileMetaData md : concat(diff)) {
                 BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
                 filesToRecover.add(fileInfo);
-                recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false);
+                recoveryState.getIndex().addFileDetail(fileInfo.physicalName(), fileInfo.length(), false);
                 if (logger.isTraceEnabled()) {
                     logger.trace("[{}] [{}] recovering [{}] from [{}]", shardId, snapshotId,
                         fileInfo.physicalName(), fileInfo.name());
@@ -260,7 +255,7 @@ public abstract class FileRestoreContext {
                 int length;
                 while ((length = stream.read(buffer)) > 0) {
                     indexOutput.writeBytes(buffer, 0, length);
-                    recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
+                    recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
                 }
                 Store.verify(indexOutput);
                 indexOutput.close();

+ 1 - 2
server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

@@ -367,8 +367,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
         try {
             // we flush first to make sure we get the latest writes snapshotted
             try (Engine.IndexCommitRef snapshotRef = indexShard.acquireLastIndexCommit(true)) {
-                repository.snapshotShard(indexShard, indexShard.store(), snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(),
-                    snapshotStatus);
+                repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
                 if (logger.isDebugEnabled()) {
                     final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
                     logger.debug("snapshot ({}) completed to {} with {}", snapshot, repository, lastSnapshotStatus);

+ 2 - 2
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -2300,8 +2300,8 @@ public class IndexShardTests extends IndexShardTestCase {
         target.markAsRecovering("store", new RecoveryState(routing, localNode, null));
         assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") {
             @Override
-            public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
-                                     RecoveryState recoveryState) {
+            public void restoreShard(Store store, SnapshotId snapshotId,
+                                     Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
                 try {
                     cleanLuceneIndex(targetStore.directory());
                     for (String file : sourceStore.directory().listAll()) {

+ 5 - 5
server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

@@ -33,7 +33,7 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.component.LifecycleListener;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
@@ -200,14 +200,14 @@ public class RepositoriesServiceTests extends ESTestCase {
         }
 
         @Override
-        public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
-                                  IndexShardSnapshotStatus snapshotStatus) {
+        public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit
+            snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
 
         }
 
         @Override
-        public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
-                                 RecoveryState recoveryState) {
+        public void restoreShard(Store store, SnapshotId snapshotId,
+                                 Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
 
         }
 

+ 201 - 0
server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java

@@ -0,0 +1,201 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.repositories.fs;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.CodecReader;
+import org.apache.lucene.index.FilterMergePolicy;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.IOSupplier;
+import org.apache.lucene.util.TestUtil;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.snapshots.Snapshot;
+import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.test.DummyShardLock;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.IndexSettingsModule;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+
+public class FsRepositoryTests extends ESTestCase {
+
+    public void testSnapshotAndRestore() throws IOException, InterruptedException {
+        ThreadPool threadPool = new TestThreadPool(getClass().getSimpleName());
+        try (Directory directory = newDirectory()) {
+            Path repo = createTempDir();
+            Settings settings = Settings.builder()
+                .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath())
+                .put(Environment.PATH_REPO_SETTING.getKey(), repo.toAbsolutePath())
+                .putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths())
+                .put("location", repo)
+                .put("compress", randomBoolean())
+                .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES).build();
+
+            int numDocs = indexDocs(directory);
+            RepositoryMetaData metaData = new RepositoryMetaData("test", "fs", settings);
+            FsRepository repository = new FsRepository(metaData, new Environment(settings, null), NamedXContentRegistry.EMPTY, threadPool);
+            repository.start();
+            final Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_INDEX_UUID, "myindexUUID").build();
+            IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("myindex", indexSettings);
+            ShardId shardId = new ShardId(idxSettings.getIndex(), 1);
+            Store store = new Store(shardId, idxSettings, directory, new DummyShardLock(shardId));
+            SnapshotId snapshotId = new SnapshotId("test", "test");
+            IndexId indexId = new IndexId(idxSettings.getIndex().getName(), idxSettings.getUUID());
+
+            IndexCommit indexCommit = Lucene.getIndexCommit(Lucene.readSegmentInfos(store.directory()), store.directory());
+            runGeneric(threadPool, () -> {
+                IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
+                repository.snapshotShard(store, null, snapshotId, indexId, indexCommit,
+                    snapshotStatus);
+                IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
+                assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
+            });
+            Lucene.cleanLuceneIndex(directory);
+            expectThrows(org.apache.lucene.index.IndexNotFoundException.class, () -> Lucene.readSegmentInfos(directory));
+            DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
+            ShardRouting routing = ShardRouting.newUnassigned(shardId, true, new RecoverySource.SnapshotRecoverySource("test",
+                    new Snapshot("foo", snapshotId), Version.CURRENT, "myindex"),
+                new UnassignedInfo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED, ""));
+            routing = ShardRoutingHelper.initialize(routing, localNode.getId(), 0);
+            RecoveryState state = new RecoveryState(routing, localNode, null);
+            runGeneric(threadPool, () ->
+                repository.restoreShard(store, snapshotId, Version.CURRENT, indexId, shardId, state));
+            assertTrue(state.getIndex().recoveredBytes() > 0);
+            assertEquals(0, state.getIndex().reusedFileCount());
+            assertEquals(indexCommit.getFileNames().size(), state.getIndex().recoveredFileCount());
+            assertEquals(numDocs, Lucene.readSegmentInfos(directory).totalMaxDoc());
+            deleteRandomDoc(store.directory());
+            SnapshotId incSnapshotId = new SnapshotId("test1", "test1");
+            IndexCommit incIndexCommit = Lucene.getIndexCommit(Lucene.readSegmentInfos(store.directory()), store.directory());
+            Collection<String> commitFileNames = incIndexCommit.getFileNames();
+            runGeneric(threadPool, () -> {
+                IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
+                repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus);
+                IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
+                assertEquals(2, copy.getIncrementalFileCount());
+                assertEquals(commitFileNames.size(), copy.getTotalFileCount());
+            });
+
+            // roll back to the first snap and then incrementally restore
+            RecoveryState firstState = new RecoveryState(routing, localNode, null);
+            runGeneric(threadPool, () ->
+                repository.restoreShard(store, snapshotId, Version.CURRENT, indexId, shardId, firstState));
+            assertEquals("should reuse everything except of .liv and .si",
+                commitFileNames.size()-2, firstState.getIndex().reusedFileCount());
+
+            RecoveryState secondState = new RecoveryState(routing, localNode, null);
+            runGeneric(threadPool, () ->
+                repository.restoreShard(store, incSnapshotId, Version.CURRENT, indexId, shardId, secondState));
+            assertEquals(secondState.getIndex().reusedFileCount(), commitFileNames.size()-2);
+            assertEquals(secondState.getIndex().recoveredFileCount(), 2);
+            List<RecoveryState.File> recoveredFiles =
+                secondState.getIndex().fileDetails().stream().filter(f -> f.reused() == false).collect(Collectors.toList());
+            Collections.sort(recoveredFiles, Comparator.comparing(RecoveryState.File::name));
+            assertTrue(recoveredFiles.get(0).name(), recoveredFiles.get(0).name().endsWith(".liv"));
+            assertTrue(recoveredFiles.get(1).name(), recoveredFiles.get(1).name().endsWith("segments_2"));
+        } finally {
+            terminate(threadPool);
+        }
+    }
+
+    private void runGeneric(ThreadPool threadPool, Runnable runnable) throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+        threadPool.generic().submit(() -> {
+            try {
+                runnable.run();
+            } finally {
+                latch.countDown();
+            }
+        });
+        latch.await();
+    }
+
+    private void deleteRandomDoc(Directory directory) throws IOException {
+        try(IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig(random(),
+            new MockAnalyzer(random())).setCodec(TestUtil.getDefaultCodec()).setMergePolicy(new FilterMergePolicy(NoMergePolicy.INSTANCE) {
+            @Override
+            public boolean keepFullyDeletedSegment(IOSupplier<CodecReader> readerIOSupplier) {
+                return true;
+            }
+
+        }))) {
+            final int numDocs = writer.getDocStats().numDocs;
+            writer.deleteDocuments(new Term("id", "" + randomIntBetween(0, writer.getDocStats().numDocs-1)));
+            writer.commit();
+            assertEquals(writer.getDocStats().numDocs, numDocs-1);
+        }
+    }
+
+    private int indexDocs(Directory directory) throws IOException {
+        try(IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig(random(),
+            new MockAnalyzer(random())).setCodec(TestUtil.getDefaultCodec()))) {
+            int docs = 1 + random().nextInt(100);
+            for (int i = 0; i < docs; i++) {
+                Document doc = new Document();
+                doc.add(new StringField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+                doc.add(new TextField("body",
+                    TestUtil.randomRealisticUnicodeString(random()), random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+                doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random()))));
+                writer.addDocument(doc);
+            }
+            writer.commit();
+            return docs;
+        }
+    }
+}

+ 8 - 5
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -797,7 +797,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
     /** Recover a shard from a snapshot using a given repository **/
     protected void recoverShardFromSnapshot(final IndexShard shard,
                                             final Snapshot snapshot,
-                                            final Repository repository) throws IOException {
+                                            final Repository repository) {
         final Version version = Version.CURRENT;
         final ShardId shardId = shard.shardId();
         final String index = shardId.getIndexName();
@@ -806,9 +806,12 @@ public abstract class IndexShardTestCase extends ESTestCase {
         final RecoverySource.SnapshotRecoverySource recoverySource =
             new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, version, index);
         final ShardRouting shardRouting = newShardRouting(shardId, node.getId(), true, ShardRoutingState.INITIALIZING, recoverySource);
-
         shard.markAsRecovering("from snapshot", new RecoveryState(shardRouting, node, null));
-        repository.restoreShard(shard, snapshot.getSnapshotId(), version, indexId, shard.shardId(), shard.recoveryState());
+        repository.restoreShard(shard.store(),
+            snapshot.getSnapshotId(), version,
+            indexId,
+            shard.shardId(),
+            shard.recoveryState());
     }
 
     /** Snapshot a shard using a given repository **/
@@ -820,8 +823,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
             Index index = shard.shardId().getIndex();
             IndexId indexId = new IndexId(index.getName(), index.getUUID());
 
-            repository.snapshotShard(shard, shard.store(), snapshot.getSnapshotId(), indexId, indexCommitRef.getIndexCommit(),
-                snapshotStatus);
+            repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId,
+                indexCommitRef.getIndexCommit(), snapshotStatus);
         }
 
         final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();

+ 3 - 2
test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java

@@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.repositories.IndexId;
@@ -133,8 +134,8 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
     }
 
     @Override
-    public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
-                              IndexShardSnapshotStatus snapshotStatus) {
+    public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
     }
 
     @Override

+ 28 - 29
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -42,10 +42,10 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.engine.EngineException;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
 import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
-import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardRecoveryException;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
@@ -294,18 +294,19 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
     }
 
     @Override
-    public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
-                              IndexShardSnapshotStatus snapshotStatus) {
+    public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
         throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
     }
 
     @Override
-    public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId,
-                             RecoveryState recoveryState) {
+    public void restoreShard(Store store, SnapshotId snapshotId,
+                             Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
         // TODO: Add timeouts to network calls / the restore process.
-        createEmptyStore(indexShard, shardId);
+        createEmptyStore(store);
+        ShardId shardId = store.shardId();
 
-        final Map<String, String> ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
+        final Map<String, String> ccrMetaData = store.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
         final String leaderIndexName = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
         final String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
         final Index leaderIndex = new Index(leaderIndexName, leaderUUID);
@@ -314,14 +315,14 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
         final Client remoteClient = getRemoteClusterClient();
 
         final String retentionLeaseId =
-                retentionLeaseId(localClusterName, indexShard.shardId().getIndex(), remoteClusterAlias, leaderIndex);
+                retentionLeaseId(localClusterName, shardId.getIndex(), remoteClusterAlias, leaderIndex);
 
         acquireRetentionLeaseOnLeader(shardId, retentionLeaseId, leaderShardId, remoteClient);
 
         // schedule renewals to run during the restore
         final Scheduler.Cancellable renewable = threadPool.scheduleWithFixedDelay(
                 () -> {
-                    logger.trace("{} background renewal of retention lease [{}] during restore", indexShard.shardId(), retentionLeaseId);
+                    logger.trace("{} background renewal of retention lease [{}] during restore", shardId, retentionLeaseId);
                     final ThreadContext threadContext = threadPool.getThreadContext();
                     try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
                         // we have to execute under the system context so that if security is enabled the renewal is authorized
@@ -336,36 +337,34 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
                                         e -> {
                                             assert e instanceof ElasticsearchSecurityException == false : e;
                                             logger.warn(new ParameterizedMessage(
-                                                            "{} background renewal of retention lease [{}] failed during restore",
-                                                            indexShard.shardId(),
-                                                            retentionLeaseId),
-                                                    e);
+                                                            "{} background renewal of retention lease [{}] failed during restore", shardId,
+                                                    retentionLeaseId), e);
                                         }));
                     }
                 },
-                CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(indexShard.indexSettings().getNodeSettings()),
+                CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(store.indexSettings().getNodeSettings()),
                 Ccr.CCR_THREAD_POOL_NAME);
 
         // TODO: There should be some local timeout. And if the remote cluster returns an unknown session
         //  response, we should be able to retry by creating a new session.
-        try (RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, indexShard, recoveryState)) {
-            restoreSession.restoreFiles();
-            updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, indexShard.routingEntry().index());
+        try (RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, shardId, recoveryState)) {
+            restoreSession.restoreFiles(store);
+            updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, shardId.getIndex());
         } catch (Exception e) {
-            throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
+            throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
         } finally {
-            logger.trace("{} canceling background renewal of retention lease [{}] at the end of restore", shardId, retentionLeaseId);
+            logger.trace("{} canceling background renewal of retention lease [{}] at the end of restore", shardId,
+                retentionLeaseId);
             renewable.cancel();
         }
     }
 
-    private void createEmptyStore(final IndexShard indexShard, final ShardId shardId) {
-        final Store store = indexShard.store();
+    private void createEmptyStore(Store store) {
         store.incRef();
         try {
-            store.createEmpty(indexShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion);
+            store.createEmpty(store.indexSettings().getIndexVersionCreated().luceneVersion);
         } catch (final EngineException | IOException e) {
-            throw new IndexShardRecoveryException(shardId, "failed to create empty store", e);
+            throw new IndexShardRecoveryException(store.shardId(), "failed to create empty store", e);
         } finally {
             store.decRef();
         }
@@ -432,12 +431,12 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
         }
     }
 
-    RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
+    RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, ShardId indexShardId,
                                        RecoveryState recoveryState) {
         String sessionUUID = UUIDs.randomBase64UUID();
         PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
             new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(ccrSettings.getRecoveryActionTimeout());
-        return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
+        return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShardId, recoveryState,
             response.getStoreFileMetaData(), response.getMappingVersion(), threadPool, ccrSettings, throttledTime::inc);
     }
 
@@ -452,10 +451,10 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
         private final LongConsumer throttleListener;
         private final ThreadPool threadPool;
 
-        RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
+        RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, ShardId shardId,
                        RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion,
                        ThreadPool threadPool, CcrSettings ccrSettings, LongConsumer throttleListener) {
-            super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes()));
+            super(repositoryName, shardId, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes()));
             this.remoteClient = remoteClient;
             this.sessionUUID = sessionUUID;
             this.node = node;
@@ -466,14 +465,14 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
             this.throttleListener = throttleListener;
         }
 
-        void restoreFiles() throws IOException {
+        void restoreFiles(Store store) throws IOException {
             ArrayList<FileInfo> fileInfos = new ArrayList<>();
             for (StoreFileMetaData fileMetaData : sourceMetaData) {
                 ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length());
                 fileInfos.add(new FileInfo(fileMetaData.name(), fileMetaData, fileSize));
             }
             SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos);
-            restore(snapshotFiles);
+            restore(snapshotFiles, store);
         }
 
         @Override

+ 2 - 2
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

@@ -447,8 +447,8 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
                 primary.markAsRecovering("remote recovery from leader", new RecoveryState(routing, localNode, null));
                 primary.restoreFromRepository(new RestoreOnlyRepository(index.getName()) {
                     @Override
-                    public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version,
-                                             IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
+                    public void restoreShard(Store store, SnapshotId snapshotId,
+                                             Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
                         try {
                             IndexShard leader = leaderGroup.getPrimary();
                             Lucene.cleanLuceneIndex(primary.store().directory());

+ 2 - 2
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java

@@ -127,8 +127,8 @@ public class FollowEngineIndexShardTests extends IndexShardTestCase {
         target.markAsRecovering("store", new RecoveryState(routing, localNode, null));
         assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") {
             @Override
-            public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
-                                     RecoveryState recoveryState) {
+            public void restoreShard(Store store, SnapshotId snapshotId,
+                                     Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
                 try {
                     cleanLuceneIndex(targetStore.directory());
                     for (String file : sourceStore.directory().listAll()) {

+ 14 - 10
x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java

@@ -10,7 +10,9 @@ import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.SimpleFSDirectory;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
@@ -24,8 +26,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.env.ShardLock;
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.engine.ReadOnlyEngine;
-import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.ShardPath;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.TranslogStats;
@@ -104,15 +105,18 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
     }
 
     @Override
-    public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
-                              IndexShardSnapshotStatus snapshotStatus) {
-        if (shard.mapperService().documentMapper() != null // if there is no mapping this is null
-            && shard.mapperService().documentMapper().sourceMapper().isComplete() == false) {
+    public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
+        if (mapperService.documentMapper() != null // if there is no mapping this is null
+            && mapperService.documentMapper().sourceMapper().isComplete() == false) {
             throw new IllegalStateException("Can't snapshot _source only on an index that has incomplete source ie. has _source disabled " +
                 "or filters the source");
         }
-        ShardPath shardPath = shard.shardPath();
-        Path dataPath = shardPath.getDataPath();
+        Directory unwrap = FilterDirectory.unwrap(store.directory());
+        if (unwrap instanceof FSDirectory == false) {
+            throw new AssertionError("expected FSDirectory but got " + unwrap.toString());
+        }
+        Path dataPath = ((FSDirectory) unwrap).getDirectory().getParent();
         // TODO should we have a snapshot tmp directory per shard that is maintained by the system?
         Path snapPath = dataPath.resolve(SNAPSHOT_DIR_NAME);
         try (FSDirectory directory = new SimpleFSDirectory(snapPath)) {
@@ -122,7 +126,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
                     // do nothing;
                 }
             }, Store.OnClose.EMPTY);
-            Supplier<Query> querySupplier = shard.mapperService().hasNested() ? Queries::newNestedFilter : null;
+            Supplier<Query> querySupplier = mapperService.hasNested() ? Queries::newNestedFilter : null;
             // SourceOnlySnapshot will take care of soft- and hard-deletes no special casing needed here
             SourceOnlySnapshot snapshot = new SourceOnlySnapshot(tempStore.directory(), querySupplier);
             snapshot.syncSnapshot(snapshotIndexCommit);
@@ -133,7 +137,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
             store.incRef();
             try (DirectoryReader reader = DirectoryReader.open(tempStore.directory())) {
                 IndexCommit indexCommit = reader.getIndexCommit();
-                super.snapshotShard(shard, tempStore, snapshotId, indexId, indexCommit, snapshotStatus);
+                super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus);
             } finally {
                 store.decRef();
             }

+ 9 - 8
x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

@@ -98,7 +98,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
             IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
             IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () ->
                 runAsSnapshot(shard.getThreadPool(),
-                    () -> repository.snapshotShard(shard, shard.store(), snapshotId, indexId,
+                    () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
                         snapshotRef.getIndexCommit(), indexShardSnapshotStatus)));
             assertEquals("Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source"
                 , illegalStateException.getMessage());
@@ -120,8 +120,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
         try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
             IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
             SnapshotId snapshotId = new SnapshotId("test", "test");
-            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard, shard.store(), snapshotId, indexId, snapshotRef
-                    .getIndexCommit(), indexShardSnapshotStatus));
+            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
+                snapshotRef.getIndexCommit(), indexShardSnapshotStatus));
             IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
             assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
             totalFileCount = copy.getTotalFileCount();
@@ -134,8 +134,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
             SnapshotId snapshotId = new SnapshotId("test_1", "test_1");
 
             IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
-            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard, shard.store(), snapshotId, indexId, snapshotRef
-                .getIndexCommit(), indexShardSnapshotStatus));
+            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
+                snapshotRef.getIndexCommit(), indexShardSnapshotStatus));
             IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
             // we processed the segments_N file plus _1.si, _1.fdx, _1.fnm, _1.fdt
             assertEquals(5, copy.getIncrementalFileCount());
@@ -148,8 +148,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
             SnapshotId snapshotId = new SnapshotId("test_2", "test_2");
 
             IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
-            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard, shard.store(), snapshotId, indexId, snapshotRef
-                .getIndexCommit(), indexShardSnapshotStatus));
+            runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
+                snapshotRef.getIndexCommit(), indexShardSnapshotStatus));
             IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
             // we processed the segments_N file plus _1_1.liv
             assertEquals(2, copy.getIncrementalFileCount());
@@ -197,7 +197,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
                 repository.initializeSnapshot(snapshotId, Arrays.asList(indexId),
                     MetaData.builder().put(shard.indexSettings()
                     .getIndexMetaData(), false).build());
-                repository.snapshotShard(shard, shard.store(), snapshotId, indexId, snapshotRef.getIndexCommit(), indexShardSnapshotStatus);
+                repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(),
+                    indexShardSnapshotStatus);
             });
             IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
             assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());