Bläddra i källkod

Avoid redundant BufferedInputStream when reading compressed x-content (#103998)

A large number of the use cases for the decompressor involve reading
x-content from the decompressed stream. Jackson itself does all the
buffering we need here. We can avoid allocations and indirection not
always returning a buffered input stream and instead wrapping a
`StreamInput` on demand, including buffering (this is the only use case
that I could find that requires buffering).
Armin Braun 1 år sedan
förälder
incheckning
df8202206a

+ 1 - 2
server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

@@ -26,7 +26,6 @@ import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.compress.Compressor;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.Streams;
-import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.PositionTrackingOutputStreamStreamOutput;
@@ -128,7 +127,7 @@ public class PublicationTransportHandler {
         StreamInput in = request.bytes().streamInput();
         try {
             if (compressor != null) {
-                in = new InputStreamStreamInput(compressor.threadLocalInputStream(in));
+                in = compressor.threadLocalStreamInput(in);
             }
             in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
             in.setTransportVersion(request.version());

+ 1 - 2
server/src/main/java/org/elasticsearch/cluster/coordination/ValidateJoinRequest.java

@@ -13,7 +13,6 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.CompressorFactory;
-import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -52,7 +51,7 @@ public class ValidateJoinRequest extends TransportRequest {
         try (
             var bytesStreamInput = bytes.streamInput();
             var in = new NamedWriteableAwareStreamInput(
-                new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(bytesStreamInput)),
+                CompressorFactory.COMPRESSOR.threadLocalStreamInput(bytesStreamInput),
                 namedWriteableRegistry
             )
         ) {

+ 11 - 0
server/src/main/java/org/elasticsearch/common/compress/Compressor.java

@@ -9,7 +9,10 @@
 package org.elasticsearch.common.compress;
 
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.InputStreamStreamInput;
+import org.elasticsearch.common.io.stream.StreamInput;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -18,6 +21,14 @@ public interface Compressor {
 
     boolean isCompressed(BytesReference bytes);
 
+    /**
+     * Same as {@link #threadLocalInputStream(InputStream)} but wraps the returned stream as a {@link StreamInput}.
+     */
+    default StreamInput threadLocalStreamInput(InputStream in) throws IOException {
+        // wrap stream in buffer since InputStreamStreamInput doesn't do any buffering itself but does a lot of small reads
+        return new InputStreamStreamInput(new BufferedInputStream(threadLocalInputStream(in), DeflateCompressor.BUFFER_SIZE));
+    }
+
     /**
      * Creates a new input stream that decompresses the contents read from the provided input stream.
      * Closing the returned {@link InputStream} will close the provided stream input.

+ 11 - 6
server/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java

@@ -14,7 +14,6 @@ import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Streams;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -150,18 +149,24 @@ public class DeflateCompressor implements Compressor {
             inflater = new Inflater(true);
             releasable = inflater::end;
         }
-        return new BufferedInputStream(new InflaterInputStream(in, inflater, BUFFER_SIZE) {
+        return new InflaterInputStream(in, inflater, BUFFER_SIZE) {
+
+            private Releasable release = releasable;
+
             @Override
             public void close() throws IOException {
+                if (release == null) {
+                    return;
+                }
                 try {
                     super.close();
                 } finally {
-                    // We are ensured to only call this once since we wrap this stream in a BufferedInputStream that will only close
-                    // its delegate once
-                    releasable.close();
+                    // We need to ensure that we only call this once
+                    release.close();
+                    release = null;
                 }
             }
-        }, BUFFER_SIZE);
+        };
     }
 
     @Override

+ 2 - 6
server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java

@@ -120,13 +120,9 @@ public class XContentHelper {
         Objects.requireNonNull(xContentType);
         Compressor compressor = CompressorFactory.compressor(bytes);
         if (compressor != null) {
-            InputStream compressedInput = compressor.threadLocalInputStream(bytes.streamInput());
-            if (compressedInput.markSupported() == false) {
-                compressedInput = new BufferedInputStream(compressedInput);
-            }
-            return XContentFactory.xContent(xContentType).createParser(config, compressedInput);
+            return XContentFactory.xContent(xContentType).createParser(config, compressor.threadLocalInputStream(bytes.streamInput()));
         } else {
-            // TODO now that we have config we make a method on bytes to do this building wihout needing this check everywhere
+            // TODO now that we have config we make a method on bytes to do this building without needing this check everywhere
             return createParserNotCompressed(config, bytes, xContentType);
         }
     }

+ 1 - 2
server/src/main/java/org/elasticsearch/transport/TransportLogger.java

@@ -13,7 +13,6 @@ import org.elasticsearch.TransportVersion;
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.CompressorFactory;
-import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.IOUtils;
@@ -159,7 +158,7 @@ public final class TransportLogger {
     private static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException {
         if (TransportStatus.isCompress(status) && streamInput.available() > 0) {
             try {
-                return new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(streamInput));
+                return CompressorFactory.COMPRESSOR.threadLocalStreamInput(streamInput);
             } catch (IllegalArgumentException e) {
                 throw new IllegalStateException("stream marked as compressed, but is missing deflate header");
             }

+ 1 - 2
server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTransportHandlerTests.java

@@ -27,7 +27,6 @@ import org.elasticsearch.cluster.service.BatchSummary;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.compress.Compressor;
 import org.elasticsearch.common.compress.CompressorFactory;
-import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -147,7 +146,7 @@ public class PublicationTransportHandlerTests extends ESTestCase {
                 in = request.bytes().streamInput();
                 final Compressor compressor = CompressorFactory.compressor(request.bytes());
                 if (compressor != null) {
-                    in = new InputStreamStreamInput(compressor.threadLocalInputStream(in));
+                    in = compressor.threadLocalStreamInput(in);
                 }
                 in.setTransportVersion(version);
                 return in.readBoolean() == false;

+ 5 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

@@ -582,10 +582,13 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
         });
         TransportVersion version = TransportVersion.readVersion(new InputStreamStreamInput(encodedIn));
         assert version.onOrBefore(TransportVersion.current()) : version + " >= " + TransportVersion.current();
+        final StreamInput input;
         if (version.onOrAfter(TransportVersions.V_7_15_0)) {
-            encodedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(encodedIn);
+            input = CompressorFactory.COMPRESSOR.threadLocalStreamInput(encodedIn);
+        } else {
+            input = new InputStreamStreamInput(encodedIn);
         }
-        try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(encodedIn), registry)) {
+        try (StreamInput in = new NamedWriteableAwareStreamInput(input, registry)) {
             in.setTransportVersion(version);
             return reader.read(in);
         }

+ 6 - 4
x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java

@@ -17,7 +17,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
-import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -41,7 +40,6 @@ import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.junit.After;
 
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Base64;
@@ -312,8 +310,12 @@ public class AsyncEqlSearchActionIT extends AbstractEqlBlockingIntegTestCase {
                 String value = doc.getSource().get("result").toString();
                 try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
                     TransportVersion version = TransportVersion.readVersion(buf);
-                    final InputStream compressedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(buf);
-                    try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(compressedIn), registry)) {
+                    try (
+                        StreamInput in = new NamedWriteableAwareStreamInput(
+                            CompressorFactory.COMPRESSOR.threadLocalStreamInput(buf),
+                            registry
+                        )
+                    ) {
                         in.setTransportVersion(version);
                         return new StoredAsyncResponse<>(EqlSearchResponse::new, in);
                     }

+ 6 - 3
x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/AsyncSqlSearchActionIT.java

@@ -38,7 +38,6 @@ import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
 import org.elasticsearch.xpack.sql.plugin.SqlAsyncGetResultsAction;
 import org.junit.After;
 
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Base64;
@@ -301,8 +300,12 @@ public class AsyncSqlSearchActionIT extends AbstractSqlBlockingIntegTestCase {
                 String value = doc.getSource().get("result").toString();
                 try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
                     TransportVersion version = TransportVersion.readVersion(buf);
-                    final InputStream compressedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(buf);
-                    try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(compressedIn), registry)) {
+                    try (
+                        StreamInput in = new NamedWriteableAwareStreamInput(
+                            new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalStreamInput(buf)),
+                            registry
+                        )
+                    ) {
                         in.setTransportVersion(version);
                         return new StoredAsyncResponse<>(SqlQueryResponse::new, in);
                     }

+ 1 - 3
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/common/io/SqlStreamInput.java

@@ -10,7 +10,6 @@ package org.elasticsearch.xpack.sql.common.io;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.compress.CompressorFactory;
-import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -32,8 +31,7 @@ public class SqlStreamInput extends NamedWriteableAwareStreamInput {
         StreamInput in = StreamInput.wrap(bytes);
         TransportVersion inVersion = TransportVersion.readVersion(in);
         validateStreamVersion(version, inVersion);
-        InputStreamStreamInput uncompressingIn = new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(in));
-        return new SqlStreamInput(uncompressingIn, namedWriteableRegistry, inVersion);
+        return new SqlStreamInput(CompressorFactory.COMPRESSOR.threadLocalStreamInput(in), namedWriteableRegistry, inVersion);
     }
 
     /**