瀏覽代碼

Stop Redundantly Serializing ShardId in BulkShardResponse (#56094)

When reading/writing the individual doc responses in the context
of a bulk shard response there is no need to serialize the `ShardId`
over and over. This can waste a lot of memory when handling large bulk
requests.
Armin Braun 5 年之前
父節點
當前提交
c0ee6d2d52

+ 28 - 0
server/src/main/java/org/elasticsearch/action/DocWriteResponse.java

@@ -131,6 +131,25 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
     }
 
     // needed for deserialization
+    protected DocWriteResponse(ShardId shardId, StreamInput in) throws IOException {
+        super(in);
+        this.shardId = shardId;
+        if (in.getVersion().before(Version.V_8_0_0)) {
+            String type = in.readString();
+            assert MapperService.SINGLE_MAPPING_NAME.equals(type) : "Expected [_doc] but received [" + type + "]";
+        }
+        id = in.readString();
+        version = in.readZLong();
+        seqNo = in.readZLong();
+        primaryTerm = in.readVLong();
+        forcedRefresh = in.readBoolean();
+        result = Result.readFrom(in);
+    }
+
+    /**
+     * Needed for deserialization of single item requests in {@link org.elasticsearch.action.index.IndexAction} and BwC
+     * deserialization path
+     */
     protected DocWriteResponse(StreamInput in) throws IOException {
         super(in);
         shardId = new ShardId(in);
@@ -258,10 +277,19 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
         return location.toString();
     }
 
+    public void writeThin(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        writeWithoutShardId(out);
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
         shardId.writeTo(out);
+        writeWithoutShardId(out);
+    }
+
+    private void writeWithoutShardId(StreamOutput out) throws IOException {
         if (out.getVersion().before(Version.V_8_0_0)) {
             out.writeString(MapperService.SINGLE_MAPPING_NAME);
         }

+ 54 - 7
server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java

@@ -40,6 +40,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.rest.RestStatus;
 
 import java.io.IOException;
@@ -359,6 +360,26 @@ public class BulkItemResponse implements Writeable, StatusToXContentObject {
 
     BulkItemResponse() {}
 
+    BulkItemResponse(ShardId shardId, StreamInput in) throws IOException {
+        id = in.readVInt();
+        opType = OpType.fromId(in.readByte());
+
+        byte type = in.readByte();
+        if (type == 0) {
+            response = new IndexResponse(shardId, in);
+        } else if (type == 1) {
+            response = new DeleteResponse(shardId, in);
+        } else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses'
+            response = new UpdateResponse(shardId, in);
+        } else if (type != 2) {
+            throw new IllegalArgumentException("Unexpected type [" + type + "]");
+        }
+
+        if (in.readBoolean()) {
+            failure = new Failure(in);
+        }
+    }
+
     BulkItemResponse(StreamInput in) throws IOException {
         id = in.readVInt();
         opType = OpType.fromId(in.readByte());
@@ -370,6 +391,8 @@ public class BulkItemResponse implements Writeable, StatusToXContentObject {
             response = new DeleteResponse(in);
         } else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses'
             response = new UpdateResponse(in);
+        } else if (type != 2) {
+            throw new IllegalArgumentException("Unexpected type [" + type + "]");
         }
 
         if (in.readBoolean()) {
@@ -473,13 +496,7 @@ public class BulkItemResponse implements Writeable, StatusToXContentObject {
         if (response == null) {
             out.writeByte((byte) 2);
         } else {
-            if (response instanceof IndexResponse) {
-                out.writeByte((byte) 0);
-            } else if (response instanceof DeleteResponse) {
-                out.writeByte((byte) 1);
-            } else if (response instanceof UpdateResponse) {
-                out.writeByte((byte) 3); // make 3 instead of 2, because 2 is already in use for 'no responses'
-            }
+            writeResponseType(out);
             response.writeTo(out);
         }
         if (failure == null) {
@@ -489,4 +506,34 @@ public class BulkItemResponse implements Writeable, StatusToXContentObject {
             failure.writeTo(out);
         }
     }
+
+    public void writeThin(StreamOutput out) throws IOException {
+        out.writeVInt(id);
+        out.writeByte(opType.getId());
+
+        if (response == null) {
+            out.writeByte((byte) 2);
+        } else {
+            writeResponseType(out);
+            response.writeThin(out);
+        }
+        if (failure == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            failure.writeTo(out);
+        }
+    }
+
+    private void writeResponseType(StreamOutput out) throws IOException {
+        if (response instanceof IndexResponse) {
+            out.writeByte((byte) 0);
+        } else if (response instanceof DeleteResponse) {
+            out.writeByte((byte) 1);
+        } else if (response instanceof UpdateResponse) {
+            out.writeByte((byte) 3); // make 3 instead of 2, because 2 is already in use for 'no responses'
+        } else {
+            throw new IllegalStateException("Unexpected response type found [" + response.getClass() + "]");
+        }
+    }
 }

+ 19 - 4
server/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.bulk;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.support.WriteResponse;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -30,6 +31,8 @@ import java.io.IOException;
 
 public class BulkShardResponse extends ReplicationResponse implements WriteResponse {
 
+    private static final Version COMPACT_SHARD_ID_VERSION = Version.V_8_0_0;
+
     private final ShardId shardId;
     private final BulkItemResponse[] responses;
 
@@ -37,8 +40,14 @@ public class BulkShardResponse extends ReplicationResponse implements WriteRespo
         super(in);
         shardId = new ShardId(in);
         responses = new BulkItemResponse[in.readVInt()];
-        for (int i = 0; i < responses.length; i++) {
-            responses[i] = new BulkItemResponse(in);
+        if (in.getVersion().onOrAfter(COMPACT_SHARD_ID_VERSION)) {
+            for (int i = 0; i < responses.length; i++) {
+                responses[i] = new BulkItemResponse(shardId, in);
+            }
+        } else {
+            for (int i = 0; i < responses.length; i++) {
+                responses[i] = new BulkItemResponse(in);
+            }
         }
     }
 
@@ -75,8 +84,14 @@ public class BulkShardResponse extends ReplicationResponse implements WriteRespo
         super.writeTo(out);
         shardId.writeTo(out);
         out.writeVInt(responses.length);
-        for (BulkItemResponse response : responses) {
-            response.writeTo(out);
+        if (out.getVersion().onOrAfter(COMPACT_SHARD_ID_VERSION)) {
+            for (BulkItemResponse response : responses) {
+                response.writeThin(out);
+            }
+        } else {
+            for (BulkItemResponse response : responses) {
+                response.writeTo(out);
+            }
         }
     }
 }

+ 4 - 0
server/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java

@@ -37,6 +37,10 @@ import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpect
  */
 public class DeleteResponse extends DocWriteResponse {
 
+    public DeleteResponse(ShardId shardId, StreamInput in) throws IOException {
+        super(shardId, in);
+    }
+
     public DeleteResponse(StreamInput in) throws IOException {
         super(in);
     }

+ 4 - 0
server/src/main/java/org/elasticsearch/action/index/IndexResponse.java

@@ -38,6 +38,10 @@ import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpect
  */
 public class IndexResponse extends DocWriteResponse {
 
+    public IndexResponse(ShardId shardId, StreamInput in) throws IOException {
+        super(shardId, in);
+    }
+
     public IndexResponse(StreamInput in) throws IOException {
         super(in);
     }

+ 17 - 0
server/src/main/java/org/elasticsearch/action/update/UpdateResponse.java

@@ -38,6 +38,13 @@ public class UpdateResponse extends DocWriteResponse {
 
     private GetResult getResult;
 
+    public UpdateResponse(ShardId shardId, StreamInput in) throws IOException {
+        super(shardId, in);
+        if (in.readBoolean()) {
+            getResult = new GetResult(in);
+        }
+    }
+
     public UpdateResponse(StreamInput in) throws IOException {
         super(in);
         if (in.readBoolean()) {
@@ -72,9 +79,19 @@ public class UpdateResponse extends DocWriteResponse {
         return this.result == Result.CREATED ? RestStatus.CREATED : super.status();
     }
 
+    @Override
+    public void writeThin(StreamOutput out) throws IOException {
+        super.writeThin(out);
+        writeGetResult(out);
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
+        writeGetResult(out);
+    }
+
+    private void writeGetResult(StreamOutput out) throws IOException {
         if (getResult == null) {
             out.writeBoolean(false);
         } else {