浏览代码

Reuse FrozenIndexInput.writeCacheFile method (#70545)

Makes it easier for subsequent PRs that change the core write logic of FrozenIndexInput as there is just a single write path
Yannick Welsch 4 年之前
父节点
当前提交
a99f3b52e4

+ 35 - 49
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java

@@ -229,30 +229,14 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
                             (channel, channelPos, relativePos, len, progressUpdater) -> {
                                 assert len <= cachedBlob.to() - cachedBlob.from();
                                 final long startTimeNanos = stats.currentTimeNanos();
-                                final BytesRefIterator iterator = cachedBlob.bytes()
-                                    .slice(toIntBytes(relativePos), toIntBytes(len))
-                                    .iterator();
-                                long writePosition = channelPos;
-                                long bytesCopied = 0L;
-                                BytesRef current;
-                                while ((current = iterator.next()) != null) {
-                                    final ByteBuffer byteBuffer = ByteBuffer.wrap(current.bytes, current.offset, current.length);
-                                    while (byteBuffer.remaining() > 0) {
-                                        final long bytesWritten = positionalWrite(channel, writePosition, byteBuffer);
-                                        bytesCopied += bytesWritten;
-                                        writePosition += bytesWritten;
-                                        progressUpdater.accept(bytesCopied);
-                                    }
-                                }
-                                long channelTo = channelPos + len;
-                                assert writePosition == channelTo : writePosition + " vs " + channelTo;
-                                final long endTimeNanos = stats.currentTimeNanos();
-                                stats.addCachedBytesWritten(len, endTimeNanos - startTimeNanos);
-                                logger.trace(
-                                    "copied bytes [{}-{}] of file [{}] from cache index to disk",
+                                writeCacheFile(
+                                    channel,
+                                    cachedBlob.bytes().streamInput(),
+                                    channelPos,
                                     relativePos,
-                                    relativePos + len,
-                                    fileName
+                                    len,
+                                    progressUpdater,
+                                    startTimeNanos
                                 );
                             },
                             directory.cacheFetchAsyncExecutor()
@@ -309,14 +293,14 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
                     luceneByteBufLock,
                     stopAsyncReads
                 ),
-                (channel, channelPos, relativePos, len, progressUpdater) -> this.writeCacheFile(
-                    channel,
-                    channelPos,
-                    relativePos,
-                    len,
-                    rangeToWrite.start(),
-                    progressUpdater
-                ),
+                (channel, channelPos, relativePos, len, progressUpdater) -> {
+                    final long startTimeNanos = stats.currentTimeNanos();
+                    final long streamStartPosition = rangeToWrite.start() + relativePos + compoundFileOffset;
+
+                    try (InputStream input = openInputStreamFromBlobStore(streamStartPosition, len)) {
+                        this.writeCacheFile(channel, input, channelPos, relativePos, len, progressUpdater, startTimeNanos);
+                    }
+                },
                 directory.cacheFetchAsyncExecutor()
             );
 
@@ -589,17 +573,17 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
 
     private void writeCacheFile(
         final SharedBytes.IO fc,
-        long fileChannelPos,
-        long relativePos,
-        long length,
-        long logicalPos,
-        final Consumer<Long> progressUpdater
+        final InputStream input,
+        final long fileChannelPos,
+        final long relativePos,
+        final long length,
+        final Consumer<Long> progressUpdater,
+        final long startTimeNanos
     ) throws IOException {
         assert assertCurrentThreadMayWriteCacheFile();
         logger.trace(
-            "{}: writing logical {} channel {} pos {} length {} (details: {})",
+            "{}: writing channel {} pos {} length {} (details: {})",
             fileInfo.physicalName(),
-            logicalPos,
             fileChannelPos,
             relativePos,
             length,
@@ -611,19 +595,21 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
 
         long bytesCopied = 0L;
         long remaining = length;
-        final long startTimeNanos = stats.currentTimeNanos();
-        try (InputStream input = openInputStreamFromBlobStore(logicalPos + relativePos + compoundFileOffset, length)) {
-            while (remaining > 0L) {
-                final int bytesRead = readSafe(input, copyBuffer, relativePos, end, remaining, frozenCacheFile);
-                positionalWrite(fc, fileChannelPos + bytesCopied, ByteBuffer.wrap(copyBuffer, 0, bytesRead));
-                bytesCopied += bytesRead;
-                remaining -= bytesRead;
-                progressUpdater.accept(bytesCopied);
+        while (remaining > 0L) {
+            final int bytesRead = readSafe(input, copyBuffer, relativePos, end, remaining, frozenCacheFile);
+            final ByteBuffer byteBuffer = ByteBuffer.wrap(copyBuffer, 0, bytesRead);
+            int writePosition = 0;
+            while (byteBuffer.remaining() > 0) {
+                final long bytesWritten = positionalWrite(fc, fileChannelPos + bytesCopied + writePosition, byteBuffer);
+                writePosition += bytesWritten;
             }
-            final long endTimeNanos = stats.currentTimeNanos();
-            assert bytesCopied == length;
-            stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos);
+            bytesCopied += bytesRead;
+            remaining -= bytesRead;
+            progressUpdater.accept(bytesCopied);
         }
+        final long endTimeNanos = stats.currentTimeNanos();
+        assert bytesCopied == length;
+        stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos);
     }
 
     @Override