Browse Source

Manually Manage Direct Write Buffer in Frozen Cache Writes (#70399)

Manually managing the buffer has a number of upsides for us and no direct downsides
as far as I can see.
* We reduce the number of syscalls for large writes by a factor of up to `8` (in practice the factor is probably close to `8` when writing larger amounts of data)
* Since we limit the write thread count to 28, the amount of direct memory used is bounded (one thread-local buffer of `COPY_BUFFER_SIZE * 8` bytes per write thread)
* We eliminate the copying from heap -> off-heap under the lock in the channel's positional write
* We eliminate allocating `ByteBuffer` instances in a hot loop when writing large regions
Armin Braun 4 years ago
parent
commit
6c085741a3

+ 22 - 8
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java

@@ -467,7 +467,11 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
 
     private static int positionalWrite(SharedBytes.IO fc, long start, ByteBuffer byteBuffer) throws IOException {
         assert assertCurrentThreadMayWriteCacheFile();
-        return fc.write(byteBuffer, start);
+        byteBuffer.flip();
+        int written = fc.write(byteBuffer, start);
+        assert byteBuffer.hasRemaining() == false;
+        byteBuffer.clear();
+        return written;
     }
 
     /**
@@ -571,6 +575,13 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
         return bytesRead;
     }
 
+    /**
+     * Thread local direct byte buffer to aggregate multiple positional writes to the cache file.
+     */
+    private static final ThreadLocal<ByteBuffer> writeBuffer = ThreadLocal.withInitial(
+        () -> ByteBuffer.allocateDirect(COPY_BUFFER_SIZE * 8)
+    );
+
     private void writeCacheFile(
         final SharedBytes.IO fc,
         final InputStream input,
@@ -595,18 +606,21 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
 
         long bytesCopied = 0L;
         long remaining = length;
+        final ByteBuffer buf = writeBuffer.get();
+        buf.clear();
         while (remaining > 0L) {
             final int bytesRead = readSafe(input, copyBuffer, relativePos, end, remaining, frozenCacheFile);
-            final ByteBuffer byteBuffer = ByteBuffer.wrap(copyBuffer, 0, bytesRead);
-            int writePosition = 0;
-            while (byteBuffer.remaining() > 0) {
-                final long bytesWritten = positionalWrite(fc, fileChannelPos + bytesCopied + writePosition, byteBuffer);
-                writePosition += bytesWritten;
+            if (bytesRead > buf.remaining()) {
+                long bytesWritten = positionalWrite(fc, fileChannelPos + bytesCopied, buf);
+                bytesCopied += bytesWritten;
+                progressUpdater.accept(bytesCopied);
             }
-            bytesCopied += bytesRead;
+            buf.put(copyBuffer, 0, bytesRead);
             remaining -= bytesRead;
-            progressUpdater.accept(bytesCopied);
         }
+        long bytesWritten = positionalWrite(fc, fileChannelPos + bytesCopied, buf);
+        bytesCopied += bytesWritten;
+        progressUpdater.accept(bytesCopied);
         final long endTimeNanos = stats.currentTimeNanos();
         assert bytesCopied == length;
         stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos);