Browse Source

Speed up writeVInt (#62345)

This speeds up `StreamOutput#writeVInt` quite a bit which is nice
because it is *very* commonly called when serializing aggregations. Well,
when serializing anything. All "collections" serialize their size as a
vint. Anyway, I was examining the serialization speeds of `StringTerms`
and this saves about 30% of the write time for that. I expect it'll be
useful other places.
Nik Everett 5 years ago
parent
commit
dfc45396e7

+ 0 - 1
benchmarks/README.md

@@ -78,7 +78,6 @@ cd fcml*
 make
 cd example/hsdis
 make
-cp .libs/libhsdis.so.0.0.0
 sudo cp .libs/libhsdis.so.0.0.0 /usr/lib/jvm/java-14-adoptopenjdk/lib/hsdis-amd64.so
 ```
 

+ 92 - 0
benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java

@@ -0,0 +1,92 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.benchmark.search.aggregations.bucket.terms;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.io.stream.DelayableWriteable;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.BucketOrder;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@Fork(2)
+@Warmup(iterations = 10)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+public class StringTermsSerializationBenchmark {
+    private static final NamedWriteableRegistry REGISTRY = new NamedWriteableRegistry(
+        List.of(new NamedWriteableRegistry.Entry(InternalAggregation.class, StringTerms.NAME, StringTerms::new))
+    );
+    @Param(value = { "1000" })
+    private int buckets;
+
+    private DelayableWriteable<InternalAggregations> results;
+
+    @Setup
+    public void initResults() {
+        results = DelayableWriteable.referencing(InternalAggregations.from(List.of(newTerms(true))));
+    }
+
+    private StringTerms newTerms(boolean withNested) {
+        List<StringTerms.Bucket> resultBuckets = new ArrayList<>(buckets);
+        for (int i = 0; i < buckets; i++) {
+            InternalAggregations inner = withNested ? InternalAggregations.from(List.of(newTerms(false))) : InternalAggregations.EMPTY;
+            resultBuckets.add(new StringTerms.Bucket(new BytesRef("test" + i), i, inner, false, 0, DocValueFormat.RAW));
+        }
+        return new StringTerms(
+            "test",
+            BucketOrder.key(true),
+            BucketOrder.key(true),
+            buckets,
+            1,
+            null,
+            DocValueFormat.RAW,
+            buckets,
+            false,
+            100000,
+            resultBuckets,
+            0
+        );
+    }
+
+    @Benchmark
+    public DelayableWriteable<InternalAggregations> serialize() {
+        return results.asSerialized(InternalAggregations::readFrom, REGISTRY);
+    }
+}

+ 17 - 3
server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

@@ -218,12 +218,26 @@ public abstract class StreamOutput extends OutputStream {
      * using {@link #writeInt}
      */
     public void writeVInt(int i) throws IOException {
-        final byte[] buffer = scratch.get();
+        /*
+         * Shortcut writing single byte because it is very, very common and
+         * can skip grabbing the scratch buffer. This is marginally slower
+         * than hand unrolling the entire encoding loop but hand unrolling
+         * the encoding loop blows out the method size so it can't be inlined.
+         * In that case benchmarks of the method itself are faster but
+         * benchmarks of methods that use this method are slower.
+         * This is philosophically in line with vint in general - it biases
+         * twoards being simple and fast for smaller numbers.
+         */
+        if (Integer.numberOfLeadingZeros(i) >= 25) {
+            writeByte((byte) i);
+            return;
+        }
+        byte[] buffer = scratch.get();
         int index = 0;
-        while ((i & ~0x7F) != 0) {
+        do {
             buffer[index++] = ((byte) ((i & 0x7f) | 0x80));
             i >>>= 7;
-        }
+        } while ((i & ~0x7F) != 0);
         buffer[index++] = ((byte) i);
         writeBytes(buffer, 0, index);
     }

+ 11 - 1
server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java

@@ -61,7 +61,7 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 
 /**
- * Tests for {@link BytesStreamOutput} paging behaviour.
+ * Tests for {@link StreamOutput}.
  */
 public class BytesStreamsTests extends ESTestCase {
     public void testEmpty() throws Exception {
@@ -827,6 +827,16 @@ public class BytesStreamsTests extends ESTestCase {
         final int value = randomInt();
         BytesStreamOutput output = new BytesStreamOutput();
         output.writeVInt(value);
+
+        BytesStreamOutput simple = new BytesStreamOutput();
+        int i = value;
+        while ((i & ~0x7F) != 0) {
+            simple.writeByte(((byte) ((i & 0x7f) | 0x80)));
+            i >>>= 7;
+        }
+        simple.writeByte((byte) i);
+        assertEquals(simple.bytes().toBytesRef().toString(), output.bytes().toBytesRef().toString());
+
         StreamInput input = output.bytes().streamInput();
         assertEquals(value, input.readVInt());
     }