Selaa lähdekoodia

Deduplicate created objects when deserializing InternalAggregations in SearchResponse (#124296)

It should help in case of cross cluster search.
Ignacio Vera 7 kuukautta sitten
vanhempi
commit
9042201187

+ 10 - 1
server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

@@ -15,6 +15,7 @@ import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Iterators;
+import org.elasticsearch.common.io.stream.DelayableWriteable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -92,7 +93,15 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
     public SearchResponse(StreamInput in) throws IOException {
         super(in);
         this.hits = SearchHits.readFrom(in, true);
-        this.aggregations = in.readBoolean() ? InternalAggregations.readFrom(in) : null;
+        if (in.readBoolean()) {
+            // deserialize the aggregations trying to deduplicate the object created
+            // TODO: use DelayableWriteable instead.
+            this.aggregations = InternalAggregations.readFrom(
+                DelayableWriteable.wrapWithDeduplicatorStreamInput(in, in.getTransportVersion(), in.namedWriteableRegistry())
+            );
+        } else {
+            this.aggregations = null;
+        }
         this.suggest = in.readBoolean() ? new Suggest(in) : null;
         this.timedOut = in.readBoolean();
         this.terminatedEarly = in.readOptionalBoolean();

+ 16 - 7
server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java

@@ -12,6 +12,7 @@ package org.elasticsearch.common.io.stream;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 
 import java.io.IOException;
@@ -231,16 +232,24 @@ public abstract class DelayableWriteable<T extends Writeable> implements Writeab
         NamedWriteableRegistry registry,
         BytesReference serialized
     ) throws IOException {
-        try (
-            StreamInput in = registry == null
-                ? new DeduplicateStreamInput(serialized.streamInput(), new DeduplicatorCache())
-                : new DeduplicateNamedWriteableAwareStreamInput(serialized.streamInput(), registry, new DeduplicatorCache())
-        ) {
-            in.setTransportVersion(serializedAtVersion);
-            return reader.read(in);
+        try (StreamInput in = serialized.streamInput()) {
+            return reader.read(wrapWithDeduplicatorStreamInput(in, serializedAtVersion, registry));
         }
     }
 
+    /** Wraps the provided {@link StreamInput} with another stream that extends {@link Deduplicator} */
+    public static StreamInput wrapWithDeduplicatorStreamInput(
+        StreamInput in,
+        TransportVersion serializedAtVersion,
+        @Nullable NamedWriteableRegistry registry
+    ) {
+        StreamInput out = registry == null
+            ? new DeduplicateStreamInput(in, new DeduplicatorCache())
+            : new DeduplicateNamedWriteableAwareStreamInput(in, registry, new DeduplicatorCache());
+        out.setTransportVersion(serializedAtVersion);
+        return out;
+    }
+
     /** An object implementing this interface can deduplicate instance of the provided objects.*/
     public interface Deduplicator {
         <T> T deduplicate(T object);