1
0
Эх сурвалжийг харах

Force implementing bulk InputStream#read on StreamInput (#112072)

We should enforce overriding here to avoid extremely slow byte-by-byte
reads when using these instances as `InputStream`.
I only found one case where this matters practically in the codebase
but it's probably good to guard against it.
Armin Braun 1 жил өмнө
parent
commit
fe786b7b8f

+ 7 - 0
server/src/main/java/org/elasticsearch/common/io/stream/ByteArrayStreamInput.java

@@ -117,4 +117,11 @@ public final class ByteArrayStreamInput extends StreamInput {
         System.arraycopy(bytes, pos, b, offset, len);
         pos += len;
     }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        int toRead = Math.min(len, available());
+        readBytes(b, off, toRead);
+        return toRead;
+    }
 }

+ 5 - 0
server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java

@@ -97,6 +97,11 @@ public abstract class FilterStreamInput extends StreamInput {
         return delegate.read();
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        return delegate.read(b, off, len);
+    }
+
     @Override
     public void close() throws IOException {
         delegate.close();

+ 4 - 0
server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

@@ -104,6 +104,10 @@ public abstract class StreamInput extends InputStream {
      */
     public abstract void readBytes(byte[] b, int offset, int len) throws IOException;
 
+    // force implementing bulk reads to avoid accidentally slow implementations
+    @Override
+    public abstract int read(byte[] b, int off, int len) throws IOException;
+
     /**
      * Reads a bytes reference from this stream, copying any bytes read to a new {@code byte[]}. Use {@link #readReleasableBytesReference()}
      * when reading large bytes references where possible top avoid needless allocations and copying.

+ 9 - 0
server/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamInput.java

@@ -66,6 +66,15 @@ public final class BufferedChecksumStreamInput extends FilterStreamInput {
         digest.update(b, offset, len);
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        int read = delegate.read(b, off, len);
+        if (read > 0) {
+            digest.update(b, off, read);
+        }
+        return read;
+    }
+
     private static final ThreadLocal<byte[]> buffer = ThreadLocal.withInitial(() -> new byte[8]);
 
     @Override

+ 9 - 0
server/src/test/java/org/elasticsearch/common/io/stream/StreamInputTests.java

@@ -24,6 +24,15 @@ import static org.mockito.Mockito.verify;
 public class StreamInputTests extends ESTestCase {
 
     private StreamInput in = Mockito.spy(StreamInput.class);
+
+    {
+        try {
+            Mockito.when(in.skip(anyLong())).thenAnswer(a -> a.getArguments()[0]);
+        } catch (IOException e) {
+            throw new AssertionError(e);
+        }
+    }
+
     byte[] bytes = "0123456789".getBytes(UTF_8);
 
     public void testCalculateByteLengthOfAscii() throws IOException {

+ 5 - 0
x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sample/CircuitBreakerTests.java

@@ -260,6 +260,11 @@ public class CircuitBreakerTests extends ESTestCase {
 
                     }
 
+                    @Override
+                    public int read(byte[] b, int off, int len) throws IOException {
+                        return 0;
+                    }
+
                     @Override
                     public void close() throws IOException {