瀏覽代碼

Specialize skip for InputStreamIndexInput (#118436) (#118477)

Skip would previously defer to the default implementation that
reads bytes unnecessarily and may be slow. We now specialize it
so that it seeks quickly.

Closes ES-10234
Iraklis Psaroudakis 10 月之前
父節點
當前提交
d553989460

+ 11 - 0
server/src/main/java/org/elasticsearch/common/lucene/store/InputStreamIndexInput.java

@@ -88,4 +88,15 @@ public class InputStreamIndexInput extends InputStream {
         indexInput.seek(markPointer);
         indexInput.seek(markPointer);
         counter = markCounter;
         counter = markCounter;
     }
     }
+
+    @Override
+    public long skip(long n) throws IOException {
+        long skipBytes = Math.min(n, Math.min(indexInput.length() - indexInput.getFilePointer(), limit - counter));
+        if (skipBytes <= 0) {
+            return 0;
+        }
+        indexInput.skipBytes(skipBytes);
+        counter += skipBytes;
+        return skipBytes;
+    }
 }
 }

+ 36 - 1
server/src/test/java/org/elasticsearch/common/lucene/store/InputStreamIndexInputTests.java

@@ -218,7 +218,7 @@ public class InputStreamIndexInputTests extends ESTestCase {
         assertThat(is.read(read), equalTo(-1));
         assertThat(is.read(read), equalTo(-1));
     }
     }
 
 
-    public void testMarkRest() throws Exception {
+    public void testMarkReset() throws Exception {
         Directory dir = new ByteBuffersDirectory();
         Directory dir = new ByteBuffersDirectory();
         IndexOutput output = dir.createOutput("test", IOContext.DEFAULT);
         IndexOutput output = dir.createOutput("test", IOContext.DEFAULT);
         for (int i = 0; i < 3; i++) {
         for (int i = 0; i < 3; i++) {
@@ -243,6 +243,41 @@ public class InputStreamIndexInputTests extends ESTestCase {
         assertThat(is.read(), equalTo(2));
         assertThat(is.read(), equalTo(2));
     }
     }
 
 
+    public void testSkipBytes() throws Exception {
+        Directory dir = new ByteBuffersDirectory();
+        IndexOutput output = dir.createOutput("test", IOContext.DEFAULT);
+        int bytes = randomIntBetween(10, 100);
+        for (int i = 0; i < bytes; i++) {
+            output.writeByte((byte) i);
+        }
+        output.close();
+
+        int limit = randomIntBetween(0, bytes * 2);
+        int initialReadBytes = randomIntBetween(0, limit);
+        int skipBytes = randomIntBetween(0, limit);
+        int seekExpected = Math.min(Math.min(initialReadBytes + skipBytes, limit), bytes);
+        int skipBytesExpected = Math.max(seekExpected - initialReadBytes, 0);
+        logger.debug(
+            "bytes: {}, limit: {}, initialReadBytes: {}, skipBytes: {}, seekExpected: {}, skipBytesExpected: {}",
+            bytes,
+            limit,
+            initialReadBytes,
+            skipBytes,
+            seekExpected,
+            skipBytesExpected
+        );
+
+        IndexInput input = dir.openInput("test", IOContext.DEFAULT);
+        InputStreamIndexInput is = new InputStreamIndexInput(input, limit);
+        is.readNBytes(initialReadBytes);
+        assertThat(is.skip(skipBytes), equalTo((long) skipBytesExpected));
+
+        int remainingBytes = Math.min(bytes, limit) - seekExpected;
+        for (int i = seekExpected; i < seekExpected + remainingBytes; i++) {
+            assertThat(is.read(), equalTo(i));
+        }
+    }
+
     public void testReadZeroShouldReturnZero() throws IOException {
     public void testReadZeroShouldReturnZero() throws IOException {
         try (Directory dir = new ByteBuffersDirectory()) {
         try (Directory dir = new ByteBuffersDirectory()) {
             try (IndexOutput output = dir.createOutput("test", IOContext.DEFAULT)) {
             try (IndexOutput output = dir.createOutput("test", IOContext.DEFAULT)) {