Bladeren bron

Speed up Compression Logic by Pooling Resources (#61358)

This is mostly motivated by the performance issues we are seeing around the GET mappings
REST API which (in case of a large number of indices) will create decompressing streams in a hot loop
which takes a significant amount of time for the system calls involved in instantiating deflaters
and inflaters.
Also, this fixes a leaked deflater when deserializing cached repository data.
Armin Braun 5 jaren geleden
bovenliggende
commit
c531574407

+ 11 - 6
server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

@@ -136,15 +136,16 @@ public class PublicationTransportHandler {
         StreamInput in = request.bytes().streamInput();
         try {
             if (compressor != null) {
-                in = compressor.streamInput(in);
+                in = compressor.threadLocalStreamInput(in);
             }
             in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
             in.setVersion(request.version());
             // If true we received full cluster state - otherwise diffs
             if (in.readBoolean()) {
                 final ClusterState incomingState;
-                try {
-                    incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
+                // Close early to release resources used by the de-compression as early as possible
+                try (StreamInput input = in) {
+                    incomingState = ClusterState.readFrom(input, transportService.getLocalNode());
                 } catch (Exception e){
                     logger.warn("unexpected error while deserializing an incoming cluster state", e);
                     throw e;
@@ -164,7 +165,11 @@ public class PublicationTransportHandler {
                 } else {
                     ClusterState incomingState;
                     try {
-                        Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeen.nodes().getLocalNode());
+                        final Diff<ClusterState> diff;
+                        // Close stream early to release resources used by the de-compression as early as possible
+                        try (StreamInput input = in) {
+                            diff = ClusterState.readDiffFrom(input, lastSeen.nodes().getLocalNode());
+                        }
                         incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
                     } catch (IncompatibleClusterStateVersionException e) {
                         incompatibleClusterStateDiffReceivedCount.incrementAndGet();
@@ -211,7 +216,7 @@ public class PublicationTransportHandler {
 
     private static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
         final BytesStreamOutput bStream = new BytesStreamOutput();
-        try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) {
+        try (StreamOutput stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(bStream)) {
             stream.setVersion(nodeVersion);
             stream.writeBoolean(true);
             clusterState.writeTo(stream);
@@ -224,7 +229,7 @@ public class PublicationTransportHandler {
 
     private static BytesReference serializeDiffClusterState(Diff<ClusterState> diff, Version nodeVersion) throws IOException {
         final BytesStreamOutput bStream = new BytesStreamOutput();
-        try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) {
+        try (StreamOutput stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(bStream)) {
             stream.setVersion(nodeVersion);
             stream.writeBoolean(false);
             diff.writeTo(stream);

+ 1 - 1
server/src/main/java/org/elasticsearch/common/compress/CompressedXContent.java

@@ -71,7 +71,7 @@ public final class CompressedXContent {
      */
     public CompressedXContent(ToXContent xcontent, XContentType type, ToXContent.Params params) throws IOException {
         BytesStreamOutput bStream = new BytesStreamOutput();
-        OutputStream compressedStream = CompressorFactory.COMPRESSOR.streamOutput(bStream);
+        OutputStream compressedStream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(bStream);
         CRC32 crc32 = new CRC32();
         OutputStream checkedStream = new CheckedOutputStream(compressedStream, crc32);
         try (XContentBuilder builder = XContentFactory.contentBuilder(type, checkedStream)) {

+ 10 - 2
server/src/main/java/org/elasticsearch/common/compress/Compressor.java

@@ -32,13 +32,21 @@ public interface Compressor {
 
     int headerLength();
 
-    StreamInput streamInput(StreamInput in) throws IOException;
+    /**
+     * Creates a new stream input that decompresses the contents read from the provided stream input.
+     * Closing the returned {@link StreamInput} will close the provided stream input.
+     * Note: The returned stream may only be used on the thread that created it as it might use thread-local resources and must be safely
+     * closed after use
+     */
+    StreamInput threadLocalStreamInput(StreamInput in) throws IOException;
 
     /**
      * Creates a new stream output that compresses the contents and writes to the provided stream
      * output. Closing the returned {@link StreamOutput} will close the provided stream output.
+     * Note: The returned stream may only be used on the thread that created it as it might use thread-local resources and must be safely
+     * closed after use
      */
-    StreamOutput streamOutput(OutputStream out) throws IOException;
+    StreamOutput threadLocalStreamOutput(OutputStream out) throws IOException;
 
     /**
      * Decompress bytes into a newly allocated buffer.

+ 116 - 32
server/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java

@@ -19,12 +19,14 @@
 
 package org.elasticsearch.common.compress;
 
+import org.elasticsearch.Assertions;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.lease.Releasable;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -32,7 +34,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.Inflater;
@@ -73,8 +74,75 @@ public class DeflateCompressor implements Compressor {
         return HEADER.length;
     }
 
+    // Reusable inflater reference for streaming decompression
+    private static final ThreadLocal<ReleasableReference<Inflater>> inflaterForStreamRef = ThreadLocal.withInitial(() -> {
+        final Inflater inflater = new Inflater(true);
+        return new ReleasableReference<>(inflater, inflater::reset);
+    });
+
+    // Reusable deflater reference for streaming compression
+    private static final ThreadLocal<ReleasableReference<Deflater>> deflaterForStreamRef = ThreadLocal.withInitial(() -> {
+        final Deflater deflater = new Deflater(LEVEL, true);
+        return new ReleasableReference<>(deflater, deflater::reset);
+    });
+
+    // Reference to a deflater or inflater that is used to make sure we do not use the same stream twice when nesting streams.
+    private static final class ReleasableReference<T> implements Releasable {
+
+        protected final T resource;
+
+        private final Releasable releasable;
+
+        // Thread that is currently using this reference. Only used for assertions and only assigned if assertions are enabled.
+        private Thread thread = null;
+
+        // true if this reference is currently in use and is not available for re-use
+        boolean inUse;
+
+        protected ReleasableReference(T resource, Releasable releasable) {
+            this.resource = resource;
+            this.releasable = releasable;
+        }
+
+        T get() {
+            if (Assertions.ENABLED) {
+                assert thread == null;
+                thread = Thread.currentThread();
+            }
+            assert inUse == false;
+            inUse = true;
+            return resource;
+        }
+
+        @Override
+        public void close() {
+            if (Assertions.ENABLED) {
+                assert thread == Thread.currentThread() :
+                        "Opened on [" + thread.getName() + "] but closed on [" + Thread.currentThread().getName() + "]";
+                thread = null;
+            }
+            assert inUse;
+            inUse = false;
+            releasable.close();
+        }
+    }
+
     @Override
-    public StreamInput streamInput(StreamInput in) throws IOException {
+    public StreamInput threadLocalStreamInput(StreamInput in) throws IOException {
+        return new InputStreamStreamInput(inputStream(in, true));
+    }
+
+    /**
+     * Creates a new input stream that decompresses the contents read from the provided input stream.
+     * Closing the returned stream will close the provided input stream.
+     * Optionally uses thread-local, pooled resources to save off-heap allocations if the stream is guaranteed to not escape the current
+     * thread.
+     *
+     * @param in           input stream to wrap
+     * @param threadLocal  whether this stream will only be used on the current thread or not
+     * @return             decompressing stream
+     */
+    public static InputStream inputStream(InputStream in, boolean threadLocal) throws IOException {
         final byte[] headerBytes = new byte[HEADER.length];
         int len = 0;
         while (len < headerBytes.length) {
@@ -88,56 +156,72 @@ public class DeflateCompressor implements Compressor {
             throw new IllegalArgumentException("Input stream is not compressed with DEFLATE!");
         }
 
-        final boolean nowrap = true;
-        final Inflater inflater = new Inflater(nowrap);
-        InputStream decompressedIn = new InflaterInputStream(in, inflater, BUFFER_SIZE);
-        decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE);
-        return new InputStreamStreamInput(decompressedIn) {
-            final AtomicBoolean closed = new AtomicBoolean(false);
-
+        final Releasable releasable;
+        final Inflater inflater;
+        if (threadLocal) {
+            final ReleasableReference<Inflater> current = inflaterForStreamRef.get();
+            if (current.inUse) {
+                // Nested de-compression streams should not happen but we still handle them safely by using a fresh Inflater
+                inflater = new Inflater(true);
+                releasable = inflater::end;
+            } else {
+                inflater = current.get();
+                releasable = current;
+            }
+        } else {
+            inflater = new Inflater(true);
+            releasable = inflater::end;
+        }
+        return new BufferedInputStream(new InflaterInputStream(in, inflater, BUFFER_SIZE) {
+            @Override
             public void close() throws IOException {
                 try {
                     super.close();
                 } finally {
-                    if (closed.compareAndSet(false, true)) {
-                        // important to release native memory
-                        inflater.end();
-                    }
+                    // We are ensured to only call this once since we wrap this stream in a BufferedInputStream that will only close
+                    // its delegate once
+                    releasable.close();
                 }
             }
-        };
+        }, BUFFER_SIZE);
     }
 
     @Override
-    public StreamOutput streamOutput(OutputStream out) throws IOException {
+    public StreamOutput threadLocalStreamOutput(OutputStream out) throws IOException {
         out.write(HEADER);
-        final boolean nowrap = true;
-        final Deflater deflater = new Deflater(LEVEL, nowrap);
+        final ReleasableReference<Deflater> current = deflaterForStreamRef.get();
+        final Releasable releasable;
+        final Deflater deflater;
+        if (current.inUse) {
+            // Nested compression streams should not happen but we still handle them safely by using a fresh Deflater
+            deflater = new Deflater(LEVEL, true);
+            releasable = deflater::end;
+        } else {
+            deflater = current.get();
+            releasable = current;
+        }
         final boolean syncFlush = true;
-        DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
-        OutputStream compressedOut = new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE);
-        return new OutputStreamStreamOutput(compressedOut) {
-            final AtomicBoolean closed = new AtomicBoolean(false);
-
+        DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush) {
+            @Override
             public void close() throws IOException {
                 try {
                     super.close();
                 } finally {
-                    if (closed.compareAndSet(false, true)) {
-                        // important to release native memory
-                        deflater.end();
-                    }
+                    // We are ensured to only call this once since we wrap this stream in a BufferedOutputStream that will only close
+                    // its delegate once below
+                    releasable.close();
                 }
             }
         };
+        return new OutputStreamStreamOutput(new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE));
     }
 
-    // Reusable Inflater reference. Note: This is not used for the decompressing stream wrapper because we don't have strong guarantees
-    // about the scope in which the stream wrapper is used.
-    private static final ThreadLocal<Inflater> inflaterRef = ThreadLocal.withInitial(() -> new Inflater(true));
-
     private static final ThreadLocal<BytesStreamOutput> baos = ThreadLocal.withInitial(BytesStreamOutput::new);
 
+    // Reusable Inflater reference. Note: This is a separate instance from the one used for the decompressing stream wrapper because we
+    // want to be able to deal with decompressing bytes references that were read from a decompressing stream.
+    private static final ThreadLocal<Inflater> inflaterRef = ThreadLocal.withInitial(() -> new Inflater(true));
+
     @Override
     public BytesReference uncompress(BytesReference bytesReference) throws IOException {
         final BytesStreamOutput buffer = baos.get();
@@ -151,8 +235,8 @@ public class DeflateCompressor implements Compressor {
         return res;
     }
 
-    // Reusable Deflater reference. Note: This is not used for the compressing stream wrapper because we don't have strong guarantees
-    // about the scope in which the stream wrapper is used.
+    // Reusable Deflater reference. Note: This is a separate instance from the one used for the compressing stream wrapper because we
+    // want to be able to deal with compressing bytes references to a decompressing stream.
     private static final ThreadLocal<Deflater> deflaterRef = ThreadLocal.withInitial(() -> new Deflater(LEVEL, true));
 
     @Override

+ 5 - 5
server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java

@@ -51,7 +51,7 @@ public class XContentHelper {
                                               BytesReference bytes) throws IOException {
         Compressor compressor = CompressorFactory.compressor(bytes);
         if (compressor != null) {
-            InputStream compressedInput = compressor.streamInput(bytes.streamInput());
+            InputStream compressedInput = compressor.threadLocalStreamInput(bytes.streamInput());
             if (compressedInput.markSupported() == false) {
                 compressedInput = new BufferedInputStream(compressedInput);
             }
@@ -70,7 +70,7 @@ public class XContentHelper {
         Objects.requireNonNull(xContentType);
         Compressor compressor = CompressorFactory.compressor(bytes);
         if (compressor != null) {
-            InputStream compressedInput = compressor.streamInput(bytes.streamInput());
+            InputStream compressedInput = compressor.threadLocalStreamInput(bytes.streamInput());
             if (compressedInput.markSupported() == false) {
                 compressedInput = new BufferedInputStream(compressedInput);
             }
@@ -106,7 +106,7 @@ public class XContentHelper {
             InputStream input;
             Compressor compressor = CompressorFactory.compressor(bytes);
             if (compressor != null) {
-                InputStream compressedStreamInput = compressor.streamInput(bytes.streamInput());
+                InputStream compressedStreamInput = compressor.threadLocalStreamInput(bytes.streamInput());
                 if (compressedStreamInput.markSupported() == false) {
                     compressedStreamInput = new BufferedInputStream(compressedStreamInput);
                 }
@@ -354,7 +354,7 @@ public class XContentHelper {
                                      ToXContent.Params params) throws IOException {
         Compressor compressor = CompressorFactory.compressor(source);
         if (compressor != null) {
-            try (InputStream compressedStreamInput = compressor.streamInput(source.streamInput())) {
+            try (InputStream compressedStreamInput = compressor.threadLocalStreamInput(source.streamInput())) {
                 builder.rawField(field, compressedStreamInput);
             }
         } else {
@@ -373,7 +373,7 @@ public class XContentHelper {
         Objects.requireNonNull(xContentType);
         Compressor compressor = CompressorFactory.compressor(source);
         if (compressor != null) {
-            try (InputStream compressedStreamInput = compressor.streamInput(source.streamInput())) {
+            try (InputStream compressedStreamInput = compressor.threadLocalStreamInput(source.streamInput())) {
                 builder.rawField(field, compressedStreamInput, xContentType);
             }
         } else {

+ 6 - 4
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -67,6 +67,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.compress.NotXContentException;
 import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
@@ -1331,10 +1332,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }
 
     private RepositoryData repositoryDataFromCachedEntry(Tuple<Long, BytesReference> cacheEntry) throws IOException {
-        return RepositoryData.snapshotsFromXContent(
-            XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
-                LoggingDeprecationHandler.INSTANCE,
-                CompressorFactory.COMPRESSOR.streamInput(cacheEntry.v2().streamInput())), cacheEntry.v1(), false);
+        try (StreamInput input = CompressorFactory.COMPRESSOR.threadLocalStreamInput(cacheEntry.v2().streamInput())) {
+            return RepositoryData.snapshotsFromXContent(
+                    XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
+                            LoggingDeprecationHandler.INSTANCE, input), cacheEntry.v1(), false);
+        }
     }
 
     private RepositoryException corruptedStateException(@Nullable Exception cause) {

+ 2 - 1
server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java

@@ -158,7 +158,8 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
                         // in order to write the footer we need to prevent closing the actual index input.
                     }
                 }; XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE,
-                        compress ? CompressorFactory.COMPRESSOR.streamOutput(indexOutputOutputStream) : indexOutputOutputStream)) {
+                        compress ? CompressorFactory.COMPRESSOR.threadLocalStreamOutput(indexOutputOutputStream)
+                                : indexOutputOutputStream)) {
                     builder.startObject();
                     obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS);
                     builder.endObject();

+ 1 - 1
server/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java

@@ -52,7 +52,7 @@ final class CompressibleBytesOutputStream extends StreamOutput {
         this.bytesStreamOutput = bytesStreamOutput;
         this.shouldCompress = shouldCompress;
         if (shouldCompress) {
-            this.stream = CompressorFactory.COMPRESSOR.streamOutput(Streams.flushOnCloseStream(bytesStreamOutput));
+            this.stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(bytesStreamOutput));
         } else {
             this.stream = bytesStreamOutput;
         }

+ 1 - 1
server/src/main/java/org/elasticsearch/transport/TransportLogger.java

@@ -168,7 +168,7 @@ public final class TransportLogger {
     private static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException {
         if (TransportStatus.isCompress(status) && streamInput.available() > 0) {
             try {
-                return CompressorFactory.COMPRESSOR.streamInput(streamInput);
+                return CompressorFactory.COMPRESSOR.threadLocalStreamInput(streamInput);
             } catch (IllegalArgumentException e) {
                 throw new IllegalStateException("stream marked as compressed, but is missing deflate header");
             }

+ 2 - 2
server/src/test/java/org/elasticsearch/common/compress/DeflateCompressTests.java

@@ -390,7 +390,7 @@ public class DeflateCompressTests extends ESTestCase {
 
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         OutputStreamStreamOutput rawOs = new OutputStreamStreamOutput(bos);
-        StreamOutput os = c.streamOutput(rawOs);
+        StreamOutput os = c.threadLocalStreamOutput(rawOs);
 
         Random r = random();
         int bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(random(), 1, 70000);
@@ -410,7 +410,7 @@ public class DeflateCompressTests extends ESTestCase {
         byte compressed[] = bos.toByteArray();
         ByteBuffer bb2 = ByteBuffer.wrap(compressed);
         StreamInput compressedIn = new ByteBufferStreamInput(bb2);
-        StreamInput in = c.streamInput(compressedIn);
+        StreamInput in = c.threadLocalStreamInput(compressedIn);
 
         // randomize constants again
         bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(random(), 1, 70000);

+ 2 - 2
server/src/test/java/org/elasticsearch/common/compress/DeflateCompressedXContentTests.java

@@ -69,7 +69,7 @@ public class DeflateCompressedXContentTests extends ESTestCase {
     public void testDifferentCompressedRepresentation() throws Exception {
         byte[] b = "---\nf:abcdefghijabcdefghij".getBytes("UTF-8");
         BytesStreamOutput bout = new BytesStreamOutput();
-        StreamOutput out = compressor.streamOutput(bout);
+        StreamOutput out = compressor.threadLocalStreamOutput(bout);
         out.writeBytes(b);
         out.flush();
         out.writeBytes(b);
@@ -77,7 +77,7 @@ public class DeflateCompressedXContentTests extends ESTestCase {
         final BytesReference b1 = bout.bytes();
 
         bout = new BytesStreamOutput();
-        out = compressor.streamOutput(bout);
+        out = compressor.threadLocalStreamOutput(bout);
         out.writeBytes(b);
         out.writeBytes(b);
         out.close();

+ 1 - 1
server/src/test/java/org/elasticsearch/index/mapper/BinaryFieldMapperTests.java

@@ -86,7 +86,7 @@ public class BinaryFieldMapperTests extends ESSingleNodeTestCase {
 
         // case 2: a value that looks compressed: this used to fail in 1.x
         BytesStreamOutput out = new BytesStreamOutput();
-        try (StreamOutput compressed = CompressorFactory.COMPRESSOR.streamOutput(out)) {
+        try (StreamOutput compressed = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(out)) {
             new BytesArray(binaryValue1).writeTo(compressed);
         }
         final byte[] binaryValue2 = BytesReference.toBytes(out.bytes());

+ 2 - 2
server/src/test/java/org/elasticsearch/transport/CompressibleBytesOutputStreamTests.java

@@ -71,7 +71,7 @@ public class CompressibleBytesOutputStreamTests extends ESTestCase {
 
         assertTrue(CompressorFactory.COMPRESSOR.isCompressed(bytesRef));
 
-        StreamInput streamInput = CompressorFactory.COMPRESSOR.streamInput(bytesRef.streamInput());
+        StreamInput streamInput = CompressorFactory.COMPRESSOR.threadLocalStreamInput(bytesRef.streamInput());
         byte[] actualBytes = new byte[expectedBytes.length];
         streamInput.readBytes(actualBytes, 0, expectedBytes.length);
 
@@ -94,7 +94,7 @@ public class CompressibleBytesOutputStreamTests extends ESTestCase {
         stream.write(expectedBytes);
 
 
-        StreamInput streamInput = CompressorFactory.COMPRESSOR.streamInput(bStream.bytes().streamInput());
+        StreamInput streamInput = CompressorFactory.COMPRESSOR.threadLocalStreamInput(bStream.bytes().streamInput());
         byte[] actualBytes = new byte[expectedBytes.length];
         EOFException e = expectThrows(EOFException.class, () -> streamInput.readBytes(actualBytes, 0, expectedBytes.length));
         assertEquals("Unexpected end of ZLIB input stream", e.getMessage());

+ 3 - 3
server/src/test/java/org/elasticsearch/transport/TransportDecompressorTests.java

@@ -37,7 +37,7 @@ public class TransportDecompressorTests extends ESTestCase {
 
     public void testSimpleCompression() throws IOException {
         try (BytesStreamOutput output = new BytesStreamOutput()) {
-            StreamOutput deflateStream = CompressorFactory.COMPRESSOR.streamOutput(Streams.flushOnCloseStream(output));
+            StreamOutput deflateStream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(output));
             byte randomByte = randomByte();
             deflateStream.write(randomByte);
             deflateStream.close();
@@ -57,7 +57,7 @@ public class TransportDecompressorTests extends ESTestCase {
 
     public void testMultiPageCompression() throws IOException {
         try (BytesStreamOutput output = new BytesStreamOutput()) {
-            StreamOutput deflateStream = CompressorFactory.COMPRESSOR.streamOutput(Streams.flushOnCloseStream(output));
+            StreamOutput deflateStream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(output));
             for (int i = 0; i < 10000; ++i) {
                 deflateStream.writeInt(i);
             }
@@ -85,7 +85,7 @@ public class TransportDecompressorTests extends ESTestCase {
 
     public void testIncrementalMultiPageCompression() throws IOException {
         try (BytesStreamOutput output = new BytesStreamOutput()) {
-            StreamOutput deflateStream = CompressorFactory.COMPRESSOR.streamOutput(Streams.flushOnCloseStream(output));
+            StreamOutput deflateStream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(output));
             for (int i = 0; i < 10000; ++i) {
                 deflateStream.writeInt(i);
             }

+ 5 - 2
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExportBulk.java

@@ -16,6 +16,7 @@ import org.elasticsearch.client.ResponseListener;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.CompressorFactory;
+import org.elasticsearch.common.compress.DeflateCompressor;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.time.DateFormatter;
@@ -83,7 +84,7 @@ class HttpExportBulk extends ExportBulk {
             if (docs != null && docs.isEmpty() == false) {
                 final BytesStreamOutput scratch = new BytesStreamOutput();
                 final CountingOutputStream countingStream;
-                try (StreamOutput payload = CompressorFactory.COMPRESSOR.streamOutput(scratch)) {
+                try (StreamOutput payload = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(scratch)) {
                     countingStream = new CountingOutputStream(payload);
                     for (MonitoringDoc monitoringDoc : docs) {
                         writeDocument(monitoringDoc, countingStream);
@@ -108,8 +109,10 @@ class HttpExportBulk extends ExportBulk {
                 request.addParameter(param.getKey(), param.getValue());
             }
             try {
+                // Don't use a thread-local decompressing stream since the HTTP client does not give strong guarantees about
+                // thread-affinity when reading and closing the request entity
                 request.setEntity(new InputStreamEntity(
-                        CompressorFactory.COMPRESSOR.streamInput(payload.streamInput()), payloadLength, ContentType.APPLICATION_JSON));
+                        DeflateCompressor.inputStream(payload.streamInput(), false), payloadLength, ContentType.APPLICATION_JSON));
             } catch (IOException e) {
                 listener.onFailure(e);
                 return;