Ver Fonte

Remove IndexInputStream Indirection from sendFiles Step (#72134)

No need for the stream wrapper here. It just creates a situation
where if the construction of the stream fails for whatever reason,
we leak the index input.
Also, no point in nulling out the input field in the close step, it'll
get GCed anyway and we don't use `input == null` as a condition anywhere.
Armin Braun há 4 anos atrás
pai
commit
4dc978e840

+ 9 - 18
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -37,7 +37,6 @@ import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.CancellableThreads;
@@ -896,20 +895,14 @@ public class RecoverySourceHandler {
             new MultiChunkTransfer<>(logger, threadPool.getThreadContext(), listener, maxConcurrentFileChunks, Arrays.asList(files)) {
 
                 final Deque<byte[]> buffers = new ConcurrentLinkedDeque<>();
-                InputStreamIndexInput currentInput = null;
+                IndexInput currentInput = null;
                 long offset = 0;
 
                 @Override
                 protected void onNewResource(StoreFileMetadata md) throws IOException {
                     offset = 0;
-                    IOUtils.close(currentInput, () -> currentInput = null);
-                    final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
-                    currentInput = new InputStreamIndexInput(indexInput, md.length()) {
-                        @Override
-                        public void close() throws IOException {
-                            IOUtils.close(indexInput, super::close); // InputStreamIndexInput's close is a noop
-                        }
-                    };
+                    IOUtils.close(currentInput);
+                    currentInput = store.directory().openInput(md.name(), IOContext.READONCE);
                 }
 
                 @Override
@@ -917,14 +910,12 @@ public class RecoverySourceHandler {
                     assert Transports.assertNotTransportThread("read file chunk");
                     cancellableThreads.checkForCancel();
                     final byte[] buffer = Objects.requireNonNullElseGet(buffers.pollFirst(), () -> new byte[chunkSizeInBytes]);
-                    final int bytesRead = currentInput.read(buffer);
-                    if (bytesRead == -1) {
-                        throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + offset, md.name());
-                    }
-                    final boolean lastChunk = offset + bytesRead == md.length();
-                    final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), offset, lastChunk,
+                    final int toRead = Math.toIntExact(Math.min(md.length() - offset, buffer.length));
+                    currentInput.readBytes(buffer, 0, toRead, false);
+                    final boolean lastChunk = offset + toRead == md.length();
+                    final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, toRead), offset, lastChunk,
                         () -> buffers.addFirst(buffer));
-                    offset += bytesRead;
+                    offset += toRead;
                     return chunk;
                 }
 
@@ -944,7 +935,7 @@ public class RecoverySourceHandler {
 
                 @Override
                 public void close() throws IOException {
-                    IOUtils.close(currentInput, () -> currentInput = null);
+                    IOUtils.close(currentInput);
                 }
             };
         resources.add(multiFileSender);