浏览代码

SQL: Compress Cursors (#83591)

Partially addresses #83284

It looks like compressing cursors is a low hanging fruit to address issues around cursor size that seems to pay off pretty nicely. This is especially relevant for queries using the ListCursor (e.g. SHOW COLUMNS or queries sorting on aggregated columns) but might also benefit queries that store SearchSourceBuilder in the cursor.
Lukas Wegmann 3 年之前
父节点
当前提交
98fdf4fa59

+ 5 - 0
docs/changelog/83591.yaml

@@ -0,0 +1,5 @@
+pr: 83591
+summary: Compress Cursors
+area: SQL
+type: enhancement
+issues: []

+ 16 - 0
x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java

@@ -49,6 +49,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
@@ -74,6 +76,7 @@ import static org.elasticsearch.xpack.sql.proto.CoreProtocol.URL_PARAM_DELIMITER
 import static org.elasticsearch.xpack.sql.proto.CoreProtocol.URL_PARAM_FORMAT;
 import static org.elasticsearch.xpack.sql.proto.CoreProtocol.WAIT_FOR_COMPLETION_TIMEOUT_NAME;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.lessThan;
 
 /**
  * Integration test for the rest sql action. The one that speaks json directly to a
@@ -1433,6 +1436,19 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
         }
     }
 
+    public void testCompressCursor() throws IOException {
+        String doc = IntStream.range(0, 1000)
+            .mapToObj(i -> String.format(Locale.ROOT, "\"field%d\": %d", i, i))
+            .collect(Collectors.joining(","));
+        index("{" + doc + "}");
+
+        String mode = randomMode();
+        Map<String, Object> resp = toMap(runSql(query("SHOW COLUMNS FROM " + indexPattern("test")).fetchSize(1).mode(mode)), mode);
+
+        // without compression, the cursor is at least <avg. fieldname length> * 1000 bytes (in fact it is ~35kb)
+        assertThat(resp.get("cursor").toString().length(), lessThan(5000));
+    }
+
     static Map<String, Object> runSql(RequestObjectBuilder builder, String mode) throws IOException {
         return toMap(runSql(builder.mode(mode)), mode);
     }

+ 16 - 11
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/common/io/SqlStreamInput.java

@@ -8,6 +8,8 @@
 package org.elasticsearch.xpack.sql.common.io;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.common.compress.CompressorFactory;
+import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -23,22 +25,25 @@ import java.util.Base64;
  */
 public class SqlStreamInput extends NamedWriteableAwareStreamInput {
 
-    private final ZoneId zoneId;
+    public static SqlStreamInput fromString(String base64encoded, NamedWriteableRegistry namedWriteableRegistry, Version version)
+        throws IOException {
+        byte[] bytes = Base64.getDecoder().decode(base64encoded);
+        StreamInput in = StreamInput.wrap(bytes);
+        Version inVersion = Version.readVersion(in);
+        if (version.compareTo(inVersion) != 0) {
+            throw new SqlIllegalArgumentException("Unsupported cursor version [{}], expected [{}]", inVersion, version);
+        }
 
-    public SqlStreamInput(String base64encoded, NamedWriteableRegistry namedWriteableRegistry, Version version) throws IOException {
-        this(Base64.getDecoder().decode(base64encoded), namedWriteableRegistry, version);
+        InputStreamStreamInput uncompressingIn = new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(in));
+        return new SqlStreamInput(uncompressingIn, namedWriteableRegistry, inVersion);
     }
 
-    public SqlStreamInput(byte[] input, NamedWriteableRegistry namedWriteableRegistry, Version version) throws IOException {
-        super(StreamInput.wrap(input), namedWriteableRegistry);
+    private final ZoneId zoneId;
+
+    private SqlStreamInput(StreamInput input, NamedWriteableRegistry namedWriteableRegistry, Version version) throws IOException {
+        super(input, namedWriteableRegistry);
 
-        // version check first
-        Version ver = Version.readVersion(delegate);
-        if (version.compareTo(ver) != 0) {
-            throw new SqlIllegalArgumentException("Unsupported cursor version [{}], expected [{}]", ver, version);
-        }
         delegate.setVersion(version);
-        // configuration settings
         zoneId = delegate.readZoneId();
     }
 

+ 21 - 9
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/common/io/SqlStreamOutput.java

@@ -8,35 +8,47 @@
 package org.elasticsearch.xpack.sql.common.io;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.time.ZoneId;
 import java.util.Base64;
 
+/**
+ * Output stream for writing SQL cursors. The output is compressed if it would become larger than {@code compressionThreshold}
+ * bytes otherwise (see {@code DEFAULT_COMPRESSION_THRESHOLD}).
+ *
+ * The wire format is {@code version compressedPayload}.
+ */
 public class SqlStreamOutput extends OutputStreamStreamOutput {
 
     private final ByteArrayOutputStream bytes;
 
-    public SqlStreamOutput(Version version, ZoneId zoneId) throws IOException {
-        this(new ByteArrayOutputStream(), version, zoneId);
+    public static SqlStreamOutput create(Version version, ZoneId zoneId) throws IOException {
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        StreamOutput uncompressedOut = new OutputStreamStreamOutput(Base64.getEncoder().wrap(bytes));
+        Version.writeVersion(version, uncompressedOut);
+        OutputStream out = CompressorFactory.COMPRESSOR.threadLocalOutputStream(uncompressedOut);
+        return new SqlStreamOutput(bytes, out, version, zoneId);
     }
 
-    private SqlStreamOutput(ByteArrayOutputStream bytes, Version version, ZoneId zoneId) throws IOException {
-        super(Base64.getEncoder().wrap(new OutputStreamStreamOutput(bytes)));
+    private SqlStreamOutput(ByteArrayOutputStream bytes, OutputStream out, Version version, ZoneId zoneId) throws IOException {
+        super(out);
         this.bytes = bytes;
-
-        Version.writeVersion(version, this);
-        writeZoneId(zoneId);
+        super.setVersion(version);
+        this.writeZoneId(zoneId);
     }
 
     /**
      * Should be called _after_ closing the stream - there are no guarantees otherwise.
      */
-    public String streamAsString() {
-        // Base64 uses this encoding instead of UTF-8
+    public String streamAsString() throws IOException {
         return bytes.toString(StandardCharsets.ISO_8859_1);
     }
+
 }

+ 2 - 2
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java

@@ -74,7 +74,7 @@ public final class Cursors {
         if (info == Cursor.EMPTY) {
             return StringUtils.EMPTY;
         }
-        try (SqlStreamOutput output = new SqlStreamOutput(version, zoneId)) {
+        try (SqlStreamOutput output = SqlStreamOutput.create(version, zoneId)) {
             output.writeNamedWriteable(info);
             output.close();
             // return the string only after closing the resource
@@ -91,7 +91,7 @@ public final class Cursors {
         if (base64.isEmpty()) {
             return new Tuple<>(Cursor.EMPTY, null);
         }
-        try (SqlStreamInput in = new SqlStreamInput(base64, WRITEABLE_REGISTRY, VERSION)) {
+        try (SqlStreamInput in = SqlStreamInput.fromString(base64, WRITEABLE_REGISTRY, VERSION)) {
             Cursor cursor = in.readNamedWriteable(Cursor.class);
             return new Tuple<>(cursor, in.zoneId());
         } catch (IOException ex) {

+ 6 - 9
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/AbstractSqlWireSerializingTestCase.java

@@ -8,7 +8,6 @@
 package org.elasticsearch.xpack.sql;
 
 import org.elasticsearch.Version;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.test.AbstractWireTestCase;
@@ -23,14 +22,12 @@ public abstract class AbstractSqlWireSerializingTestCase<T extends Writeable> ex
 
     @Override
     protected T copyInstance(T instance, Version version) throws IOException {
-        try (BytesStreamOutput output = new BytesStreamOutput()) {
-            ZoneId zoneId = instanceZoneId(instance);
-            SqlStreamOutput out = new SqlStreamOutput(version, zoneId);
-            instance.writeTo(out);
-            out.close();
-            try (SqlStreamInput in = new SqlStreamInput(out.streamAsString(), getNamedWriteableRegistry(), version)) {
-                return instanceReader().read(in);
-            }
+        ZoneId zoneId = instanceZoneId(instance);
+        SqlStreamOutput out = SqlStreamOutput.create(version, zoneId);
+        instance.writeTo(out);
+        out.close();
+        try (SqlStreamInput in = SqlStreamInput.fromString(out.streamAsString(), getNamedWriteableRegistry(), version)) {
+            return instanceReader().read(in);
         }
     }
 

+ 83 - 0
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/common/io/SqlStreamTests.java

@@ -0,0 +1,83 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.sql.common.io;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.Version;
+import org.elasticsearch.common.io.stream.InputStreamStreamInput;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.List;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.lessThan;
+
+public class SqlStreamTests extends ESTestCase {
+
+    public void testWriteAndRead() throws IOException {
+        BytesRef payload = new BytesRef(randomByteArrayOfLength(randomIntBetween(10, 1000)));
+
+        SqlStreamOutput out = SqlStreamOutput.create(Version.CURRENT, randomZone());
+        out.writeBytesRef(payload);
+        out.close();
+        String encoded = out.streamAsString();
+
+        SqlStreamInput in = SqlStreamInput.fromString(encoded, new NamedWriteableRegistry(List.of()), Version.CURRENT);
+        BytesRef read = in.readBytesRef();
+
+        assertArrayEquals(payload.bytes, read.bytes);
+    }
+
+    public void testPayloadIsCompressed() throws IOException {
+        SqlStreamOutput out = SqlStreamOutput.create(Version.CURRENT, randomZone());
+        byte[] payload = new byte[1000];
+        Arrays.fill(payload, (byte) 0);
+        out.write(payload);
+        out.close();
+
+        String result = out.streamAsString();
+        assertThat(result.length(), lessThan(1000));
+    }
+
+    public void testOldCursorProducesVersionMismatchError() {
+        SqlIllegalArgumentException ex = expectThrows(
+            SqlIllegalArgumentException.class,
+            () -> SqlStreamInput.fromString(
+                // some cursor produced by ES 7.15.1
+                "97S0AwFaAWMBCHRlc3RfZW1whgEBAQljb21wb3NpdGUHZ3JvdXBieQEDbWF4CDJkMTBjNGJhAAD/AQls"
+                    + "YW5ndWFnZXMAAAD/AAD/AQAIYmRlZjg4ZTUBBmdlbmRlcgAAAQAAAQEKAQhiZGVmODhlNf8AAgEAAAAA"
+                    + "AP////8PAAAAAAAAAAAAAAAAAVoDAAICAAAAAAAAAAAKAP////8PAgFtCDJkMTBjNGJhBXZhbHVlAAEE"
+                    + "QllURQFrCGJkZWY4OGU1AAABAwA=",
+                new NamedWriteableRegistry(List.of()),
+                Version.V_8_2_0
+            )
+        );
+
+        assertThat(ex.getMessage(), containsString("Unsupported cursor version [7.15.1], expected [8.2.0]"));
+    }
+
+    public void testVersionCanBeReadByOldNodes() throws IOException {
+        Version version = randomFrom(Version.V_7_0_0, Version.V_7_2_1, Version.V_8_1_0);
+        SqlStreamOutput out = SqlStreamOutput.create(version, randomZone());
+        out.writeString("payload");
+        out.close();
+        String encoded = out.streamAsString();
+
+        byte[] bytes = Base64.getDecoder().decode(encoded);
+        InputStreamStreamInput in = new InputStreamStreamInput(new ByteArrayInputStream(bytes));
+
+        assertEquals(version, Version.readVersion(in));
+    }
+
+}

+ 1 - 1
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursorTests.java

@@ -30,7 +30,7 @@ public class CompositeAggregationCursorTests extends AbstractSqlWireSerializingT
         }
 
         return new CompositeAggCursor(
-            new byte[randomInt(256)],
+            new byte[randomInt(1024)],
             extractors,
             randomBitSet(extractorsSize),
             randomIntBetween(10, 1024),

+ 1 - 1
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java

@@ -19,7 +19,7 @@ import java.util.List;
 
 public class ListCursorTests extends AbstractSqlWireSerializingTestCase<ListCursor> {
     public static ListCursor randomPagingListCursor() {
-        int size = between(1, 20);
+        int size = between(1, 100);
         int depth = between(1, 20);
 
         List<List<?>> values = new ArrayList<>(size);