Browse Source

Introduce BytesTransportRequest, allowing for downstream network optimization in buffers usage
When sending a request, mainly to multiple nodes, if we already have the "body" of the request in bytes, we can share it instead of copying it over to a new buffer. Also, it helps a lot when sending a relatively large body to multiple nodes, since it will use the same body buffer across all nodes

Shay Banon 12 years ago
parent
commit
a9e259d438

+ 9 - 37
src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java

@@ -37,7 +37,6 @@ import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.*;
 
-import java.io.IOException;
 import java.util.Map;
 
 /**
@@ -116,7 +115,7 @@ public class PublishClusterStateAction extends AbstractComponent {
                 // no need to put a timeout on the options here, because we want the response to eventually be received
                 // and not log an error if it arrives after the timeout
                 transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION,
-                        new PublishClusterStateRequest(bytes, node.version()),
+                        new BytesTransportRequest(bytes, node.version()),
                         options, // no need to compress, we already compressed the bytes
 
                         new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@@ -152,52 +151,25 @@ public class PublishClusterStateAction extends AbstractComponent {
         }
     }
 
-    class PublishClusterStateRequest extends TransportRequest {
-
-        BytesReference clusterStateInBytes;
-        Version version;
-
-        PublishClusterStateRequest() {
-        }
-
-        PublishClusterStateRequest(BytesReference clusterStateInBytes, Version version) {
-            this.clusterStateInBytes = clusterStateInBytes;
-            this.version = version;
-        }
-
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
-            clusterStateInBytes = in.readBytesReference();
-            version = in.getVersion();
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-            out.writeBytesReference(clusterStateInBytes);
-        }
-    }
-
-    private class PublishClusterStateRequestHandler extends BaseTransportRequestHandler<PublishClusterStateRequest> {
+    private class PublishClusterStateRequestHandler extends BaseTransportRequestHandler<BytesTransportRequest> {
 
         static final String ACTION = "discovery/zen/publish";
 
         @Override
-        public PublishClusterStateRequest newInstance() {
-            return new PublishClusterStateRequest();
+        public BytesTransportRequest newInstance() {
+            return new BytesTransportRequest();
         }
 
         @Override
-        public void messageReceived(PublishClusterStateRequest request, final TransportChannel channel) throws Exception {
-            Compressor compressor = CompressorFactory.compressor(request.clusterStateInBytes);
+        public void messageReceived(BytesTransportRequest request, final TransportChannel channel) throws Exception {
+            Compressor compressor = CompressorFactory.compressor(request.bytes());
             StreamInput in;
             if (compressor != null) {
-                in = CachedStreamInput.cachedHandlesCompressed(compressor, request.clusterStateInBytes.streamInput());
+                in = CachedStreamInput.cachedHandlesCompressed(compressor, request.bytes().streamInput());
             } else {
-                in = CachedStreamInput.cachedHandles(request.clusterStateInBytes.streamInput());
+                in = CachedStreamInput.cachedHandles(request.bytes().streamInput());
             }
-            in.setVersion(request.version);
+            in.setVersion(request.version());
             ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
             logger.debug("received cluster state version {}", clusterState.version());
             listener.onNewClusterState(clusterState, new NewClusterStateListener.NewStateProcessed() {

+ 76 - 0
src/main/java/org/elasticsearch/transport/BytesTransportRequest.java

@@ -0,0 +1,76 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.transport;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+/**
+ * A specialized, bytes only request, that can potentially be optimized on the network
+ * layer, specifically for teh same large buffer send to several nodes.
+ */
+public class BytesTransportRequest extends TransportRequest {
+
+    BytesReference bytes;
+    Version version;
+
+    public BytesTransportRequest() {
+
+    }
+
+    public BytesTransportRequest(BytesReference bytes, Version version) {
+        this.bytes = bytes;
+        this.version = version;
+    }
+
+    public Version version() {
+        return this.version;
+    }
+
+    public BytesReference bytes() {
+        return this.bytes;
+    }
+
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        super.readFrom(in);
+        bytes = in.readBytesReference();
+        version = in.getVersion();
+    }
+
+    /**
+     * Writes the data in a "thin" manner, without the actual bytes, assumes
+     * the actual bytes will be appended right after this content.
+     */
+    public void writeThin(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeVInt(bytes.length());
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeBytesReference(bytes);
+    }
+}

+ 19 - 3
src/main/java/org/elasticsearch/transport/netty/NettyTransport.java

@@ -53,6 +53,7 @@ import org.elasticsearch.transport.support.TransportStatus;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.CompositeChannelBuffer;
 import org.jboss.netty.channel.*;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
@@ -554,10 +555,25 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
 
         stream.setVersion(version);
         stream.writeString(action);
-        request.writeTo(stream);
-        stream.close();
 
-        ChannelBuffer buffer = bStream.bytes().toChannelBuffer();
+        ChannelBuffer buffer;
+        // it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
+        // that create paged channel buffers, but its tricky to know when to do it (where this option is
+        // more explicit).
+        if (request instanceof BytesTransportRequest) {
+            BytesTransportRequest bRequest = (BytesTransportRequest) request;
+            assert node.version().equals(bRequest.version());
+            bRequest.writeThin(stream);
+            stream.close();
+            ChannelBuffer headerBuffer = bStream.bytes().toChannelBuffer();
+            ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
+            // false on gathering, cause gathering causes the NIO layer to combine the buffers into a single direct buffer....
+            buffer = new CompositeChannelBuffer(headerBuffer.order(), ImmutableList.<ChannelBuffer>of(headerBuffer, contentBuffer), false);
+        } else {
+            request.writeTo(stream);
+            stream.close();
+            buffer = bStream.bytes().toChannelBuffer();
+        }
         NettyHeader.writeHeader(buffer, requestId, status, version);
         targetChannel.write(buffer);