1
0
Эх сурвалжийг харах

Allow build XContent directly from Writable (#73804)

Today, writing a Writable value to XContent in Base64 format performs 
these steps: (1) create a BytesStreamOutput, (2) write Writable to that
output, (3) encode a copy of bytes from that output stream, (4) create a
string from the encoded bytes, (5) write the encoded string to XContent. 
These steps allocate/use 5 times more memory than writing the encoded chars
directly to the output of XContent.

This API would help reduce memory usage when storing a large response 
of an async search.

Relates #67594
Nhat Nguyen 4 жил өмнө
parent
commit
cb1144886f

+ 29 - 0
libs/x-content/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java

@@ -8,10 +8,12 @@
 
 package org.elasticsearch.common.xcontent;
 
+import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.RestApiVersion;
 
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
+import java.io.FilterOutputStream;
 import java.io.Flushable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -21,6 +23,7 @@ import java.math.BigInteger;
 import java.nio.file.Path;
 import java.time.ZonedDateTime;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
@@ -1047,6 +1050,32 @@ public final class XContentBuilder implements Closeable, Flushable {
         return this;
     }
 
+    /**
+     * Writes a field whose value is the Base64 encoding of the bytes that the {@code writer} writes to the given output stream.
+     * The encoded characters go directly to the output of this builder, without intermediate results, to reduce memory usage.
+     * Note that this method supports only JSON; other content types fail an assertion or throw UnsupportedOperationException.
+     */
+    public XContentBuilder directFieldAsBase64(String name, CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        if (contentType() != XContentType.JSON) {
+            assert false : "directFieldAsBase64 supports only JSON format";
+            throw new UnsupportedOperationException("directFieldAsBase64 supports only JSON format");
+        }
+        generator.writeDirectField(name, os -> {
+            os.write('\"');
+            final FilterOutputStream noClose = new FilterOutputStream(os) {
+                @Override
+                public void close() {
+                    // We need to close the output stream that is wrapped by a Base64 encoder to flush the outstanding buffer
+                    // of the encoder, but we must not close the underlying output stream of the XContentBuilder.
+                }
+            };
+            final OutputStream encodedOutput = Base64.getEncoder().wrap(noClose);
+            writer.accept(encodedOutput);
+            encodedOutput.close(); // close to flush the outstanding buffer used in the Base64 Encoder
+            os.write('\"');
+        });
+        return this;
+    }
 
     /**
      * Returns a version used for serialising a response.

+ 9 - 0
libs/x-content/src/main/java/org/elasticsearch/common/xcontent/XContentGenerator.java

@@ -8,10 +8,13 @@
 
 package org.elasticsearch.common.xcontent;
 
+import org.elasticsearch.common.CheckedConsumer;
+
 import java.io.Closeable;
 import java.io.Flushable;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 
@@ -102,6 +105,12 @@ public interface XContentGenerator extends Closeable, Flushable {
 
     void copyCurrentStructure(XContentParser parser) throws IOException;
 
+    /**
+     * Write a field whose value is written directly to the output stream. As the content is copied as is,
+     * the writer must write a valid XContent value (e.g., string is properly escaped and quoted)
+     */
+    void writeDirectField(String name, CheckedConsumer<OutputStream, IOException> writer) throws IOException;
+
     default void copyCurrentEvent(XContentParser parser) throws IOException {
         switch (parser.currentToken()) {
             case START_OBJECT:

+ 10 - 0
libs/x-content/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContentGenerator.java

@@ -17,6 +17,7 @@ import com.fasterxml.jackson.core.json.JsonWriteContext;
 import com.fasterxml.jackson.core.util.DefaultIndenter;
 import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
 import com.fasterxml.jackson.core.util.JsonGeneratorDelegate;
+import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.xcontent.DeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContent;
@@ -430,6 +431,15 @@ public class JsonXContentGenerator implements XContentGenerator {
         }
     }
 
+    @Override
+    public void writeDirectField(String name, CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        writeStartRaw(name);
+        flush(); // flush the generator first; presumably so already-buffered output reaches os before the writer's raw bytes
+        writer.accept(os); // the writer emits the field value straight to os
+        flush();
+        writeEndRaw();
+    }
+
     @Override
     public void flush() throws IOException {
         generator.flush();

+ 89 - 0
server/src/test/java/org/elasticsearch/common/xcontent/builder/XContentBuilderTests.java

@@ -19,6 +19,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentGenerator;
+import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
@@ -26,9 +27,13 @@ import org.elasticsearch.test.ESTestCase;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
@@ -37,9 +42,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.TimeZone;
 
+import static org.hamcrest.Matchers.aMapWithSize;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.instanceOf;
 
 public class XContentBuilderTests extends ESTestCase {
     public void testPrettyWithLfAtEnd() throws Exception {
@@ -383,4 +392,84 @@ public class XContentBuilderTests extends ESTestCase {
         assertThat(e.getMessage(), equalTo("Failed to close the XContentBuilder"));
         assertThat(e.getCause().getMessage(), equalTo("Unclosed object or array found"));
     }
+
+    /** Test value serialized as: entry count, then for each entry the key length, the ISO-8859-1 key bytes and one value byte. */
+    private static class TestWritableValue {
+        final Map<String, Byte> values;
+
+        static TestWritableValue randomValue() {
+            final Map<String, Byte> randomEntries = new HashMap<>();
+            for (int remaining = randomIntBetween(0, 10); remaining > 0; remaining--) {
+                randomEntries.put(randomAlphaOfLength(10), randomByte());
+            }
+            return new TestWritableValue(randomEntries);
+        }
+
+        TestWritableValue(Map<String, Byte> values) {
+            this.values = values;
+        }
+
+        /** Reads back what {@link #writeTo(OutputStream)} wrote. */
+        TestWritableValue(InputStream in) throws IOException {
+            final int numEntries = in.read();
+            this.values = new HashMap<>(numEntries);
+            for (int read = 0; read < numEntries; read++) {
+                final byte[] keyBytes = in.readNBytes(in.read());
+                values.put(new String(keyBytes, StandardCharsets.ISO_8859_1), (byte) in.read());
+            }
+        }
+
+        /** Writes the entry count followed by every entry; sizes and lengths are truncated to a single byte. */
+        public void writeTo(OutputStream os) throws IOException {
+            os.write((byte) values.size());
+            for (Map.Entry<String, Byte> entry : values.entrySet()) {
+                final byte[] keyBytes = entry.getKey().getBytes(StandardCharsets.ISO_8859_1);
+                os.write((byte) entry.getKey().length());
+                os.write(keyBytes);
+                os.write(entry.getValue());
+            }
+        }
+    }
+
+    /** Checks that values written via directFieldAsBase64 decode back to the original bytes and coexist with regular fields. */
+    public void testWritableValue() throws Exception {
+        Map<String, Object> expectedValues = new HashMap<>();
+        final XContentBuilder builder = XContentFactory.jsonBuilder();
+        builder.startObject();
+        int fields = iterations(1, 10);
+        for (int i = 0; i < fields; i++) {
+            String field = "field-" + i;
+            if (randomBoolean()) {
+                final TestWritableValue value = TestWritableValue.randomValue();
+                builder.directFieldAsBase64(field, value::writeTo);
+                expectedValues.put(field, value);
+            } else {
+                Object value = randomFrom(randomInt(), randomAlphaOfLength(10));
+                builder.field(field, value);
+                expectedValues.put(field, value);
+            }
+        }
+        builder.endObject();
+        final BytesReference bytes = BytesReference.bytes(builder);
+        final Map<String, Object> actualValues = XContentHelper.convertToMap(bytes, true).v2();
+        assertThat(actualValues, aMapWithSize(fields));
+        for (Map.Entry<String, Object> e : expectedValues.entrySet()) {
+            if (e.getValue() instanceof TestWritableValue) {
+                final TestWritableValue expectedValue = (TestWritableValue) e.getValue();
+                assertThat(actualValues.get(e.getKey()), instanceOf(String.class));
+                final byte[] decoded = Base64.getDecoder().decode((String) actualValues.get(e.getKey()));
+                final TestWritableValue actualValue = new TestWritableValue(new InputStream() {
+                    int pos = 0;
+                    @Override
+                    public int read() {
+                        Objects.checkIndex(pos, decoded.length);
+                        return decoded[pos++] & 0xFF; // read() must return 0..255; unmasked, byte 0xFF would look like EOF (-1)
+                    }
+                });
+                assertThat(actualValue.values, equalTo(expectedValue.values));
+            } else {
+                assertThat(actualValues, hasEntry(e.getKey(), e.getValue()));
+            }
+        }
+    }
 }