Răsfoiți Sursa

Simplify reading a list and converting it to a map from stream (#84183)

This commit adds readMapFromList to StreamInput. This method allows to
efficiently read list from the stream and create a presized map from it
without having to allocate intermediate collection.
Ievgen Degtiarenko 3 ani în urmă
părinte
comite
76691bc682

+ 5 - 0
docs/changelog/84183.yaml

@@ -0,0 +1,5 @@
+pr: 84183
+summary: Simplify reading a list and converting it to a map from stream
+area: Infra/Core
+type: enhancement
+issues: []

+ 2 - 8
server/src/main/java/org/elasticsearch/cluster/health/ClusterIndexHealth.java

@@ -171,13 +171,7 @@ public final class ClusterIndexHealth implements Iterable<ClusterShardHealth>, W
         initializingShards = in.readVInt();
         unassignedShards = in.readVInt();
         status = ClusterHealthStatus.readFrom(in);
-
-        int size = in.readVInt();
-        shards = Maps.newMapWithExpectedSize(size);
-        for (int i = 0; i < size; i++) {
-            ClusterShardHealth shardHealth = new ClusterShardHealth(in);
-            shards.put(shardHealth.getShardId(), shardHealth);
-        }
+        shards = in.readMapValues(ClusterShardHealth::new, ClusterShardHealth::getShardId);
     }
 
     /**
@@ -263,7 +257,7 @@ public final class ClusterIndexHealth implements Iterable<ClusterShardHealth>, W
         out.writeVInt(initializingShards);
         out.writeVInt(unassignedShards);
         out.writeByte(status.value());
-        out.writeCollection(shards.values());
+        out.writeMapValues(shards);
     }
 
     @Override

+ 2 - 11
server/src/main/java/org/elasticsearch/cluster/health/ClusterStateHealth.java

@@ -14,7 +14,6 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.rest.RestStatus;
 
 import java.io.IOException;
@@ -119,12 +118,7 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
         numberOfNodes = in.readVInt();
         numberOfDataNodes = in.readVInt();
         status = ClusterHealthStatus.readFrom(in);
-        int size = in.readVInt();
-        indices = Maps.newMapWithExpectedSize(size);
-        for (int i = 0; i < size; i++) {
-            ClusterIndexHealth indexHealth = new ClusterIndexHealth(in);
-            indices.put(indexHealth.getIndex(), indexHealth);
-        }
+        indices = in.readMapValues(ClusterIndexHealth::new, ClusterIndexHealth::getIndex);
         activeShardsPercent = in.readDouble();
     }
 
@@ -210,10 +204,7 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
         out.writeVInt(numberOfNodes);
         out.writeVInt(numberOfDataNodes);
         out.writeByte(status.value());
-        out.writeVInt(indices.size());
-        for (ClusterIndexHealth indexHealth : this) {
-            indexHealth.writeTo(out);
-        }
+        out.writeMapValues(indices);
         out.writeDouble(activeShardsPercent);
     }
 

+ 3 - 8
server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

@@ -1086,13 +1086,8 @@ public class Metadata extends AbstractCollection<IndexMetadata> implements Diffa
         }
         final Function<String, MappingMetadata> mappingLookup;
         if (in.getVersion().onOrAfter(MAPPINGS_AS_HASH_VERSION)) {
-            final int mappings = in.readVInt();
-            if (mappings > 0) {
-                final Map<String, MappingMetadata> mappingMetadataMap = new HashMap<>(mappings);
-                for (int i = 0; i < mappings; i++) {
-                    final MappingMetadata m = new MappingMetadata(in);
-                    mappingMetadataMap.put(m.getSha256(), m);
-                }
+            final Map<String, MappingMetadata> mappingMetadataMap = in.readMapValues(MappingMetadata::new, MappingMetadata::getSha256);
+            if (mappingMetadataMap.size() > 0) {
                 mappingLookup = mappingMetadataMap::get;
             } else {
                 mappingLookup = null;
@@ -1131,7 +1126,7 @@ public class Metadata extends AbstractCollection<IndexMetadata> implements Diffa
         // Starting in #MAPPINGS_AS_HASH_VERSION we write the mapping metadata first and then write the indices without metadata so that
         // we avoid writing duplicate mappings twice
         if (out.getVersion().onOrAfter(MAPPINGS_AS_HASH_VERSION)) {
-            out.writeCollection(mappingsByHash.values());
+            out.writeMapValues(mappingsByHash);
         }
         out.writeVInt(indices.size());
         final boolean writeMappingsHash = out.getVersion().onOrAfter(MAPPINGS_AS_HASH_VERSION);

+ 21 - 0
server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

@@ -67,6 +67,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.function.IntFunction;
 
 import static org.elasticsearch.ElasticsearchException.readStackTrace;
@@ -652,6 +653,26 @@ public abstract class StreamInput extends InputStream {
         return map;
     }
 
+    /**
+     * Reads a multiple {@code V}-values and then converts them to a {@code Map} using keyMapper.
+     *
+     * @param valueReader The value reader
+     * @param keyMapper function to create a key from a value
+     * @return Never {@code null}.
+     */
+    public <K, V> Map<K, V> readMapValues(final Writeable.Reader<V> valueReader, final Function<V, K> keyMapper) throws IOException {
+        final int size = readArraySize();
+        if (size == 0) {
+            return Map.of();
+        }
+        final Map<K, V> map = Maps.newMapWithExpectedSize(size);
+        for (int i = 0; i < size; i++) {
+            V value = valueReader.read(this);
+            map.put(keyMapper.apply(value), value);
+        }
+        return map;
+    }
+
     /**
      * If the returned map contains any entries it will be mutable. If it is empty it might be immutable.
      */

+ 14 - 0
server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

@@ -545,6 +545,20 @@ public abstract class StreamOutput extends OutputStream {
         }
     }
 
+    /**
+     * Writes values of a map as a collection
+     */
+    public final <V> void writeMapValues(final Map<?, V> map, final Writer<V> valueWriter) throws IOException {
+        writeCollection(map.values(), valueWriter);
+    }
+
+    /**
+     * Writes values of a map as a collection
+     */
+    public final <V extends Writeable> void writeMapValues(final Map<?, V> map) throws IOException {
+        writeMapValues(map, (o, v) -> v.writeTo(o));
+    }
+
     /**
      * Write a {@link Map} of {@code K}-type keys to {@code V}-type {@link List}s.
      * <pre><code>

+ 4 - 31
server/src/main/java/org/elasticsearch/index/get/GetResult.java

@@ -18,7 +18,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.logging.DeprecationLogger;
-import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.index.mapper.IgnoredFieldMapper;
@@ -80,8 +79,8 @@ public class GetResult implements Writeable, Iterable<DocumentField>, ToXContent
             if (source.length() == 0) {
                 source = null;
             }
-            documentFields = readFields(in);
-            metaFields = readFields(in);
+            documentFields = in.readMapValues(DocumentField::new, DocumentField::getName);
+            metaFields = in.readMapValues(DocumentField::new, DocumentField::getName);
         } else {
             metaFields = Collections.emptyMap();
             documentFields = Collections.emptyMap();
@@ -384,21 +383,6 @@ public class GetResult implements Writeable, Iterable<DocumentField>, ToXContent
         return fromXContentEmbedded(parser);
     }
 
-    private Map<String, DocumentField> readFields(StreamInput in) throws IOException {
-        Map<String, DocumentField> fields;
-        int size = in.readVInt();
-        if (size == 0) {
-            fields = emptyMap();
-        } else {
-            fields = Maps.newMapWithExpectedSize(size);
-            for (int i = 0; i < size; i++) {
-                DocumentField field = new DocumentField(in);
-                fields.put(field.getName(), field);
-            }
-        }
-        return fields;
-    }
-
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(index);
@@ -412,19 +396,8 @@ public class GetResult implements Writeable, Iterable<DocumentField>, ToXContent
         out.writeBoolean(exists);
         if (exists) {
             out.writeBytesReference(source);
-            writeFields(out, documentFields);
-            writeFields(out, metaFields);
-        }
-    }
-
-    private void writeFields(StreamOutput out, Map<String, DocumentField> fields) throws IOException {
-        if (fields == null) {
-            out.writeVInt(0);
-        } else {
-            out.writeVInt(fields.size());
-            for (DocumentField field : fields.values()) {
-                field.writeTo(out);
-            }
+            out.writeMapValues(documentFields);
+            out.writeMapValues(metaFields);
         }
     }
 

+ 4 - 17
server/src/main/java/org/elasticsearch/index/store/Store.java

@@ -52,7 +52,6 @@ import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
-import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.RefCounted;
@@ -780,16 +779,11 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
         }
 
         public static MetadataSnapshot readFrom(StreamInput in) throws IOException {
-            final int metadataSize = in.readVInt();
-            final Map<String, StoreFileMetadata> metadata = metadataSize == 0 ? emptyMap() : Maps.newMapWithExpectedSize(metadataSize);
-            for (int i = 0; i < metadataSize; i++) {
-                final var storeFileMetadata = new StoreFileMetadata(in);
-                metadata.put(storeFileMetadata.name(), storeFileMetadata);
-            }
+            final Map<String, StoreFileMetadata> metadata = in.readMapValues(StoreFileMetadata::new, StoreFileMetadata::name);
             final var commitUserData = in.readMap(StreamInput::readString, StreamInput::readString);
             final var numDocs = in.readLong();
 
-            if (metadataSize == 0 && commitUserData.size() == 0 && numDocs == 0) {
+            if (metadata.size() == 0 && commitUserData.size() == 0 && numDocs == 0) {
                 return MetadataSnapshot.EMPTY;
             } else {
                 return new MetadataSnapshot(metadata, commitUserData, numDocs);
@@ -798,15 +792,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
-            out.writeVInt(this.metadata.size());
-            for (StoreFileMetadata meta : this) {
-                meta.writeTo(out);
-            }
-            out.writeVInt(commitUserData.size());
-            for (Map.Entry<String, String> entry : commitUserData.entrySet()) {
-                out.writeString(entry.getKey());
-                out.writeString(entry.getValue());
-            }
+            out.writeMapValues(metadata);
+            out.writeMap(commitUserData, StreamOutput::writeString, StreamOutput::writeString);
             out.writeLong(numDocs);
         }
 

+ 18 - 1
server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java

@@ -498,7 +498,7 @@ public class BytesStreamsTests extends ESTestCase {
 
     public void testWriteMap() throws IOException {
         final int size = randomIntBetween(0, 100);
-        final Map<String, String> expected = Maps.newMapWithExpectedSize(randomIntBetween(0, 100));
+        final Map<String, String> expected = Maps.newMapWithExpectedSize(size);
         for (int i = 0; i < size; ++i) {
             expected.put(randomAlphaOfLength(2), randomAlphaOfLength(5));
         }
@@ -586,6 +586,23 @@ public class BytesStreamsTests extends ESTestCase {
         out.close();
     }
 
+    public void testWriteMapAsList() throws IOException {
+        final int size = randomIntBetween(0, 100);
+        final Map<String, String> expected = Maps.newMapWithExpectedSize(size);
+        for (int i = 0; i < size; ++i) {
+            final String value = randomAlphaOfLength(5);
+            expected.put("key_" + value, value);
+        }
+
+        final BytesStreamOutput out = new BytesStreamOutput();
+        out.writeMapValues(expected, StreamOutput::writeString);
+        final StreamInput in = StreamInput.wrap(BytesReference.toBytes(out.bytes()));
+        final Map<String, String> loaded = in.readMapValues(StreamInput::readString, value -> "key_" + value);
+
+        assertThat(loaded.size(), equalTo(expected.size()));
+        assertThat(expected, equalTo(loaded));
+    }
+
     private abstract static class BaseNamedWriteable implements NamedWriteable {
 
     }

+ 18 - 1
server/src/test/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutputTests.java

@@ -503,7 +503,7 @@ public class RecyclerBytesStreamOutputTests extends ESTestCase {
 
     public void testWriteMap() throws IOException {
         final int size = randomIntBetween(0, 100);
-        final Map<String, String> expected = Maps.newMapWithExpectedSize(randomIntBetween(0, 100));
+        final Map<String, String> expected = Maps.newMapWithExpectedSize(size);
         for (int i = 0; i < size; ++i) {
             expected.put(randomAlphaOfLength(2), randomAlphaOfLength(5));
         }
@@ -591,6 +591,23 @@ public class RecyclerBytesStreamOutputTests extends ESTestCase {
         out.close();
     }
 
+    public void testWriteMapAsList() throws IOException {
+        final int size = randomIntBetween(0, 100);
+        final Map<String, String> expected = Maps.newMapWithExpectedSize(size);
+        for (int i = 0; i < size; ++i) {
+            final String value = randomAlphaOfLength(5);
+            expected.put("key_" + value, value);
+        }
+
+        final RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+        out.writeMapValues(expected, StreamOutput::writeString);
+        final StreamInput in = StreamInput.wrap(BytesReference.toBytes(out.bytes()));
+        final Map<String, String> loaded = in.readMapValues(StreamInput::readString, value -> "key_" + value);
+
+        assertThat(loaded.size(), equalTo(expected.size()));
+        assertThat(expected, equalTo(loaded));
+    }
+
     private abstract static class BaseNamedWriteable implements NamedWriteable {
 
     }