Browse Source

Stream input and output support for optional collections (#88127)

This PR adds support for reading and writing optional collections via
StreamInput and StreamOutput.
Nikolaj Volgushev 3 năm trước cách đây
mục cha
commit
b610f32cea

+ 5 - 0
docs/changelog/88127.yaml

@@ -0,0 +1,5 @@
+pr: 88127
+summary: Stream input and output support for optional collections
+area: Infra/Core
+type: enhancement
+issues: []

+ 11 - 6
server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

@@ -1185,6 +1185,16 @@ public abstract class StreamInput extends InputStream {
         return readList(StreamInput::readString);
     }
 
+    /**
+     * Reads an optional list. The list is expected to have been written using
+     * {@link StreamOutput#writeOptionalCollection(Collection)}. If the returned list contains any entries it will be mutable.
+     * If it is empty it might be immutable.
+     */
+    public <T> List<T> readOptionalList(final Writeable.Reader<T> reader) throws IOException {
+        final boolean isPresent = readBoolean();
+        return isPresent ? readList(reader) : null;
+    }
+
     /**
      * Reads an optional list of strings. The list is expected to have been written using
      * {@link StreamOutput#writeOptionalStringCollection(Collection)}. If the returned list contains any entries it will be mutable.
@@ -1194,12 +1204,7 @@ public abstract class StreamInput extends InputStream {
      * @throws IOException if an I/O exception occurs reading the list
      */
     public List<String> readOptionalStringList() throws IOException {
-        final boolean isPresent = readBoolean();
-        if (isPresent) {
-            return readList(StreamInput::readString);
-        } else {
-            return null;
-        }
+        return readOptionalList(StreamInput::readString);
     }
 
     /**

+ 22 - 6
server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

@@ -1172,6 +1172,27 @@ public abstract class StreamOutput extends OutputStream {
         writeCollection(collection, StreamOutput::writeString);
     }
 
+    /**
+     * Writes an optional collection. The corresponding collection can be read from a stream input using
+     * {@link StreamInput#readOptionalList(Writeable.Reader)}.
+     */
+    public <T extends Writeable> void writeOptionalCollection(final Collection<T> collection) throws IOException {
+        writeOptionalCollection(collection, (o, v) -> v.writeTo(o));
+    }
+
+    /**
+     * Writes an optional collection via {@link Writer}. The corresponding collection can be read from a stream input using
+     * {@link StreamInput#readOptionalList(Writeable.Reader)}.
+     */
+    public <T> void writeOptionalCollection(final Collection<T> collection, final Writer<T> writer) throws IOException {
+        if (collection != null) {
+            writeBoolean(true);
+            writeCollection(collection, writer);
+        } else {
+            writeBoolean(false);
+        }
+    }
+
     /**
      * Writes an optional collection of a strings. The corresponding collection can be read from a stream input using
      * {@link StreamInput#readList(Writeable.Reader)}.
@@ -1180,12 +1201,7 @@ public abstract class StreamOutput extends OutputStream {
      * @throws IOException if an I/O exception occurs writing the collection
      */
     public void writeOptionalStringCollection(final Collection<String> collection) throws IOException {
-        if (collection != null) {
-            writeBoolean(true);
-            writeCollection(collection, StreamOutput::writeString);
-        } else {
-            writeBoolean(false);
-        }
+        writeOptionalCollection(collection, StreamOutput::writeString);
     }
 
     /**

+ 32 - 0
server/src/test/java/org/elasticsearch/common/io/stream/AbstractStreamTests.java

@@ -256,12 +256,32 @@ public abstract class AbstractStreamTests extends ESTestCase {
             StreamOutput::writeCollection,
             in -> in.readList(FooBar::new)
         );
+
+        runWriteReadCollectionTest(
+            () -> new FooBar(randomInt(), randomInt()),
+            StreamOutput::writeOptionalCollection,
+            in -> in.readOptionalList(FooBar::new)
+        );
+
+        runWriteReadOptionalCollectionWithNullInput(out -> out.writeOptionalCollection(null), in -> in.readOptionalList(FooBar::new));
     }
 
     public void testStringCollection() throws IOException {
         runWriteReadCollectionTest(() -> randomUnicodeOfLength(16), StreamOutput::writeStringCollection, StreamInput::readStringList);
     }
 
+    public void testOptionalStringCollection() throws IOException {
+        runWriteReadCollectionTest(
+            () -> randomUnicodeOfLength(16),
+            StreamOutput::writeOptionalStringCollection,
+            StreamInput::readOptionalStringList
+        );
+    }
+
+    public void testOptionalStringCollectionWithNullInput() throws IOException {
+        runWriteReadOptionalCollectionWithNullInput(out -> out.writeOptionalStringCollection(null), StreamInput::readOptionalStringList);
+    }
+
     private <T> void runWriteReadCollectionTest(
         final Supplier<T> supplier,
         final CheckedBiConsumer<StreamOutput, Collection<T>, IOException> writer,
@@ -280,6 +300,18 @@ public abstract class AbstractStreamTests extends ESTestCase {
         }
     }
 
+    private <T> void runWriteReadOptionalCollectionWithNullInput(
+        final CheckedConsumer<StreamOutput, IOException> nullWriter,
+        final CheckedFunction<StreamInput, Collection<T>, IOException> reader
+    ) throws IOException {
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            nullWriter.accept(out);
+            try (StreamInput in = getStreamInput(out.bytes())) {
+                assertNull(reader.apply(in));
+            }
+        }
+    }
+
     public void testSetOfLongs() throws IOException {
         final int size = randomIntBetween(0, 6);
         final Set<Long> sourceSet = Sets.newHashSetWithExpectedSize(size);