Browse Source

Unify Stream Copy Buffer Usage (#56078)

We have various ways of copying between two streams and handling thread-local
buffers throughout the codebase. This commit unifies a number of them and
removes buffer allocations in many spots.
Armin Braun 5 years ago
parent
commit
e28dbde289

+ 37 - 6
libs/core/src/main/java/org/elasticsearch/core/internal/io/Streams.java

@@ -30,30 +30,61 @@ import java.io.OutputStream;
  */
 public class Streams {
 
+    private static final ThreadLocal<byte[]> buffer = ThreadLocal.withInitial(() -> new byte[8 * 1024]);
+
     private Streams() {
 
     }
 
     /**
-     * Copy the contents of the given InputStream to the given OutputStream. Closes both streams when done.
+     * Copy the contents of the given InputStream to the given OutputStream. Optionally, closes both streams when done.
      *
-     * @param in  the stream to copy from
-     * @param out the stream to copy to
+     * @param in     the stream to copy from
+     * @param out    the stream to copy to
+     * @param close  whether to close both streams after copying
+     * @param buffer buffer to use for copying
      * @return the number of bytes copied
      * @throws IOException in case of I/O errors
      */
-    public static long copy(final InputStream in, final OutputStream out) throws IOException {
+    public static long copy(final InputStream in, final OutputStream out, byte[] buffer, boolean close) throws IOException {
         Exception err = null;
         try {
-            final long byteCount = in.transferTo(out);
+            long byteCount = 0;
+            int bytesRead;
+            while ((bytesRead = in.read(buffer)) != -1) {
+                out.write(buffer, 0, bytesRead);
+                byteCount += bytesRead;
+            }
             out.flush();
             return byteCount;
         } catch (IOException | RuntimeException e) {
             err = e;
             throw e;
         } finally {
-            IOUtils.close(err, in, out);
+            if (close) {
+                IOUtils.close(err, in, out);
+            }
         }
     }
 
+    /**
+     * @see #copy(InputStream, OutputStream, byte[], boolean)
+     */
+    public static long copy(final InputStream in, final OutputStream out, boolean close) throws IOException {
+        return copy(in, out, buffer.get(), close);
+    }
+
+    /**
+     * @see #copy(InputStream, OutputStream, byte[], boolean)
+     */
+    public static long copy(final InputStream in, final OutputStream out, byte[] buffer) throws IOException {
+        return copy(in, out, buffer, true);
+    }
+
+    /**
+     * @see #copy(InputStream, OutputStream, byte[], boolean)
+     */
+    public static long copy(final InputStream in, final OutputStream out) throws IOException {
+        return copy(in, out, buffer.get(), true);
+    }
 }

+ 3 - 49
libs/x-content/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContentGenerator.java

@@ -36,7 +36,7 @@ import org.elasticsearch.common.xcontent.XContentGenerator;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.support.filtering.FilterPathBasedFilter;
-import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.core.internal.io.Streams;
 
 import java.io.BufferedInputStream;
 import java.io.IOException;
@@ -349,7 +349,7 @@ public class JsonXContentGenerator implements XContentGenerator {
         } else {
             writeStartRaw(name);
             flush();
-            copyStream(content, os);
+            Streams.copy(content, os, false);
             writeEndRaw();
         }
     }
@@ -364,24 +364,11 @@ public class JsonXContentGenerator implements XContentGenerator {
                 generator.writeRaw(':');
             }
             flush();
-            transfer(stream, os);
+            Streams.copy(stream, os);
             writeEndRaw();
         }
     }
 
-    // A basic copy of Java 9's InputStream#transferTo
-    private static long transfer(InputStream in, OutputStream out) throws IOException {
-        Objects.requireNonNull(out, "out");
-        long transferred = 0;
-        byte[] buffer = new byte[8192];
-        int read;
-        while ((read = in.read(buffer, 0, 8192)) >= 0) {
-            out.write(buffer, 0, read);
-            transferred += read;
-        }
-        return transferred;
-    }
-
     private boolean mayWriteRawData(XContentType contentType) {
         // When the current generator is filtered (ie filter != null)
         // or the content is in a different format than the current generator,
@@ -480,37 +467,4 @@ public class JsonXContentGenerator implements XContentGenerator {
     public boolean isClosed() {
         return generator.isClosed();
     }
-
-    /**
-     * Copy the contents of the given InputStream to the given OutputStream.
-     * Closes both streams when done.
-     *
-     * @param in  the stream to copy from
-     * @param out the stream to copy to
-     * @return the number of bytes copied
-     * @throws IOException in case of I/O errors
-     */
-    private static long copyStream(InputStream in, OutputStream out) throws IOException {
-        Objects.requireNonNull(in, "No InputStream specified");
-        Objects.requireNonNull(out, "No OutputStream specified");
-        final byte[] buffer = new byte[8192];
-        boolean success = false;
-        try {
-            long byteCount = 0;
-            int bytesRead;
-            while ((bytesRead = in.read(buffer)) != -1) {
-                out.write(buffer, 0, bytesRead);
-                byteCount += bytesRead;
-            }
-            out.flush();
-            success = true;
-            return byteCount;
-        } finally {
-            if (success) {
-                IOUtils.close(in, out);
-            } else {
-                IOUtils.closeWhileHandlingException(in, out);
-            }
-        }
-    }
 }

+ 2 - 2
plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

@@ -292,7 +292,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
                  * It is not enough to wrap the call to Streams#copy, we have to wrap the privileged calls too; this is because Streams#copy
                  * is in the stacktrace and is not granted the permissions needed to close and write the channel.
                  */
-                Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
+                org.elasticsearch.core.internal.io.Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
 
                     @SuppressForbidden(reason = "channel is based on a socket")
                     @Override
@@ -350,7 +350,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
         throws IOException {
         assert blobSize <= getLargeBlobThresholdInBytes() : "large blob uploads should use the resumable upload method";
         final byte[] buffer = new byte[Math.toIntExact(blobSize)];
-        org.elasticsearch.common.io.Streams.readFully(inputStream, buffer);
+        Streams.readFully(inputStream, buffer);
         try {
             final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ?
                 new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } :

+ 3 - 2
server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

@@ -165,7 +165,7 @@ public class FsBlobContainer extends AbstractBlobContainer {
             channel.position(position);
         }
         assert channel.position() == position;
-        return org.elasticsearch.common.io.Streams.limitStream(Channels.newInputStream(channel), length);
+        return Streams.limitStream(Channels.newInputStream(channel), length);
     }
 
     @Override
@@ -212,7 +212,8 @@ public class FsBlobContainer extends AbstractBlobContainer {
     private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException {
         try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
             final int bufferSize = blobStore.bufferSizeInBytes();
-            Streams.copy(inputStream, outputStream, new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize]);
+            org.elasticsearch.core.internal.io.Streams.copy(
+                    inputStream, outputStream, new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize]);
         }
         IOUtils.fsync(tempBlobPath, false);
     }

+ 4 - 45
server/src/main/java/org/elasticsearch/common/io/Streams.java

@@ -65,45 +65,6 @@ public abstract class Streams {
         }
     };
 
-    //---------------------------------------------------------------------
-    // Copy methods for java.io.InputStream / java.io.OutputStream
-    //---------------------------------------------------------------------
-
-
-    public static long copy(InputStream in, OutputStream out) throws IOException {
-        return copy(in, out, new byte[BUFFER_SIZE]);
-    }
-
-    /**
-     * Copy the contents of the given InputStream to the given OutputStream.
-     * Closes both streams when done.
-     *
-     * @param in  the stream to copy from
-     * @param out the stream to copy to
-     * @return the number of bytes copied
-     * @throws IOException in case of I/O errors
-     */
-    public static long copy(InputStream in, OutputStream out, byte[] buffer) throws IOException {
-        Objects.requireNonNull(in, "No InputStream specified");
-        Objects.requireNonNull(out, "No OutputStream specified");
-        // Leverage try-with-resources to close in and out so that exceptions in close() are either propagated or added as suppressed
-        // exceptions to the main exception
-        try (InputStream in2 = in; OutputStream out2 = out) {
-            return doCopy(in2, out2, buffer);
-        }
-    }
-
-    private static long doCopy(InputStream in, OutputStream out, byte[] buffer) throws IOException {
-        long byteCount = 0;
-        int bytesRead;
-        while ((bytesRead = in.read(buffer)) != -1) {
-            out.write(buffer, 0, bytesRead);
-            byteCount += bytesRead;
-        }
-        out.flush();
-        return byteCount;
-    }
-
     /**
      * Copy the contents of the given byte array to the given OutputStream.
      * Closes the stream when done.
@@ -222,7 +183,7 @@ public abstract class Streams {
      * Fully consumes the input stream, throwing the bytes away. Returns the number of bytes consumed.
      */
     public static long consumeFully(InputStream inputStream) throws IOException {
-        return copy(inputStream, NULL_OUTPUT_STREAM);
+        return org.elasticsearch.core.internal.io.Streams.copy(inputStream, NULL_OUTPUT_STREAM);
     }
 
     public static List<String> readAllLines(InputStream input) throws IOException {
@@ -267,11 +228,9 @@ public abstract class Streams {
      * Reads all bytes from the given {@link InputStream} and closes it afterwards.
      */
     public static BytesReference readFully(InputStream in) throws IOException {
-        try (InputStream inputStream = in) {
-            BytesStreamOutput out = new BytesStreamOutput();
-            copy(inputStream, out);
-            return out.bytes();
-        }
+        BytesStreamOutput out = new BytesStreamOutput();
+        org.elasticsearch.core.internal.io.Streams.copy(in, out);
+        return out.bytes();
     }
 
     /**

+ 1 - 1
server/src/test/java/org/elasticsearch/common/io/StreamsTests.java

@@ -91,7 +91,7 @@ public class StreamsTests extends ESTestCase {
         final int limit = randomIntBetween(0, bytes.length);
         final BytesArray stuffArray = new BytesArray(bytes);
         final ByteArrayOutputStream out = new ByteArrayOutputStream(bytes.length);
-        final long count = Streams.copy(Streams.limitStream(stuffArray.streamInput(), limit), out);
+        final long count = org.elasticsearch.core.internal.io.Streams.copy(Streams.limitStream(stuffArray.streamInput(), limit), out);
         assertEquals(limit, count);
         assertThat(Arrays.equals(out.toByteArray(), Arrays.copyOf(bytes, limit)), equalTo(true));
     }

+ 1 - 1
server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java

@@ -28,13 +28,13 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.collect.Tuple;
-import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.core.internal.io.Streams;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;

+ 1 - 1
server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java

@@ -29,13 +29,13 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.collect.Tuple;
-import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.core.internal.io.Streams;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyUtils.java

@@ -10,12 +10,12 @@ import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.NotXContentException;
-import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.internal.io.Streams;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;

+ 1 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java

@@ -53,7 +53,6 @@ import org.elasticsearch.common.cache.Cache;
 import org.elasticsearch.common.cache.CacheBuilder;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.hash.MessageDigests;
-import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
@@ -70,6 +69,7 @@ import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.internal.io.Streams;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.query.BoolQueryBuilder;