Browse Source

Make writerWithOffset fully delegate to the writer it wraps (#126937)

writerWithOffset uses a lambda to create a RangeMissingHandler however,
the RangeMissingHandler interface has a default implementation for `sharedInputStreamFactory`.

This makes `writerWithOffset` delegate to the received writer only for the `fillCacheRange`
method where the writer itself perhaps didn't have the `sharedInputStream` method invoked
(always invoking `sharedInputStream` before `fillCacheRange` is part of the contract of the
RangeMissingHandler interface)

This PR makes `writerWithOffset` delegate the `sharedInputStream` to the underlying writer.
Andrei Dan 4 months ago
parent
commit
2375e89a5f

+ 42 - 9
x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

@@ -660,20 +660,53 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         }
     }
 
+    /**
+     * Wraps the given {@link RangeMissingHandler} to adjust the position of the data read from the blob store
+     * (NB: the relativePos parameter in
+     * {@link RangeMissingHandler#fillCacheRange(SharedBytes.IO, int, SourceInputStreamFactory, int, int, IntConsumer, ActionListener)})
+     * relative to the beginning of the region we're reading from.
+     *
+     * This is useful so that we can read the input stream we open for reading from the blob store
+     * from the beginning (i.e. position 0 <b>in the input stream</b>).
+     *
+     * For example, if we want to read 2000 bytes the blob store starting at position 1000, the writer here will
+     * adjust the relative position we read to be 0, the offset being 1000, and the input stream we open to
+     * read from the blob store will start streaming from position 1000 (but we adjusted the relative read position
+     * to 0 so we consume the input stream from the beginning).
+     */
     private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, int writeOffset) {
         if (writeOffset == 0) {
             // no need to allocate a new capturing lambda if the offset isn't adjusted
             return writer;
         }
-        return (channel, channelPos, streamFactory, relativePos, len, progressUpdater, completionListener) -> writer.fillCacheRange(
-            channel,
-            channelPos,
-            streamFactory,
-            relativePos - writeOffset,
-            len,
-            progressUpdater,
-            completionListener
-        );
+
+        return new RangeMissingHandler() {
+            @Override
+            public void fillCacheRange(
+                SharedBytes.IO channel,
+                int channelPos,
+                SourceInputStreamFactory streamFactory,
+                int relativePos,
+                int length,
+                IntConsumer progressUpdater,
+                ActionListener<Void> completionListener
+            ) throws IOException {
+                writer.fillCacheRange(
+                    channel,
+                    channelPos,
+                    streamFactory,
+                    relativePos - writeOffset,
+                    length,
+                    progressUpdater,
+                    completionListener
+                );
+            }
+
+            @Override
+            public SourceInputStreamFactory sharedInputStreamFactory(List<SparseFileTracker.Gap> gaps) {
+                return writer.sharedInputStreamFactory(gaps);
+            }
+        };
     }
 
     // used by tests