Browse Source

Optimize Bulk Message Parsing and Message Length Parsing (#39634)

* Optimize Bulk Message Parsing and Message Length Parsing

* findNextMarker took almost 1ms per invocation during the PMC rally track
  * Fixed to be about an order of magnitude faster by using Netty's bulk `ByteBuf` search
* It is unnecessary to instantiate an object (the input stream wrapper) and throw it away, just to read the `int` length from the message bytes
  * Fixed by adding bulk `int` read to BytesReference
Armin Braun 6 years ago
parent
commit
0619e6e9a2

+ 11 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufBytesReference.java

@@ -45,6 +45,17 @@ final class ByteBufBytesReference extends BytesReference {
         return buffer.getByte(offset + index);
     }
 
+    @Override
+    public int getInt(int index) {
+        return buffer.getInt(offset + index);
+    }
+
+    @Override
+    public int indexOf(byte marker, int from) {
+        final int start = offset + from;
+        return buffer.forEachByte(start, length - start, value -> value != marker);
+    }
+
     @Override
     public int length() {
         return length;

+ 11 - 0
plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java

@@ -90,6 +90,17 @@ class ByteBufUtils {
             return buffer.getByte(offset + index);
         }
 
+        @Override
+        public int getInt(int index) {
+            return buffer.getInt(offset + index);
+        }
+
+        @Override
+        public int indexOf(byte marker, int from) {
+            final int start = offset + from;
+            return buffer.forEachByte(start, length - start, value -> value != marker);
+        }
+
         @Override
         public int length() {
             return length;

+ 9 - 10
server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

@@ -364,11 +364,10 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
         XContent xContent = xContentType.xContent();
         int line = 0;
         int from = 0;
-        int length = data.length();
         byte marker = xContent.streamSeparator();
         boolean typesDeprecationLogged = false;
         while (true) {
-            int nextMarker = findNextMarker(marker, from, data, length);
+            int nextMarker = findNextMarker(marker, from, data);
             if (nextMarker == -1) {
                 break;
             }
@@ -477,7 +476,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
                     add(new DeleteRequest(index, type, id).routing(routing)
                         .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm), payload);
                 } else {
-                    nextMarker = findNextMarker(marker, from, data, length);
+                    nextMarker = findNextMarker(marker, from, data);
                     if (nextMarker == -1) {
                         break;
                     }
@@ -615,16 +614,16 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
         return globalRouting;
     }
 
-    private int findNextMarker(byte marker, int from, BytesReference data, int length) {
-        for (int i = from; i < length; i++) {
-            if (data.get(i) == marker) {
-                return i;
-            }
+    private static int findNextMarker(byte marker, int from, BytesReference data) {
+        final int res = data.indexOf(marker, from);
+        if (res != -1) {
+            assert res >= 0;
+            return res;
         }
-        if (from != length) {
+        if (from != data.length()) {
             throw new IllegalArgumentException("The bulk request must be terminated by a newline [\n]");
         }
-        return -1;
+        return res;
     }
 
     @Override

+ 8 - 9
server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java

@@ -177,10 +177,9 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
                                            NamedXContentRegistry registry,
                                            boolean allowExplicitIndex) throws IOException {
         int from = 0;
-        int length = data.length();
         byte marker = xContent.streamSeparator();
         while (true) {
-            int nextMarker = findNextMarker(marker, from, data, length);
+            int nextMarker = findNextMarker(marker, from, data);
             if (nextMarker == -1) {
                 break;
             }
@@ -261,7 +260,7 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
             // move pointers
             from = nextMarker + 1;
             // now for the body
-            nextMarker = findNextMarker(marker, from, data, length);
+            nextMarker = findNextMarker(marker, from, data);
             if (nextMarker == -1) {
                 break;
             }
@@ -275,13 +274,13 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
         }
     }
 
-    private static int findNextMarker(byte marker, int from, BytesReference data, int length) {
-        for (int i = from; i < length; i++) {
-            if (data.get(i) == marker) {
-                return i;
-            }
+    private static int findNextMarker(byte marker, int from, BytesReference data) {
+        final int res = data.indexOf(marker, from);
+        if (res != -1) {
+            assert res >= 0;
+            return res;
         }
-        if (from != length) {
+        if (from != data.length()) {
             throw new IllegalArgumentException("The msearch request must be terminated by a newline [\n]");
         }
         return -1;

+ 5 - 0
server/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java

@@ -46,6 +46,11 @@ public class ByteBufferReference extends BytesReference {
         return buffer.get(index);
     }
 
+    @Override
+    public int getInt(int index) {
+        return buffer.getInt(index);
+    }
+
     @Override
     public int length() {
         return length;

+ 23 - 0
server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java

@@ -60,6 +60,29 @@ public abstract class BytesReference implements Comparable<BytesReference>, ToXC
      */
     public abstract byte get(int index);
 
+    /**
+     * Returns the integer read from the 4 bytes (BE) starting at the given index.
+     */
+    public int getInt(int index) {
+        return (get(index) & 0xFF) << 24 | (get(index + 1) & 0xFF) << 16 | (get(index + 2) & 0xFF) << 8 | get(index + 3) & 0xFF;
+    }
+
+    /**
+     * Finds the index of the first occurrence of the given marker between within the given bounds.
+     * @param marker marker byte to search
+     * @param from lower bound for the index to check (inclusive)
+     * @return first index of the marker or {@code -1} if not found
+     */
+    public int indexOf(byte marker, int from) {
+        final int to = length();
+        for (int i = from; i < to; i++) {
+            if (get(i) == marker) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
     /**
      * The length.
      */

+ 1 - 5
server/src/main/java/org/elasticsearch/transport/TcpTransport.java

@@ -840,11 +840,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
                 + Integer.toHexString(headerBuffer.get(2) & 0xFF) + ","
                 + Integer.toHexString(headerBuffer.get(3) & 0xFF) + ")");
         }
-        final int messageLength;
-        try (StreamInput input = headerBuffer.streamInput()) {
-            input.skip(TcpHeader.MARKER_BYTES_SIZE);
-            messageLength = input.readInt();
-        }
+        final int messageLength = headerBuffer.getInt(TcpHeader.MARKER_BYTES_SIZE);
 
         if (messageLength == TransportKeepAlive.PING_DATA_SIZE) {
             // This is a ping

+ 37 - 0
test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java

@@ -34,7 +34,14 @@ import org.elasticsearch.test.ESTestCase;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public abstract class AbstractBytesReferenceTestCase extends ESTestCase {
 
@@ -648,4 +655,34 @@ public abstract class AbstractBytesReferenceTestCase extends ESTestCase {
             assertNotEquals(b1, b2);
         }
     }
+
+    public void testGetInt() throws IOException {
+        final int count = randomIntBetween(1, 10);
+        final BytesReference bytesReference = newBytesReference(count * Integer.BYTES);
+        final BytesRef bytesRef = bytesReference.toBytesRef();
+        final IntBuffer intBuffer =
+            ByteBuffer.wrap(bytesRef.bytes, bytesRef.offset, bytesRef.length).order(ByteOrder.BIG_ENDIAN).asIntBuffer();
+        for (int i = 0; i < count; ++i) {
+            assertEquals(intBuffer.get(i), bytesReference.getInt(i * Integer.BYTES));
+        }
+    }
+
+    public void testIndexOf() throws IOException {
+        final int size = randomIntBetween(0, 100);
+        final BytesReference bytesReference = newBytesReference(size);
+        final Map<Byte, List<Integer>> map = new HashMap<>();
+        for (int i = 0; i < size; ++i) {
+            final byte value = bytesReference.get(i);
+            map.computeIfAbsent(value, v -> new ArrayList<>()).add(i);
+        }
+        map.forEach((value, positions) -> {
+            for (int i = 0; i < positions.size(); i++) {
+                final int pos = positions.get(i);
+                final int from = i == 0 ? randomIntBetween(0, pos) : positions.get(i - 1) + 1;
+                assertEquals(bytesReference.indexOf(value, from), pos);
+            }
+        });
+        final byte missing = randomValueOtherThanMany(map::containsKey, ESTestCase::randomByte);
+        assertEquals(-1, bytesReference.indexOf(missing, randomIntBetween(0, Math.max(0, size - 1))));
+    }
 }