
Implement ccr file restore (#37130)

This is related to #35975. It implements a file-based restore in the
CcrRepository: the restore transfers files from the leader cluster
to the follower cluster. It does not implement any advanced resiliency
features at the moment; any request failure will end the restore.
Tim Brooks, 6 years ago
commit 5c68338a1c
18 changed files with 934 additions and 310 deletions
  1. server/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java (+5, -8)
  2. server/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java (+2, -4)
  3. server/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java (+1, -1)
  4. server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java (+1, -1)
  5. server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java (+23, -217)
  6. server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java (+304, -0)
  7. server/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java (+2, -2)
  8. test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java (+1, -1)
  9. test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java (+10, -0)
  10. x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java (+3, -0)
  11. x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java (+129, -0)
  12. x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequest.java (+76, -0)
  13. x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java (+13, -3)
  14. x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java (+3, -12)
  15. x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java (+124, -17)
  16. x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java (+117, -31)
  17. x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java (+55, -13)
  18. x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java (+65, -0)
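Taken together, the CCR changes below implement a pull-based file copy: the follower opens a restore session on the leader shard, receives the leader's Store.MetadataSnapshot in the response, fetches each file in fixed-size chunks, and finally closes the session. A rough follower-side sketch of that flow, using only the request/response types added in this commit (chunk sizing, offset bookkeeping, and error handling are simplified, and session cleanup via ClearCcrRestoreSessionAction is elided):

    // Sketch: `remoteClient` is a remote-cluster client for the leader,
    // `leaderShardId` the leader shard to copy from.
    String sessionUUID = UUIDs.randomBase64UUID();
    PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse session = remoteClient.execute(
        PutCcrRestoreSessionAction.INSTANCE,
        new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
    DiscoveryNode node = session.getNode();                        // the node holding the session
    Store.MetadataSnapshot leaderFiles = session.getStoreFileMetaData();

    for (StoreFileMetaData file : leaderFiles) {                   // MetadataSnapshot is iterable
        long offset = 0;
        while (offset < file.length()) {
            int chunkSize = (int) Math.min(1 << 16, file.length() - offset); // 64 KiB, an arbitrary choice
            GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse chunk = remoteClient.execute(
                GetCcrRestoreFileChunkAction.INSTANCE,
                new GetCcrRestoreFileChunkRequest(node, sessionUUID, file.name(), chunkSize)).actionGet();
            assert chunk.getOffset() == offset;                    // the server reports where the read started
            offset += chunk.getChunk().length();
            // write chunk.getChunk() into the local Lucene store (see FileRestoreContext below)
        }
    }
    // finally: clear the session with ClearCcrRestoreSessionAction (not shown)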

+ 5 - 8
server/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java

@@ -21,7 +21,6 @@ package org.elasticsearch.common.bytes;
 
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefIterator;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.ByteArray;
 import org.elasticsearch.common.util.PageCacheRecycler;
 
@@ -35,17 +34,15 @@ public class PagedBytesReference extends BytesReference {
 
     private static final int PAGE_SIZE = PageCacheRecycler.BYTE_PAGE_SIZE;
 
-    private final BigArrays bigarrays;
-    protected final ByteArray byteArray;
+    private final ByteArray byteArray;
     private final int offset;
     private final int length;
 
-    public PagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) {
-        this(bigarrays, byteArray, 0, length);
+    public PagedBytesReference(ByteArray byteArray, int length) {
+        this(byteArray, 0, length);
     }
 
-    public PagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int from, int length) {
-        this.bigarrays = bigarrays;
+    private PagedBytesReference(ByteArray byteArray, int from, int length) {
         this.byteArray = byteArray;
         this.offset = from;
         this.length = length;
@@ -67,7 +64,7 @@ public class PagedBytesReference extends BytesReference {
             throw new IllegalArgumentException("can't slice a buffer with length [" + length() +
                 "], with slice parameters from [" + from + "], length [" + length + "]");
         }
-        return new PagedBytesReference(bigarrays, byteArray, offset + from, length);
+        return new PagedBytesReference(byteArray, offset + from, length);
     }
 
     @Override
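With BigArrays removed from the constructors, callers build a reference directly over the backing ByteArray. A minimal usage sketch (NON_RECYCLING_INSTANCE is used here only to allocate the pages):

    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
    ByteArray array = bigArrays.newByteArray(128, false);
    BytesReference ref = new PagedBytesReference(array, 128);  // no BigArrays argument anymore
    BytesReference slice = ref.slice(16, 64);                  // slices use the now-private (from, length) constructor
    assert slice.length() == 64;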

+ 2 - 4
server/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java

@@ -21,7 +21,6 @@ package org.elasticsearch.common.bytes;
 
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.ByteArray;
 
 /**
@@ -32,9 +31,8 @@ public final class ReleasablePagedBytesReference extends PagedBytesReference imp
 
     private final Releasable releasable;
 
-    public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length,
-                                         Releasable releasable) {
-        super(bigarrays, byteArray, length);
+    public ReleasablePagedBytesReference(ByteArray byteArray, int length, Releasable releasable) {
+        super(byteArray, length);
         this.releasable = releasable;
     }
 

+ 1 - 1
server/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java

@@ -140,7 +140,7 @@ public class BytesStreamOutput extends BytesStream {
 
     @Override
     public BytesReference bytes() {
-        return new PagedBytesReference(bigArrays, bytes, count);
+        return new PagedBytesReference(bytes, count);
     }
 
     /**

+ 1 - 1
server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java

@@ -56,7 +56,7 @@ public class ReleasableBytesStreamOutput extends BytesStreamOutput
      */
     @Override
     public ReleasablePagedBytesReference bytes() {
-        return new ReleasablePagedBytesReference(bigArrays, bytes, count, releasable);
+        return new ReleasablePagedBytesReference(bytes, count, releasable);
     }
 
     @Override

+ 23 - 217
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -22,15 +22,9 @@ package org.elasticsearch.repositories.blobstore;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexFormatTooNewException;
-import org.apache.lucene.index.IndexFormatTooOldException;
-import org.apache.lucene.index.IndexNotFoundException;
-import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
@@ -65,7 +59,6 @@ import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -111,17 +104,12 @@ import java.nio.file.DirectoryNotEmptyException;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.emptyMap;
-import static java.util.Collections.unmodifiableMap;
 import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
 
 /**
@@ -864,9 +852,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     @Override
     public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
                              RecoveryState recoveryState) {
-        final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, indexId, snapshotShardId, recoveryState);
+        final Context context = new Context(snapshotId, indexId, shard.shardId(), snapshotShardId);
+        BlobPath path = basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId()));
+        BlobContainer blobContainer = blobStore().blobContainer(path);
+        final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, recoveryState, blobContainer);
         try {
-            snapshotContext.restore();
+            BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
+            SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
+            snapshotContext.restore(snapshotFiles);
         } catch (Exception e) {
             throw new IndexShardRestoreFailedException(shard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
         }
@@ -1459,216 +1452,29 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     /**
      * Context for restore operations
      */
-    private class RestoreContext extends Context {
+    private class RestoreContext extends FileRestoreContext {
 
-        private final IndexShard targetShard;
-
-        private final RecoveryState recoveryState;
+        private final BlobContainer blobContainer;
 
         /**
          * Constructs new restore context
-         *
-         * @param shard           shard to restore into
-         * @param snapshotId      snapshot id
-         * @param indexId         id of the index being restored
-         * @param snapshotShardId shard in the snapshot that data should be restored from
-         * @param recoveryState   recovery state to report progress
+         * @param indexShard    shard to restore into
+         * @param snapshotId    snapshot id
+         * @param recoveryState recovery state to report progress
+         * @param blobContainer the blob container to read the files from
          */
-        RestoreContext(IndexShard shard, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
-            super(snapshotId, indexId, shard.shardId(), snapshotShardId);
-            this.recoveryState = recoveryState;
-            this.targetShard = shard;
-        }
-
-        /**
-         * Performs restore operation
-         */
-        public void restore() throws IOException {
-            final Store store = targetShard.store();
-            store.incRef();
-            try {
-                logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, metadata.name(), shardId);
-                BlobStoreIndexShardSnapshot snapshot = loadSnapshot();
-
-                if (snapshot.indexFiles().size() == 1
-                    && snapshot.indexFiles().get(0).physicalName().startsWith("segments_")
-                    && snapshot.indexFiles().get(0).hasUnknownChecksum()) {
-                    // If the shard has no documents, it will only contain a single segments_N file for the
-                    // shard's snapshot.  If we are restoring a snapshot created by a previous supported version,
-                    // it is still possible that in that version, an empty shard has a segments_N file with an unsupported
-                    // version (and no checksum), because we don't know the Lucene version to assign segments_N until we
-                    // have written some data.  Since the segments_N for an empty shard could have an incompatible Lucene
-                    // version number and no checksum, even though the index itself is perfectly fine to restore, this
-                    // empty shard would cause exceptions to be thrown.  Since there is no data to restore from an empty
-                    // shard anyway, we just create the empty shard here and then exit.
-                    store.createEmpty(targetShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion);
-                    return;
-                }
-
-                SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
-                Store.MetadataSnapshot recoveryTargetMetadata;
-                try {
-                    // this will throw an IOException if the store has no segments infos file. The
-                    // store can still have existing files but they will be deleted just before being
-                    // restored.
-                    recoveryTargetMetadata = targetShard.snapshotStoreMetadata();
-                } catch (IndexNotFoundException e) {
-                    // happens when restore to an empty shard, not a big deal
-                    logger.trace("[{}] [{}] restoring from to an empty shard", shardId, snapshotId);
-                    recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
-                } catch (IOException e) {
-                    logger.warn(() -> new ParameterizedMessage("{} Can't read metadata from store, will not reuse any " +
-                        "local file while restoring", shardId), e);
-                    recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
-                }
-
-                final List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover = new ArrayList<>();
-                final Map<String, StoreFileMetaData> snapshotMetaData = new HashMap<>();
-                final Map<String, BlobStoreIndexShardSnapshot.FileInfo> fileInfos = new HashMap<>();
-                for (final BlobStoreIndexShardSnapshot.FileInfo fileInfo : snapshot.indexFiles()) {
-                    try {
-                        // in 1.3.3 we added additional hashes for .si / segments_N files
-                        // to ensure we don't double the space in the repo since old snapshots
-                        // don't have this hash we try to read that hash from the blob store
-                        // in a bwc compatible way.
-                        maybeRecalculateMetadataHash(blobContainer, fileInfo, recoveryTargetMetadata);
-                    } catch (Exception e) {
-                        // if the index is broken we might not be able to read it
-                        logger.warn(() -> new ParameterizedMessage("{} Can't calculate hash from blog for file [{}] [{}]",
-                            shardId, fileInfo.physicalName(), fileInfo.metadata()), e);
-                    }
-                    snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata());
-                    fileInfos.put(fileInfo.metadata().name(), fileInfo);
-                }
-
-                final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetaData), emptyMap(), 0);
-
-                final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile();
-                if (restoredSegmentsFile == null) {
-                    throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file");
-                }
-
-                final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata);
-                for (StoreFileMetaData md : diff.identical) {
-                    BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
-                    recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true);
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("[{}] [{}] not_recovering [{}] from [{}], exists in local store and is same",
-                            shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
-                    }
-                }
-
-                for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) {
-                    BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
-                    filesToRecover.add(fileInfo);
-                    recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false);
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("[{}] [{}] recovering [{}] from [{}], exists in local store but is different", shardId, snapshotId,
-                                fileInfo.physicalName(), fileInfo.name());
-                    }
-                }
-
-                if (filesToRecover.isEmpty()) {
-                    logger.trace("no files to recover, all exists within the local store");
-                }
-
-                try {
-                    // list of all existing store files
-                    final List<String> deleteIfExistFiles = Arrays.asList(store.directory().listAll());
-
-                    // restore the files from the snapshot to the Lucene store
-                    for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
-                        // if a file with a same physical name already exist in the store we need to delete it
-                        // before restoring it from the snapshot. We could be lenient and try to reuse the existing
-                        // store files (and compare their names/length/checksum again with the snapshot files) but to
-                        // avoid extra complexity we simply delete them and restore them again like StoreRecovery
-                        // does with dangling indices. Any existing store file that is not restored from the snapshot
-                        // will be clean up by RecoveryTarget.cleanFiles().
-                        final String physicalName = fileToRecover.physicalName();
-                        if (deleteIfExistFiles.contains(physicalName)) {
-                            logger.trace("[{}] [{}] deleting pre-existing file [{}]", shardId, snapshotId, physicalName);
-                            store.directory().deleteFile(physicalName);
-                        }
-
-                        logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
-                        restoreFile(fileToRecover, store);
-                    }
-                } catch (IOException ex) {
-                    throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
-                }
-
-                // read the snapshot data persisted
-                final SegmentInfos segmentCommitInfos;
-                try {
-                    segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
-                } catch (IOException e) {
-                    throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
-                }
-                recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion());
-
-                /// now, go over and clean files that are in the store, but were not in the snapshot
-                try {
-                    for (String storeFile : store.directory().listAll()) {
-                        if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) {
-                            continue; //skip write.lock, checksum files and files that exist in the snapshot
-                        }
-                        try {
-                            store.deleteQuiet("restore", storeFile);
-                            store.directory().deleteFile(storeFile);
-                        } catch (IOException e) {
-                            logger.warn("[{}] failed to delete file [{}] during snapshot cleanup", snapshotId, storeFile);
-                        }
-                    }
-                } catch (IOException e) {
-                    logger.warn("[{}] failed to list directory - some of files might not be deleted", snapshotId);
-                }
-            } finally {
-                store.decRef();
-            }
+        RestoreContext(IndexShard indexShard, SnapshotId snapshotId, RecoveryState recoveryState, BlobContainer blobContainer) {
+            super(metadata.name(), indexShard, snapshotId, recoveryState, BUFFER_SIZE);
+            this.blobContainer = blobContainer;
         }
 
-        /**
-         * Restores a file
-         * This is asynchronous method. Upon completion of the operation latch is getting counted down and any failures are
-         * added to the {@code failures} list
-         *
-         * @param fileInfo file to be restored
-         */
-        private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException {
-            boolean success = false;
-
-            try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) {
-                final InputStream stream;
-                if (restoreRateLimiter == null) {
-                    stream = partSliceStream;
-                } else {
-                    stream = new RateLimitingInputStream(partSliceStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc);
-                }
-
-                try (IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(),
-                                                                           fileInfo.metadata(), IOContext.DEFAULT)) {
-                    final byte[] buffer = new byte[BUFFER_SIZE];
-                    int length;
-                    while ((length = stream.read(buffer)) > 0) {
-                        indexOutput.writeBytes(buffer, 0, length);
-                        recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
-                    }
-                    Store.verify(indexOutput);
-                    indexOutput.close();
-                    store.directory().sync(Collections.singleton(fileInfo.physicalName()));
-                    success = true;
-                } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
-                    try {
-                        store.markStoreCorrupted(ex);
-                    } catch (IOException e) {
-                        logger.warn("store cannot be marked as corrupted", e);
-                    }
-                    throw ex;
-                } finally {
-                    if (success == false) {
-                        store.deleteQuiet(fileInfo.physicalName());
-                    }
-                }
+        @Override
+        protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
+            if (restoreRateLimiter == null) {
+                return new PartSliceStream(blobContainer, fileInfo);
+            } else {
+                RateLimitingInputStream.Listener listener = restoreRateLimitingTimeInNanos::inc;
+                return new RateLimitingInputStream(new PartSliceStream(blobContainer, fileInfo), restoreRateLimiter, listener);
             }
         }
     }

+ 304 - 0
server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java

@@ -0,0 +1,304 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.repositories.blobstore;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexFormatTooNewException;
+import org.apache.lucene.index.IndexFormatTooOldException;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.util.iterable.Iterables;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
+import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
+import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.store.StoreFileMetaData;
+import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.snapshots.SnapshotId;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.unmodifiableMap;
+
+/**
+ * This context will execute a file restore of the Lucene files. It is primarily designed to be used to
+ * restore from some form of a snapshot. It will set up a new store, identify files that need to be copied
+ * from the source, and perform the copies. Implementers must implement the functionality of opening the
+ * underlying file streams for the snapshotted Lucene files.
+ */
+public abstract class FileRestoreContext {
+
+    private static final Logger logger = LogManager.getLogger(FileRestoreContext.class);
+
+    private final String repositoryName;
+    private final IndexShard indexShard;
+    private final RecoveryState recoveryState;
+    private final SnapshotId snapshotId;
+    private final ShardId shardId;
+    private final int bufferSize;
+
+    /**
+     * Constructs new restore context
+     *
+     * @param indexShard    shard to restore into
+     * @param snapshotId    snapshot id
+     * @param recoveryState recovery state to report progress
+     * @param bufferSize    buffer size for restore
+     */
+    protected FileRestoreContext(String repositoryName, IndexShard indexShard, SnapshotId snapshotId, RecoveryState recoveryState,
+                                 int bufferSize) {
+        this.repositoryName = repositoryName;
+        this.recoveryState = recoveryState;
+        this.indexShard = indexShard;
+        this.snapshotId = snapshotId;
+        this.shardId = indexShard.shardId();
+        this.bufferSize = bufferSize;
+    }
+
+    /**
+     * Performs restore operation
+     */
+    public void restore(SnapshotFiles snapshotFiles) throws IOException {
+        final Store store = indexShard.store();
+        store.incRef();
+        try {
+            logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
+
+            if (snapshotFiles.indexFiles().size() == 1
+                && snapshotFiles.indexFiles().get(0).physicalName().startsWith("segments_")
+                && snapshotFiles.indexFiles().get(0).hasUnknownChecksum()) {
+                // If the shard has no documents, it will only contain a single segments_N file for the
+                // shard's snapshot.  If we are restoring a snapshot created by a previous supported version,
+                // it is still possible that in that version, an empty shard has a segments_N file with an unsupported
+                // version (and no checksum), because we don't know the Lucene version to assign segments_N until we
+                // have written some data.  Since the segments_N for an empty shard could have an incompatible Lucene
+                // version number and no checksum, even though the index itself is perfectly fine to restore, this
+                // empty shard would cause exceptions to be thrown.  Since there is no data to restore from an empty
+                // shard anyway, we just create the empty shard here and then exit.
+                store.createEmpty(indexShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion);
+                return;
+            }
+
+            Store.MetadataSnapshot recoveryTargetMetadata;
+            try {
+                // this will throw an IOException if the store has no segments infos file. The
+                // store can still have existing files but they will be deleted just before being
+                // restored.
+                recoveryTargetMetadata = indexShard.snapshotStoreMetadata();
+            } catch (IndexNotFoundException e) {
+                // happens when restoring to an empty shard, not a big deal
+                logger.trace("[{}] [{}] restoring to an empty shard", shardId, snapshotId);
+                recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
+            } catch (IOException e) {
+                logger.warn(new ParameterizedMessage("[{}] [{}] Can't read metadata from store, will not reuse local files during restore",
+                    shardId, snapshotId), e);
+                recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
+            }
+
+            final List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover = new ArrayList<>();
+            final Map<String, StoreFileMetaData> snapshotMetaData = new HashMap<>();
+            final Map<String, BlobStoreIndexShardSnapshot.FileInfo> fileInfos = new HashMap<>();
+            for (final BlobStoreIndexShardSnapshot.FileInfo fileInfo : snapshotFiles.indexFiles()) {
+                try {
+                    // in 1.3.3 we added additional hashes for .si / segments_N files
+                    // to ensure we don't double the space in the repo since old snapshots
+                    // don't have this hash we try to read that hash from the blob store
+                    // in a bwc compatible way.
+                    maybeRecalculateMetadataHash(fileInfo, recoveryTargetMetadata);
+                } catch (Exception e) {
+                    // if the index is broken we might not be able to read it
+                    logger.warn(new ParameterizedMessage("[{}] Can't calculate hash from blob for file [{}] [{}]", shardId,
+                        fileInfo.physicalName(), fileInfo.metadata()), e);
+                }
+                snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata());
+                fileInfos.put(fileInfo.metadata().name(), fileInfo);
+            }
+
+            final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetaData), emptyMap(), 0);
+
+            final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile();
+            if (restoredSegmentsFile == null) {
+                throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file");
+            }
+
+            final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata);
+            for (StoreFileMetaData md : diff.identical) {
+                BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
+                recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true);
+                if (logger.isTraceEnabled()) {
+                    logger.trace("[{}] [{}] not_recovering file [{}] from [{}], exists in local store and is same", shardId, snapshotId,
+                        fileInfo.physicalName(), fileInfo.name());
+                }
+            }
+
+            for (StoreFileMetaData md : concat(diff)) {
+                BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
+                filesToRecover.add(fileInfo);
+                recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false);
+                if (logger.isTraceEnabled()) {
+                    logger.trace("[{}] [{}] recovering [{}] from [{}]", shardId, snapshotId,
+                        fileInfo.physicalName(), fileInfo.name());
+                }
+            }
+
+            if (filesToRecover.isEmpty()) {
+                logger.trace("[{}] [{}] no files to recover, all exist within the local store", shardId, snapshotId);
+            }
+
+            try {
+                // list of all existing store files
+                final List<String> deleteIfExistFiles = Arrays.asList(store.directory().listAll());
+
+                // restore the files from the snapshot to the Lucene store
+                for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
+                    // if a file with the same physical name already exists in the store we need to delete it
+                    // before restoring it from the snapshot. We could be lenient and try to reuse the existing
+                    // store files (and compare their names/length/checksum again with the snapshot files) but to
+                    // avoid extra complexity we simply delete them and restore them again like StoreRecovery
+                    // does with dangling indices. Any existing store file that is not restored from the snapshot
+                    // will be cleaned up by RecoveryTarget.cleanFiles().
+                    final String physicalName = fileToRecover.physicalName();
+                    if (deleteIfExistFiles.contains(physicalName)) {
+                        logger.trace("[{}] [{}] deleting pre-existing file [{}]", shardId, snapshotId, physicalName);
+                        store.directory().deleteFile(physicalName);
+                    }
+
+                    logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
+                    restoreFile(fileToRecover, store);
+                }
+            } catch (IOException ex) {
+                throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
+            }
+
+            // read the snapshot data persisted
+            final SegmentInfos segmentCommitInfos;
+            try {
+                segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
+            } catch (IOException e) {
+                throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
+            }
+            recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion());
+
+            // now, go over and clean files that are in the store, but were not in the snapshot
+            try {
+                for (String storeFile : store.directory().listAll()) {
+                    if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) {
+                        continue; //skip write.lock, checksum files and files that exist in the snapshot
+                    }
+                    try {
+                        store.deleteQuiet("restore", storeFile);
+                        store.directory().deleteFile(storeFile);
+                    } catch (IOException e) {
+                        logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile);
+                    }
+                }
+            } catch (IOException e) {
+                logger.warn("[{}] [{}] failed to list directory - some of files might not be deleted", shardId, snapshotId);
+            }
+        } finally {
+            store.decRef();
+        }
+    }
+
+    protected abstract InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo);
+
+    @SuppressWarnings("unchecked")
+    private Iterable<StoreFileMetaData> concat(Store.RecoveryDiff diff) {
+        return Iterables.concat(diff.different, diff.missing);
+    }
+
+    /**
+     * Restores a file
+     *
+     * @param fileInfo file to be restored
+     */
+    private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException {
+        boolean success = false;
+
+        try (InputStream stream = fileInputStream(fileInfo)) {
+            try (IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
+                final byte[] buffer = new byte[bufferSize];
+                int length;
+                while ((length = stream.read(buffer)) > 0) {
+                    indexOutput.writeBytes(buffer, 0, length);
+                    recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
+                }
+                Store.verify(indexOutput);
+                indexOutput.close();
+                store.directory().sync(Collections.singleton(fileInfo.physicalName()));
+                success = true;
+            } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
+                try {
+                    store.markStoreCorrupted(ex);
+                } catch (IOException e) {
+                    logger.warn("store cannot be marked as corrupted", e);
+                }
+                throw ex;
+            } finally {
+                if (success == false) {
+                    store.deleteQuiet(fileInfo.physicalName());
+                }
+            }
+        }
+    }
+
+    /**
+     * This is a BWC layer to ensure we update the snapshots metadata with the corresponding hashes before we compare them.
+     * The new logic for StoreFileMetaData reads the entire {@code .si} and {@code segments.n} files to strengthen the
+     * comparison of the files on a per-segment / per-commit level.
+     */
+    private void maybeRecalculateMetadataHash(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store.MetadataSnapshot snapshot)
+        throws IOException {
+        final StoreFileMetaData metadata;
+        if (fileInfo != null && (metadata = snapshot.get(fileInfo.physicalName())) != null) {
+            if (metadata.hash().length > 0 && fileInfo.metadata().hash().length == 0) {
+                // we have a hash - check if our repo has a hash too otherwise we have
+                // to calculate it.
+                // we might have multiple parts even though the file is small... make sure we read all of it.
+                try (InputStream stream = fileInputStream(fileInfo)) {
+                    BytesRefBuilder builder = new BytesRefBuilder();
+                    Store.MetadataSnapshot.hashFile(builder, stream, fileInfo.length());
+                    BytesRef hash = fileInfo.metadata().hash(); // reset the file infos metadata hash
+                    assert hash.length == 0;
+                    hash.bytes = builder.bytes();
+                    hash.offset = 0;
+                    hash.length = builder.length();
+                }
+            }
+        }
+    }
+}
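The contract for implementers is deliberately small: the recovery diff, copy loop, verification, and cleanup above are all inherited, and only fileInputStream() must be provided. A hypothetical subclass, restoring from an in-memory map of file bytes, would look like this:

    // Illustration only: the restore source here is a Map<String, byte[]>.
    class InMemoryFileRestoreContext extends FileRestoreContext {
        private final Map<String, byte[]> source;

        InMemoryFileRestoreContext(String repositoryName, IndexShard shard, SnapshotId snapshotId,
                                   RecoveryState recoveryState, Map<String, byte[]> source) {
            super(repositoryName, shard, snapshotId, recoveryState, 8192); // 8 KiB buffer, arbitrary
            this.source = source;
        }

        @Override
        protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
            // open the snapshotted file; restore(SnapshotFiles) drives the copy and verification
            return new ByteArrayInputStream(source.get(fileInfo.physicalName()));
        }
    }

This is the shape both concrete contexts in this commit take: BlobStoreRepository streams from a BlobContainer, while CcrRepository streams chunks from the leader over the transport layer.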

+ 2 - 2
server/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java

@@ -120,8 +120,8 @@ public class PagedBytesReferenceTests extends AbstractBytesReferenceTestCase {
         }
 
         // get refs & compare
-        BytesReference pbr = new PagedBytesReference(bigarrays, ba1, length);
-        BytesReference pbr2 = new PagedBytesReference(bigarrays, ba2, length);
+        BytesReference pbr = new PagedBytesReference(ba1, length);
+        BytesReference pbr2 = new PagedBytesReference(ba2, length);
         assertEquals(pbr, pbr2);
         int offsetToFlip = randomIntBetween(0, length - 1);
         int value = ~Byte.toUnsignedInt(ba1.get(offsetToFlip));

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java

@@ -528,7 +528,7 @@ public abstract class AbstractBytesReferenceTestCase extends ESTestCase {
     public void testSliceEquals() {
         int length = randomIntBetween(100, PAGE_SIZE * randomIntBetween(2, 5));
         ByteArray ba1 = bigarrays.newByteArray(length, false);
-        BytesReference pbr = new PagedBytesReference(bigarrays, ba1, length);
+        BytesReference pbr = new PagedBytesReference(ba1, length);
 
         // test equality of slices
         int sliceFrom = randomIntBetween(0, pbr.length());

+ 10 - 0
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -448,6 +448,16 @@ public abstract class IndexShardTestCase extends ESTestCase {
         return newStartedShard(primary, Settings.EMPTY, new InternalEngineFactory());
     }
 
+    /**
+     * Creates a new empty shard and starts it.
+     *
+     * @param primary controls whether the shard will be a primary or a replica.
+     * @param settings the settings to use for this shard
+     */
+    protected IndexShard newStartedShard(final boolean primary, Settings settings) throws IOException {
+        return newStartedShard(primary, settings, new InternalEngineFactory());
+    }
+
     /**
      * Creates a new empty shard with the specified settings and engine factory and starts it.
      *
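The new overload makes it straightforward to start a shard with custom settings but the default engine; a hypothetical test snippet (the soft-deletes setting is just an example choice):

    IndexShard shard = newStartedShard(true,
        Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
    try {
        // exercise the primary with non-default settings
    } finally {
        closeShards(shard);
    }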

+ 3 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

@@ -61,6 +61,7 @@ import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
 import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
 import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
 import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction;
+import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction;
 import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
 import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction;
 import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
@@ -193,6 +194,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
                     PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class),
                 new ActionHandler<>(ClearCcrRestoreSessionAction.INSTANCE,
                     ClearCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class),
+                new ActionHandler<>(GetCcrRestoreFileChunkAction.INSTANCE,
+                    GetCcrRestoreFileChunkAction.TransportGetCcrRestoreFileChunkAction.class),
                 // stats action
                 new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class),
                 new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),

+ 129 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java

@@ -0,0 +1,129 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ccr.action.repositories;
+
+import org.elasticsearch.action.Action;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ByteArray;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportActionProxy;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
+
+import java.io.IOException;
+
+public class GetCcrRestoreFileChunkAction extends Action<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> {
+
+    public static final GetCcrRestoreFileChunkAction INSTANCE = new GetCcrRestoreFileChunkAction();
+    public static final String NAME = "internal:admin/ccr/restore/file_chunk/get";
+
+    private GetCcrRestoreFileChunkAction() {
+        super(NAME);
+    }
+
+    @Override
+    public GetCcrRestoreFileChunkResponse newResponse() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Writeable.Reader<GetCcrRestoreFileChunkResponse> getResponseReader() {
+        return GetCcrRestoreFileChunkResponse::new;
+    }
+
+
+    public static class TransportGetCcrRestoreFileChunkAction
+        extends HandledTransportAction<GetCcrRestoreFileChunkRequest, GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> {
+
+        private final CcrRestoreSourceService restoreSourceService;
+        private final ThreadPool threadPool;
+        private final BigArrays bigArrays;
+
+        @Inject
+        public TransportGetCcrRestoreFileChunkAction(BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters,
+                                                     CcrRestoreSourceService restoreSourceService) {
+            super(NAME, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new);
+            TransportActionProxy.registerProxyAction(transportService, NAME, GetCcrRestoreFileChunkResponse::new);
+            this.threadPool = transportService.getThreadPool();
+            this.restoreSourceService = restoreSourceService;
+            this.bigArrays = bigArrays;
+        }
+
+        @Override
+        protected void doExecute(Task task, GetCcrRestoreFileChunkRequest request,
+                                 ActionListener<GetCcrRestoreFileChunkResponse> listener) {
+            threadPool.generic().execute(new AbstractRunnable() {
+                @Override
+                public void onFailure(Exception e) {
+                    listener.onFailure(e);
+                }
+
+                @Override
+                protected void doRun() throws Exception {
+                    int bytesRequested = request.getSize();
+                    ByteArray array = bigArrays.newByteArray(bytesRequested, false);
+                    String fileName = request.getFileName();
+                    String sessionUUID = request.getSessionUUID();
+                    // This is currently safe to do because calling `onResponse` will serialize the bytes to the network layer data
+                    // structure on the same thread. So the bytes will be copied before the reference is released.
+                    try (ReleasablePagedBytesReference reference = new ReleasablePagedBytesReference(array, bytesRequested, array)) {
+                        try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) {
+                            long offsetAfterRead = sessionReader.readFileBytes(fileName, reference);
+                            long offsetBeforeRead = offsetAfterRead - reference.length();
+                            listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference));
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+    public static class GetCcrRestoreFileChunkResponse extends ActionResponse {
+
+        private final long offset;
+        private final BytesReference chunk;
+
+        GetCcrRestoreFileChunkResponse(StreamInput streamInput) throws IOException {
+            super(streamInput);
+            offset = streamInput.readVLong();
+            chunk = streamInput.readBytesReference();
+        }
+
+        GetCcrRestoreFileChunkResponse(long offset, BytesReference chunk) {
+            this.offset = offset;
+            this.chunk = chunk;
+        }
+
+        public long getOffset() {
+            return offset;
+        }
+
+        public BytesReference getChunk() {
+            return chunk;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeVLong(offset);
+            out.writeBytesReference(chunk);
+        }
+
+    }
+}
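One detail worth noting in doRun above: the ByteArray is handed to ReleasablePagedBytesReference twice, once as the backing pages and once as the Releasable, so closing the reference frees the pooled pages. The same pattern in isolation (a sketch; NON_RECYCLING_INSTANCE stands in for the injected BigArrays):

    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
    ByteArray array = bigArrays.newByteArray(1024, false);
    // `array` is both the page store and the resource released by close()
    try (ReleasablePagedBytesReference ref = new ReleasablePagedBytesReference(array, 1024, array)) {
        // bytes are only valid until close() releases the pages
    }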

+ 76 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequest.java

@@ -0,0 +1,76 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ccr.action.repositories;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.transport.RemoteClusterAwareRequest;
+
+import java.io.IOException;
+
+public class GetCcrRestoreFileChunkRequest extends ActionRequest implements RemoteClusterAwareRequest {
+
+    private final DiscoveryNode node;
+    private final String sessionUUID;
+    private final String fileName;
+    private final int size;
+
+    @Override
+    public ActionRequestValidationException validate() {
+        return null;
+    }
+
+    public GetCcrRestoreFileChunkRequest(DiscoveryNode node, String sessionUUID, String fileName, int size) {
+        this.node = node;
+        this.sessionUUID = sessionUUID;
+        this.fileName = fileName;
+        this.size = size;
+        assert size > -1 : "The file chunk request size must be non-negative. Found: [" + size + "].";
+    }
+
+    GetCcrRestoreFileChunkRequest(StreamInput in) throws IOException {
+        super(in);
+        node = null;
+        sessionUUID = in.readString();
+        fileName = in.readString();
+        size = in.readVInt();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeString(sessionUUID);
+        out.writeString(fileName);
+        out.writeVInt(size);
+    }
+
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    String getSessionUUID() {
+        return sessionUUID;
+    }
+
+    String getFileName() {
+        return fileName;
+    }
+
+    int getSize() {
+        return size;
+    }
+
+    @Override
+    public DiscoveryNode getPreferredTargetNode() {
+        assert node != null : "Target node is null";
+        return node;
+    }
+}

+ 13 - 3
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardNotFoundException;
+import org.elasticsearch.index.store.Store;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -70,8 +71,8 @@ public class PutCcrRestoreSessionAction extends Action<PutCcrRestoreSessionActio
             if (indexShard == null) {
                 throw new ShardNotFoundException(shardId);
             }
-            ccrRestoreService.openSession(request.getSessionUUID(), indexShard);
-            return new PutCcrRestoreSessionResponse(clusterService.localNode());
+            Store.MetadataSnapshot storeFileMetaData = ccrRestoreService.openSession(request.getSessionUUID(), indexShard);
+            return new PutCcrRestoreSessionResponse(clusterService.localNode(), storeFileMetaData);
         }
 
         @Override
@@ -95,33 +96,42 @@ public class PutCcrRestoreSessionAction extends Action<PutCcrRestoreSessionActio
     public static class PutCcrRestoreSessionResponse extends ActionResponse {
 
         private DiscoveryNode node;
+        private Store.MetadataSnapshot storeFileMetaData;
 
         PutCcrRestoreSessionResponse() {
         }
 
-        PutCcrRestoreSessionResponse(DiscoveryNode node) {
+        PutCcrRestoreSessionResponse(DiscoveryNode node, Store.MetadataSnapshot storeFileMetaData) {
             this.node = node;
+            this.storeFileMetaData = storeFileMetaData;
         }
 
         PutCcrRestoreSessionResponse(StreamInput in) throws IOException {
             super(in);
             node = new DiscoveryNode(in);
+            storeFileMetaData = new Store.MetadataSnapshot(in);
         }
 
         @Override
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);
             node = new DiscoveryNode(in);
+            storeFileMetaData = new Store.MetadataSnapshot(in);
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             node.writeTo(out);
+            storeFileMetaData.writeTo(out);
         }
 
         public DiscoveryNode getNode() {
             return node;
         }
+
+        public Store.MetadataSnapshot getStoreFileMetaData() {
+            return storeFileMetaData;
+        }
     }
 }

+ 3 - 12
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java

@@ -11,7 +11,6 @@ import org.elasticsearch.action.support.single.shard.SingleShardRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.store.Store;
 
 import java.io.IOException;
 
@@ -19,16 +18,14 @@ public class PutCcrRestoreSessionRequest extends SingleShardRequest<PutCcrRestor
 
     private String sessionUUID;
     private ShardId shardId;
-    private Store.MetadataSnapshot metaData;
 
     PutCcrRestoreSessionRequest() {
     }
 
-    public PutCcrRestoreSessionRequest(String sessionUUID, ShardId shardId, Store.MetadataSnapshot metaData) {
+    public PutCcrRestoreSessionRequest(String sessionUUID, ShardId shardId) {
         super(shardId.getIndexName());
         this.sessionUUID = sessionUUID;
         this.shardId = shardId;
-        this.metaData = metaData;
     }
 
     @Override
@@ -41,7 +38,6 @@ public class PutCcrRestoreSessionRequest extends SingleShardRequest<PutCcrRestor
         super.readFrom(in);
         sessionUUID = in.readString();
         shardId = ShardId.readShardId(in);
-        metaData = new Store.MetadataSnapshot(in);
     }
 
     @Override
@@ -49,18 +45,13 @@ public class PutCcrRestoreSessionRequest extends SingleShardRequest<PutCcrRestor
         super.writeTo(out);
         out.writeString(sessionUUID);
         shardId.writeTo(out);
-        metaData.writeTo(out);
     }
 
-    public String getSessionUUID() {
+    String getSessionUUID() {
         return sessionUUID;
     }
 
-    public ShardId getShardId() {
+    ShardId getShardId() {
         return shardId;
     }
-
-    public Store.MetadataSnapshot getMetaData() {
-        return metaData;
-    }
 }

+ 124 - 17
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -20,21 +20,29 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardRecoveryException;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
+import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
 import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
+import org.elasticsearch.repositories.blobstore.FileRestoreContext;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotShardFailure;
@@ -44,10 +52,14 @@ import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
 import org.elasticsearch.xpack.ccr.action.CcrRequests;
 import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
 import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest;
+import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction;
+import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkRequest;
 import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
 import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionRequest;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -237,25 +249,21 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
             store.decRef();
         }
 
-        Store.MetadataSnapshot recoveryMetadata;
-        try {
-            recoveryMetadata = indexShard.snapshotStoreMetadata();
-        } catch (IOException e) {
-            throw new IndexShardRecoveryException(shardId, "failed access store metadata", e);
-        }
-
         Map<String, String> ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
         String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
         Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID);
         ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId());
 
         Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
-        String sessionUUID = UUIDs.randomBase64UUID();
-        PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
-            new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId, recoveryMetadata)).actionGet();
-        DiscoveryNode node = response.getNode();
-        // TODO: Implement file restore
-        closeSession(remoteClient, node, sessionUUID);
+        // TODO: There should be some local timeout. And if the remote cluster returns an unknown session
+        //  response, we should be able to retry by creating a new session.
+        String name = metadata.name();
+        try (RestoreSession restoreSession = RestoreSession.openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) {
+            restoreSession.restoreFiles();
+        } catch (Exception e) {
+            throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
+        }
+
         maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings());
     }
 
@@ -278,9 +286,108 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
         }
     }
 
-    private void closeSession(Client remoteClient, DiscoveryNode node, String sessionUUID) {
-        ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node);
-        ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response =
-            remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet();
+    private static class RestoreSession extends FileRestoreContext implements Closeable {
+
+        private static final int BUFFER_SIZE = 1 << 16;
+
+        private final Client remoteClient;
+        private final String sessionUUID;
+        private final DiscoveryNode node;
+        private final Store.MetadataSnapshot sourceMetaData;
+
+        RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
+                       RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData) {
+            super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
+            this.remoteClient = remoteClient;
+            this.sessionUUID = sessionUUID;
+            this.node = node;
+            this.sourceMetaData = sourceMetaData;
+        }
+
+        static RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
+                                          RecoveryState recoveryState) {
+            String sessionUUID = UUIDs.randomBase64UUID();
+            PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
+                new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
+            return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
+                response.getStoreFileMetaData());
+        }
+
+        void restoreFiles() throws IOException {
+            ArrayList<BlobStoreIndexShardSnapshot.FileInfo> fileInfos = new ArrayList<>();
+            for (StoreFileMetaData fileMetaData : sourceMetaData) {
+                ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length());
+                fileInfos.add(new BlobStoreIndexShardSnapshot.FileInfo(fileMetaData.name(), fileMetaData, fileSize));
+            }
+            SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos);
+            restore(snapshotFiles);
+        }
+
+        @Override
+        protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
+            return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata());
+        }
+
+        @Override
+        public void close() {
+            ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node);
+            remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet();
+        }
+    }
+
+    private static class RestoreFileInputStream extends InputStream {
+
+        private final Client remoteClient;
+        private final String sessionUUID;
+        private final DiscoveryNode node;
+        private final StoreFileMetaData fileToRecover;
+
+        private long pos = 0;
+
+        private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover) {
+            this.remoteClient = remoteClient;
+            this.sessionUUID = sessionUUID;
+            this.node = node;
+            this.fileToRecover = fileToRecover;
+        }
+
+        @Override
+        public int read() throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int read(byte[] bytes, int off, int len) throws IOException {
+            long remainingBytes = fileToRecover.length() - pos;
+            if (remainingBytes <= 0) {
+                // end of file: signal EOF per the InputStream contract
+                return -1;
+            }
+
+            int bytesRequested = (int) Math.min(remainingBytes, len);
+            String fileName = fileToRecover.name();
+            GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested);
+            GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response =
+                remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet();
+            BytesReference fileChunk = response.getChunk();
+
+            int bytesReceived = fileChunk.length();
+            if (bytesReceived > bytesRequested) {
+                throw new IOException("More bytes [" + bytesReceived + "] received than requested [" + bytesRequested + "]");
+            }
+
+            long leaderOffset = response.getOffset();
+            assert pos == leaderOffset : "Position [" + pos + "] should be equal to the leader file offset [" + leaderOffset + "].";
+
+            try (StreamInput streamInput = fileChunk.streamInput()) {
+                int bytesRead = streamInput.read(bytes, off, bytesReceived);
+                assert bytesRead == bytesReceived : "Did not read the correct number of bytes";
+            }
+
+            pos += bytesReceived;
+
+            return bytesReceived;
+        }
     }
 }

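To make the chunking contract concrete: each read(byte[], int, int) call on RestoreFileInputStream issues one GetCcrRestoreFileChunk round trip, and the stream is exhausted once pos reaches the file length. A minimal consumer sketch under those assumptions (illustrative only; the real read loop lives in FileRestoreContext, and RestoreFileInputStream is private to CcrRepository):

    byte[] buffer = new byte[BUFFER_SIZE];  // BUFFER_SIZE = 1 << 16, as defined above
    long copied = 0;
    try (InputStream in = fileInputStream(fileInfo)) {
        int read;
        while ((read = in.read(buffer, 0, buffer.length)) > 0) {
            copied += read;  // one remote chunk request per iteration
        }
    }
    assert copied == fileInfo.metadata().length();
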
+ 117 - 31
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java

@@ -8,10 +8,18 @@ package org.elasticsearch.xpack.ccr.repository;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.KeyedLock;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexEventListener;
@@ -23,9 +31,11 @@ import org.elasticsearch.index.store.Store;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Consumer;
 
@@ -33,7 +43,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
 
     private static final Logger logger = LogManager.getLogger(CcrRestoreSourceService.class);
 
-    private final Map<String, RestoreContext> onGoingRestores = ConcurrentCollections.newConcurrentMap();
+    private final Map<String, RestoreSession> onGoingRestores = ConcurrentCollections.newConcurrentMap();
     private final Map<IndexShard, HashSet<String>> sessionsForShard = new HashMap<>();
     private final CopyOnWriteArrayList<Consumer<String>> openSessionListeners = new CopyOnWriteArrayList<>();
     private final CopyOnWriteArrayList<Consumer<String>> closeSessionListeners = new CopyOnWriteArrayList<>();
@@ -48,8 +58,9 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
             HashSet<String> sessions = sessionsForShard.remove(indexShard);
             if (sessions != null) {
                 for (String sessionUUID : sessions) {
-                    RestoreContext restore = onGoingRestores.remove(sessionUUID);
-                    IOUtils.closeWhileHandlingException(restore);
+                    RestoreSession restore = onGoingRestores.remove(sessionUUID);
+                    assert restore != null : "Session UUID [" + sessionUUID + "] registered for shard but not found in ongoing restores";
+                    restore.decRef();
                 }
             }
         }
@@ -68,7 +79,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
     @Override
     protected synchronized void doClose() throws IOException {
         sessionsForShard.clear();
-        IOUtils.closeWhileHandlingException(onGoingRestores.values());
+        onGoingRestores.values().forEach(AbstractRefCounted::decRef);
         onGoingRestores.clear();
     }
 
@@ -88,7 +99,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
     }
 
     // default visibility for testing
-    synchronized RestoreContext getOngoingRestore(String sessionUUID) {
+    synchronized RestoreSession getOngoingRestore(String sessionUUID) {
         return onGoingRestores.get(sessionUUID);
     }
 
@@ -96,7 +107,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
     //  complete. Or it could be for session to have been touched.
     public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException {
         boolean success = false;
-        RestoreContext restore = null;
+        RestoreSession restore = null;
         try {
             if (onGoingRestores.containsKey(sessionUUID)) {
                 logger.debug("not opening new session [{}] as it already exists", sessionUUID);
@@ -106,46 +117,72 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
                 if (indexShard.state() == IndexShardState.CLOSED) {
                     throw new IndexShardClosedException(indexShard.shardId(), "cannot open ccr restore session if shard closed");
                 }
-                restore = new RestoreContext(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit());
+                restore = new RestoreSession(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit());
                 onGoingRestores.put(sessionUUID, restore);
                 openSessionListeners.forEach(c -> c.accept(sessionUUID));
-                HashSet<String> sessions = sessionsForShard.computeIfAbsent(indexShard, (s) ->  new HashSet<>());
+                HashSet<String> sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>());
                 sessions.add(sessionUUID);
             }
             Store.MetadataSnapshot metaData = restore.getMetaData();
             success = true;
             return metaData;
         } finally {
-            if (success ==  false) {
+            if (success == false) {
                 onGoingRestores.remove(sessionUUID);
-                IOUtils.closeWhileHandlingException(restore);
+                if (restore != null) {
+                    restore.decRef();
+                }
+            }
+        }
+    }
+
+    public void closeSession(String sessionUUID) {
+        final RestoreSession restore;
+        synchronized (this) {
+            closeSessionListeners.forEach(c -> c.accept(sessionUUID));
+            restore = onGoingRestores.remove(sessionUUID);
+            if (restore == null) {
+                logger.debug("could not close session [{}] because session not found", sessionUUID);
+                throw new IllegalArgumentException("session [" + sessionUUID + "] not found");
+            }
+            HashSet<String> sessions = sessionsForShard.get(restore.indexShard);
+            assert sessions != null : "No session UUIDs for shard even though one [" + sessionUUID + "] is active in ongoing restores";
+            if (sessions != null) {
+                boolean removed = sessions.remove(sessionUUID);
+                assert removed : "No session found for UUID [" + sessionUUID +"]";
+                if (sessions.isEmpty()) {
+                    sessionsForShard.remove(restore.indexShard);
+                }
             }
         }
+        restore.decRef();
     }
 
-    public synchronized void closeSession(String sessionUUID) {
-        closeSessionListeners.forEach(c -> c.accept(sessionUUID));
-        RestoreContext restore = onGoingRestores.remove(sessionUUID);
+    public synchronized SessionReader getSessionReader(String sessionUUID) {
+        RestoreSession restore = onGoingRestores.get(sessionUUID);
         if (restore == null) {
-            logger.info("could not close session [{}] because session not found", sessionUUID);
+            logger.debug("could not get session [{}] because session not found", sessionUUID);
             throw new IllegalArgumentException("session [" + sessionUUID + "] not found");
         }
-        IOUtils.closeWhileHandlingException(restore);
+        return new SessionReader(restore);
     }
 
-    private class RestoreContext implements Closeable {
+    private static class RestoreSession extends AbstractRefCounted {
 
         private final String sessionUUID;
         private final IndexShard indexShard;
         private final Engine.IndexCommitRef commitRef;
+        private final KeyedLock<String> keyedLock = new KeyedLock<>();
+        private final Map<String, IndexInput> cachedInputs = new ConcurrentHashMap<>();
 
-        private RestoreContext(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef) {
+        private RestoreSession(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef) {
+            super("restore-session");
             this.sessionUUID = sessionUUID;
             this.indexShard = indexShard;
             this.commitRef = commitRef;
         }
 
-        Store.MetadataSnapshot getMetaData() throws IOException {
+        private Store.MetadataSnapshot getMetaData() throws IOException {
             indexShard.store().incRef();
             try {
                 return indexShard.store().getMetadata(commitRef.getIndexCommit());
@@ -154,22 +191,71 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
             }
         }
 
-        @Override
-        public void close() {
-            assert Thread.holdsLock(CcrRestoreSourceService.this);
-            removeSessionForShard(sessionUUID, indexShard);
-            IOUtils.closeWhileHandlingException(commitRef);
-        }
+        private long readFileBytes(String fileName, BytesReference reference) throws IOException {
+            Releasable lock = keyedLock.tryAcquire(fileName);
+            if (lock == null) {
+                throw new IllegalStateException("can't read from the same file on the same session concurrently");
+            }
+            try (Releasable releasable = lock) {
+                final IndexInput indexInput = cachedInputs.computeIfAbsent(fileName, f -> {
+                    try {
+                        return commitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE);
+                    } catch (IOException e) {
+                        throw new UncheckedIOException(e);
+                    }
+                });
 
-        private void removeSessionForShard(String sessionUUID, IndexShard indexShard) {
-            logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard.shardId());
-            HashSet<String> sessions = sessionsForShard.get(indexShard);
-            if (sessions != null) {
-                sessions.remove(sessionUUID);
-                if (sessions.isEmpty()) {
-                    sessionsForShard.remove(indexShard);
+                BytesRefIterator refIterator = reference.iterator();
+                BytesRef ref;
+                while ((ref = refIterator.next()) != null) {
+                    // honor the ref's own offset/length; the backing array may be shared or padded
+                    indexInput.readBytes(ref.bytes, ref.offset, ref.length);
                 }
+
+                long offsetAfterRead = indexInput.getFilePointer();
+
+                if (offsetAfterRead == indexInput.length()) {
+                    cachedInputs.remove(fileName);
+                    IOUtils.close(indexInput);
+                }
+
+                return offsetAfterRead;
             }
         }
+
+        @Override
+        protected void closeInternal() {
+            logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard.shardId());
+            assert keyedLock.hasLockedKeys() == false : "Should not hold any file locks when closing";
+            IOUtils.closeWhileHandlingException(cachedInputs.values());
+        }
+    }
+
+    public static class SessionReader implements Closeable {
+
+        private final RestoreSession restoreSession;
+
+        private SessionReader(RestoreSession restoreSession) {
+            this.restoreSession = restoreSession;
+            restoreSession.incRef();
+        }
+
+        /**
+         * Reads bytes from the file into the given reference, filling the reference completely. Returns
+         * the offset in the file at which the read completed.
+         *
+         * @param fileName to read
+         * @param reference to read bytes into
+         * @return the offset of the file after the read is complete
+         * @throws IOException if the read fails
+         */
+        public long readFileBytes(String fileName, BytesReference reference) throws IOException {
+            return restoreSession.readFileBytes(fileName, reference);
+        }
+
+        @Override
+        public void close() {
+            restoreSession.decRef();
+        }
     }
 }

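A leader-side usage sketch of the new reader API, mirroring testGetSessionReader below (restoreSourceService, sessionUUID, and fileMetaData stand in for whatever the caller has at hand). Each SessionReader takes a reference on the session, so a concurrent closeSession cannot release the underlying index commit mid-read:

    byte[] bytes = new byte[(int) fileMetaData.length()];
    try (CcrRestoreSourceService.SessionReader reader = restoreSourceService.getSessionReader(sessionUUID)) {
        long offset = reader.readFileBytes(fileMetaData.name(), new BytesArray(bytes));
        assert offset == fileMetaData.length();  // the read completed at end of file
    }
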
+ 55 - 13
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java

@@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -21,7 +22,6 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.index.IndexSettings;
@@ -40,8 +40,8 @@ import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
 import java.io.IOException;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Collections.singletonMap;
 import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
@@ -159,7 +159,7 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
         assertNotEquals(leaderMetadata.getIndexUUID(), followerMetadata.getIndexUUID());
     }
 
-    public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException {
+    public void testDocsAreRecovered() throws Exception {
         String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
         String leaderIndex = "index1";
         String followerIndex = "index2";
@@ -173,6 +173,45 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
         final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
         final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
 
+        final int firstBatchNumDocs = randomIntBetween(1, 64);
+        logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
+        for (int i = 0; i < firstBatchNumDocs; i++) {
+            final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
+            leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
+        }
+
+        leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();
+
+        AtomicBoolean isRunning = new AtomicBoolean(true);
+
+        // Concurrently index new docs with mapping changes
+        Thread thread = new Thread(() -> {
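+            // 'f' is absent from this alphabet, so the concurrent docs never collide with the first batch's "f" field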
+            char[] chars = "abcdeghijklmnopqrstuvwxyz".toCharArray();
+            for (char c : chars) {
+                if (isRunning.get() == false) {
+                    break;
+                }
+                final String source;
+                long l = randomLongBetween(0, 50000);
+                if (randomBoolean()) {
+                    source = String.format(Locale.ROOT, "{\"%c\":%d}", c, l);
+                } else {
+                    source = String.format(Locale.ROOT, "{\"%c\":\"%d\"}", c, l);
+                }
+                for (int i = 64; i < 150; i++) {
+                    if (isRunning.get() == false) {
+                        break;
+                    }
+                    leaderClient().prepareIndex("index1", "doc", Long.toString(i)).setSource(source, XContentType.JSON).get();
+                    if (rarely()) {
+                        leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).get();
+                    }
+                }
+                leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();
+            }
+        });
+        thread.start();
+
         Settings.Builder settingsBuilder = Settings.builder()
             .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
             .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
@@ -182,22 +221,18 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
             false, true, settingsBuilder.build(), new String[0],
             "restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]");
 
-        Set<String> sessionsOpened = ConcurrentCollections.newConcurrentSet();
-        Set<String> sessionsClosed = ConcurrentCollections.newConcurrentSet();
-        for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) {
-            restoreSourceService.addOpenSessionListener(sessionsOpened::add);
-            restoreSourceService.addCloseSessionListener(sessionsClosed::add);
-        }
-
         PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
         restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
         RestoreInfo restoreInfo = future.actionGet();
 
-        assertEquals(numberOfPrimaryShards, sessionsOpened.size());
-        assertEquals(numberOfPrimaryShards, sessionsClosed.size());
-
         assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards());
         assertEquals(0, restoreInfo.failedShards());
+        for (int i = 0; i < firstBatchNumDocs; ++i) {
+            assertExpectedDocument(followerIndex, i);
+        }
+
+        isRunning.set(false);
+        thread.join();
     }
 
     public void testFollowerMappingIsUpdated() throws IOException {
@@ -254,6 +289,13 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
         assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long"));
     }
 
+    private void assertExpectedDocument(String followerIndex, final int value) {
+        final GetResponse getResponse = followerClient().prepareGet(followerIndex, "doc", Integer.toString(value)).get();
+        assertTrue("Doc with id [" + value + "] is missing", getResponse.isExists());
+        assertTrue((getResponse.getSource().containsKey("f")));
+        assertThat(getResponse.getSource().get("f"), equalTo(value));
+    }
+
     private ActionListener<RestoreService.RestoreCompletionResponse> waitForRestore(ClusterService clusterService,
                                                                                     ActionListener<RestoreInfo> listener) {
         return new ActionListener<RestoreService.RestoreCompletionResponse>() {

+ 65 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java

@@ -6,14 +6,20 @@
 
 package org.elasticsearch.xpack.ccr.repository;
 
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IllegalIndexShardStateException;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
+import org.elasticsearch.index.store.StoreFileMetaData;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
 
 public class CcrRestoreSourceServiceTests extends IndexShardTestCase {
@@ -122,4 +128,63 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase {
         restoreSourceService.closeSession(sessionUUID3);
         closeShards(indexShard1, indexShard2);
     }
+
+    public void testGetSessionReader() throws IOException {
+        IndexShard indexShard1 = newStartedShard(true);
+        final String sessionUUID1 = UUIDs.randomBase64UUID();
+
+        restoreSourceService.openSession(sessionUUID1, indexShard1);
+
+        ArrayList<StoreFileMetaData> files = new ArrayList<>();
+        indexShard1.snapshotStoreMetadata().forEach(files::add);
+
+        StoreFileMetaData fileMetaData = files.get(0);
+        String fileName = fileMetaData.name();
+
+        byte[] expectedBytes = new byte[(int) fileMetaData.length()];
+        byte[] actualBytes = new byte[(int) fileMetaData.length()];
+        Engine.IndexCommitRef indexCommitRef = indexShard1.acquireSafeIndexCommit();
+        try (IndexInput indexInput = indexCommitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE)) {
+            indexInput.seek(0);
+            indexInput.readBytes(expectedBytes, 0, (int) fileMetaData.length());
+        }
+
+        BytesArray byteArray = new BytesArray(actualBytes);
+        try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID1)) {
+            long offset = sessionReader.readFileBytes(fileName, byteArray);
+            assertEquals(offset, fileMetaData.length());
+        }
+
+        assertArrayEquals(expectedBytes, actualBytes);
+        restoreSourceService.closeSession(sessionUUID1);
+        closeShards(indexShard1);
+    }
+
+    public void testGetSessionDoesNotLeakFileIfClosed() throws IOException {
+        Settings settings = Settings.builder().put("index.merge.enabled", false).build();
+        IndexShard indexShard = newStartedShard(true, settings);
+        for (int i = 0; i < 5; i++) {
+            indexDoc(indexShard, "_doc", Integer.toString(i));
+            flushShard(indexShard, true);
+        }
+        final String sessionUUID1 = UUIDs.randomBase64UUID();
+
+        restoreSourceService.openSession(sessionUUID1, indexShard);
+
+        ArrayList<StoreFileMetaData> files = new ArrayList<>();
+        indexShard.snapshotStoreMetadata().forEach(files::add);
+
+        try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID1)) {
+            sessionReader.readFileBytes(files.get(0).name(), new BytesArray(new byte[10]));
+        }
+
+        // Request a second file to ensure that original file is not leaked
+        try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID1)) {
+            sessionReader.readFileBytes(files.get(1).name(), new BytesArray(new byte[10]));
+        }
+
+        restoreSourceService.closeSession(sessionUUID1);
+        closeShards(indexShard);
+        // closeShards above would throw if the session had leaked an open file handle.
+    }
 }