|
@@ -890,56 +890,62 @@ public class RecoverySourceHandler {
|
|
|
|
|
|
void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener<Void> listener) {
|
|
|
ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetadata::length)); // send smallest first
|
|
|
+ Releasable temporaryStoreRef = acquireStore(store);
|
|
|
+ try {
|
|
|
+ final Releasable storeRef = temporaryStoreRef;
|
|
|
+ final MultiChunkTransfer<StoreFileMetadata, FileChunk> multiFileSender =
|
|
|
+ new MultiChunkTransfer<>(logger, threadPool.getThreadContext(), listener, maxConcurrentFileChunks, Arrays.asList(files)) {
|
|
|
+
|
|
|
+ final Deque<byte[]> buffers = new ConcurrentLinkedDeque<>();
|
|
|
+ IndexInput currentInput = null;
|
|
|
+ long offset = 0;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void onNewResource(StoreFileMetadata md) throws IOException {
|
|
|
+ offset = 0;
|
|
|
+ IOUtils.close(currentInput);
|
|
|
+ currentInput = store.directory().openInput(md.name(), IOContext.READONCE);
|
|
|
+ }
|
|
|
|
|
|
- final MultiChunkTransfer<StoreFileMetadata, FileChunk> multiFileSender =
|
|
|
- new MultiChunkTransfer<>(logger, threadPool.getThreadContext(), listener, maxConcurrentFileChunks, Arrays.asList(files)) {
|
|
|
-
|
|
|
- final Deque<byte[]> buffers = new ConcurrentLinkedDeque<>();
|
|
|
- IndexInput currentInput = null;
|
|
|
- long offset = 0;
|
|
|
-
|
|
|
- @Override
|
|
|
- protected void onNewResource(StoreFileMetadata md) throws IOException {
|
|
|
- offset = 0;
|
|
|
- IOUtils.close(currentInput);
|
|
|
- currentInput = store.directory().openInput(md.name(), IOContext.READONCE);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException {
|
|
|
- assert Transports.assertNotTransportThread("read file chunk");
|
|
|
- cancellableThreads.checkForCancel();
|
|
|
- final byte[] buffer = Objects.requireNonNullElseGet(buffers.pollFirst(), () -> new byte[chunkSizeInBytes]);
|
|
|
- final int toRead = Math.toIntExact(Math.min(md.length() - offset, buffer.length));
|
|
|
- currentInput.readBytes(buffer, 0, toRead, false);
|
|
|
- final boolean lastChunk = offset + toRead == md.length();
|
|
|
- final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, toRead), offset, lastChunk,
|
|
|
- () -> buffers.addFirst(buffer));
|
|
|
- offset += toRead;
|
|
|
- return chunk;
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException {
|
|
|
+ assert Transports.assertNotTransportThread("read file chunk");
|
|
|
+ cancellableThreads.checkForCancel();
|
|
|
+ final byte[] buffer = Objects.requireNonNullElseGet(buffers.pollFirst(), () -> new byte[chunkSizeInBytes]);
|
|
|
+ final int toRead = Math.toIntExact(Math.min(md.length() - offset, buffer.length));
|
|
|
+ currentInput.readBytes(buffer, 0, toRead, false);
|
|
|
+ final boolean lastChunk = offset + toRead == md.length();
|
|
|
+ final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, toRead), offset, lastChunk,
|
|
|
+ () -> buffers.addFirst(buffer));
|
|
|
+ offset += toRead;
|
|
|
+ return chunk;
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
|
|
|
- cancellableThreads.checkForCancel();
|
|
|
- final ReleasableBytesReference content = new ReleasableBytesReference(request.content, request);
|
|
|
- recoveryTarget.writeFileChunk(
|
|
|
- request.md, request.position, content, request.lastChunk,
|
|
|
- translogOps.getAsInt(), ActionListener.runBefore(listener, content::close));
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
|
|
|
+ cancellableThreads.checkForCancel();
|
|
|
+ final ReleasableBytesReference content = new ReleasableBytesReference(request.content, request);
|
|
|
+ recoveryTarget.writeFileChunk(
|
|
|
+ request.md, request.position, content, request.lastChunk,
|
|
|
+ translogOps.getAsInt(), ActionListener.runBefore(listener, content::close));
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- protected void handleError(StoreFileMetadata md, Exception e) throws Exception {
|
|
|
- handleErrorOnSendFiles(store, e, new StoreFileMetadata[]{md});
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ protected void handleError(StoreFileMetadata md, Exception e) throws Exception {
|
|
|
+ handleErrorOnSendFiles(store, e, new StoreFileMetadata[]{md});
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public void close() throws IOException {
|
|
|
- IOUtils.close(currentInput);
|
|
|
- }
|
|
|
- };
|
|
|
- resources.add(multiFileSender);
|
|
|
- multiFileSender.start();
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+ IOUtils.close(currentInput, storeRef);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ resources.add(multiFileSender);
|
|
|
+ temporaryStoreRef = null; // now owned by multiFileSender, tracked in resources, so won't be leaked
|
|
|
+ multiFileSender.start();
|
|
|
+ } finally {
|
|
|
+ Releasables.close(temporaryStoreRef);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void cleanFiles(Store store, Store.MetadataSnapshot sourceMetadata, IntSupplier translogOps,
|