Browse Source

Improve some BytesStreamOutput Usage (#60730)

* Stop redundantly creating a `0` length `ByteArray` that is never used
* Add efficient way to get a minimal size copy of the bytes in a `BytesStreamOutput`
* Avoid multiple redundant `byte[]` copies in search cache key creation
Armin Braun 5 years ago
parent
commit
ec175d1653

+ 3 - 1
server/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java

@@ -69,7 +69,9 @@ public class PagedBytesReference extends AbstractBytesReference {
     public BytesRef toBytesRef() {
         BytesRef bref = new BytesRef();
         // if length <= pagesize this will dereference the page, or materialize the byte[]
-        byteArray.get(offset, length, bref);
+        if (byteArray != null) {
+            byteArray.get(offset, length, bref);
+        }
         return bref;
     }
 

+ 38 - 6
server/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java

@@ -19,6 +19,10 @@
 
 package org.elasticsearch.common.io.stream;
 
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.PagedBytesReference;
 import org.elasticsearch.common.util.BigArrays;
@@ -35,6 +39,7 @@ public class BytesStreamOutput extends BytesStream {
 
     protected final BigArrays bigArrays;
 
+    @Nullable
     protected ByteArray bytes;
     protected int count;
 
@@ -59,16 +64,18 @@ public class BytesStreamOutput extends BytesStream {
 
     protected BytesStreamOutput(int expectedSize, BigArrays bigArrays) {
         this.bigArrays = bigArrays;
-        this.bytes = bigArrays.newByteArray(expectedSize, false);
+        if (expectedSize != 0) {
+            this.bytes = bigArrays.newByteArray(expectedSize, false);
+        }
     }
 
     @Override
-    public long position() throws IOException {
+    public long position() {
         return count;
     }
 
     @Override
-    public void writeByte(byte b) throws IOException {
+    public void writeByte(byte b) {
         ensureCapacity(count + 1L);
         bytes.set(count, b);
         count++;
@@ -99,7 +106,7 @@ public class BytesStreamOutput extends BytesStream {
     @Override
     public void reset() {
         // shrink list of pages
-        if (bytes.size() > PageCacheRecycler.PAGE_SIZE_IN_BYTES) {
+        if (bytes != null && bytes.size() > PageCacheRecycler.PAGE_SIZE_IN_BYTES) {
             bytes = bigArrays.resize(bytes, PageCacheRecycler.PAGE_SIZE_IN_BYTES);
         }
 
@@ -108,7 +115,7 @@ public class BytesStreamOutput extends BytesStream {
     }
 
     @Override
-    public void flush() throws IOException {
+    public void flush() {
         // nothing to do
     }
 
@@ -143,6 +150,27 @@ public class BytesStreamOutput extends BytesStream {
         return new PagedBytesReference(bytes, count);
     }
 
+    /**
+     * Like {@link #bytes()} but copies the bytes to a freshly allocated buffer.
+     *
+     * @return copy of the bytes in this instances
+     */
+    public BytesReference copyBytes() {
+        final byte[] keyBytes = new byte[count];
+        int offset = 0;
+        final BytesRefIterator iterator = bytes().iterator();
+        try {
+            BytesRef slice;
+            while ((slice = iterator.next()) != null) {
+                System.arraycopy(slice.bytes, slice.offset, keyBytes, offset, slice.length);
+                offset += slice.length;
+            }
+        } catch (IOException e) {
+            throw new AssertionError(e);
+        }
+        return new BytesArray(keyBytes);
+    }
+
     /**
      * Returns the number of bytes used by the underlying {@link org.elasticsearch.common.util.ByteArray}
      * @see org.elasticsearch.common.util.ByteArray#ramBytesUsed()
@@ -155,7 +183,11 @@ public class BytesStreamOutput extends BytesStream {
         if (offset > Integer.MAX_VALUE) {
             throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data");
         }
-        bytes = bigArrays.grow(bytes, offset);
+        if (bytes == null) {
+            this.bytes = bigArrays.newByteArray(BigArrays.overSize(offset, PageCacheRecycler.PAGE_SIZE_IN_BYTES, 1), false);
+        } else {
+            bytes = bigArrays.grow(bytes, offset);
+        }
     }
 
 }

+ 2 - 1
server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java

@@ -21,6 +21,7 @@ package org.elasticsearch.common.io.stream;
 
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.PageCacheRecycler;
 
@@ -45,6 +46,6 @@ public class ReleasableBytesStreamOutput extends BytesStreamOutput implements Re
 
     @Override
     public void close() {
-        bytes.close();
+        Releasables.close(bytes);
     }
 }

+ 10 - 0
server/src/main/java/org/elasticsearch/common/lease/Releasables.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.common.lease;
 
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.core.internal.io.IOUtils;
 
 import java.io.IOException;
@@ -46,6 +47,15 @@ public enum Releasables {
         close(releasables, false);
     }
 
+    /** Release the provided {@link Releasable}. */
+    public static void close(@Nullable Releasable releasable) {
+        try {
+            IOUtils.close(releasable);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
     /** Release the provided {@link Releasable}s. */
     public static void close(Releasable... releasables) {
         close(Arrays.asList(releasables));

+ 10 - 6
server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java

@@ -31,7 +31,6 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.CheckedFunction;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -342,15 +341,20 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
         this.canReturnNullResponseIfMatchNoDocs = value;
     }
 
+    private static final ThreadLocal<BytesStreamOutput> scratch = ThreadLocal.withInitial(BytesStreamOutput::new);
+
     /**
      * Returns the cache key for this shard search request, based on its content
      */
     public BytesReference cacheKey() throws IOException {
-        BytesStreamOutput out = new BytesStreamOutput();
-        this.innerWriteTo(out, true);
-        // copy it over, most requests are small, we might as well copy to make sure we are not sliced...
-        // we could potentially keep it without copying, but then pay the price of extra unused bytes up to a page
-        return new BytesArray(out.bytes().toBytesRef(), true);// do a deep copy
+        BytesStreamOutput out = scratch.get();
+        try {
+            this.innerWriteTo(out, true);
+            // copy it over since we don't want to share the thread-local bytes in #scratch
+            return out.copyBytes();
+        } finally {
+            out.reset();
+        }
     }
 
     public String getClusterAlias() {

+ 3 - 2
server/src/main/java/org/elasticsearch/transport/TcpHeader.java

@@ -60,10 +60,11 @@ public class TcpHeader {
         }
     }
 
+    private static final byte[] PREFIX = {(byte) 'E', (byte) 'S'};
+
     public static void writeHeader(StreamOutput output, long requestId, byte status, Version version, int contentSize,
                                    int variableHeaderSize) throws IOException {
-        output.writeByte((byte)'E');
-        output.writeByte((byte)'S');
+        output.writeBytes(PREFIX);
         // write the size, the size indicates the remaining message size, not including the size int
         if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) {
             output.writeInt(contentSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE + VARIABLE_HEADER_SIZE);

+ 14 - 11
server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java

@@ -48,6 +48,19 @@ final class TransportKeepAlive implements Closeable {
 
     static final int PING_DATA_SIZE = -1;
 
+    private static final BytesReference PING_MESSAGE;
+
+    static {
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            out.writeByte((byte) 'E');
+            out.writeByte((byte) 'S');
+            out.writeInt(PING_DATA_SIZE);
+            PING_MESSAGE = out.copyBytes();
+        } catch (IOException e) {
+            throw new AssertionError(e.getMessage(), e); // won't happen
+        }
+    }
+
     private final Logger logger = LogManager.getLogger(TransportKeepAlive.class);
     private final CounterMetric successfulPings = new CounterMetric();
     private final CounterMetric failedPings = new CounterMetric();
@@ -55,21 +68,11 @@ final class TransportKeepAlive implements Closeable {
     private final Lifecycle lifecycle = new Lifecycle();
     private final ThreadPool threadPool;
     private final AsyncBiFunction<TcpChannel, BytesReference, Void> pingSender;
-    private final BytesReference pingMessage;
 
     TransportKeepAlive(ThreadPool threadPool, AsyncBiFunction<TcpChannel, BytesReference, Void> pingSender) {
         this.threadPool = threadPool;
         this.pingSender = pingSender;
 
-        try (BytesStreamOutput out = new BytesStreamOutput()) {
-            out.writeByte((byte) 'E');
-            out.writeByte((byte) 'S');
-            out.writeInt(PING_DATA_SIZE);
-            pingMessage = out.bytes();
-        } catch (IOException e) {
-            throw new AssertionError(e.getMessage(), e); // won't happen
-        }
-
         this.lifecycle.moveToStarted();
     }
 
@@ -112,7 +115,7 @@ final class TransportKeepAlive implements Closeable {
     }
 
     private void sendPing(TcpChannel channel) {
-        pingSender.apply(channel, pingMessage, new ActionListener<Void>() {
+        pingSender.apply(channel, PING_MESSAGE, new ActionListener<Void>() {
 
             @Override
             public void onResponse(Void v) {