Browse Source

fixed issue #726 , 单链接复用ByteBuffer数组

七锋 7 years ago
parent
commit
b19eaeb89a

+ 18 - 0
server/src/main/java/com/alibaba/otter/canal/server/netty/NettyUtils.java

@@ -2,8 +2,12 @@ package com.alibaba.otter.canal.server.netty;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
 
+import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.buffer.CompositeChannelBuffer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.Channels;
@@ -22,6 +26,20 @@ public class NettyUtils {
     public static int           HEADER_LENGTH    = 4;
     public static Timer         hashedWheelTimer = new HashedWheelTimer();
 
+    public static void write(Channel channel, ByteBuffer body, ChannelFutureListener channelFutureListner) {
+        byte[] header = ByteBuffer.allocate(HEADER_LENGTH).order(ByteOrder.BIG_ENDIAN).putInt(body.limit()).array();
+        List<ChannelBuffer> components = new ArrayList<ChannelBuffer>(2);
+        components.add(ChannelBuffers.wrappedBuffer(ByteOrder.BIG_ENDIAN, header));
+        components.add(ChannelBuffers.wrappedBuffer(body));
+
+        if (channelFutureListner == null) {
+            Channels.write(channel, new CompositeChannelBuffer(ByteOrder.BIG_ENDIAN, components));
+        } else {
+            Channels.write(channel, new CompositeChannelBuffer(ByteOrder.BIG_ENDIAN, components))
+                .addListener(channelFutureListner);
+        }
+    }
+
     public static void write(Channel channel, byte[] body, ChannelFutureListener channelFutureListner) {
         byte[] header = ByteBuffer.allocate(HEADER_LENGTH).order(ByteOrder.BIG_ENDIAN).putInt(body.length).array();
         if (channelFutureListner == null) {

+ 19 - 2
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -73,7 +73,7 @@ public class SessionHandler extends SimpleChannelHandler {
                         }
 
                         embeddedServer.subscribe(clientIdentity);
-                        ctx.setAttachment(clientIdentity);// 设置状态数据
+                        // ctx.setAttachment(clientIdentity);// 设置状态数据
                         NettyUtils.ack(ctx.getChannel(), null);
                     } else {
                         NettyUtils.error(401,
@@ -146,7 +146,19 @@ public class SessionHandler extends SimpleChannelHandler {
                             size += com.google.protobuf.CodedOutputStream.computeTagSize(5)
                                     + com.google.protobuf.CodedOutputStream.computeRawVarint32Size(messageSize)
                                     + messageSize;
-                            // TODO recyle bytes[]
+                            // recyle bytes
+                            // ByteBuffer byteBuffer = (ByteBuffer)
+                            // ctx.getAttachment();
+                            // if (byteBuffer != null && size <=
+                            // byteBuffer.capacity()) {
+                            // byteBuffer.clear();
+                            // } else {
+                            // byteBuffer =
+                            // ByteBuffer.allocate(size).order(ByteOrder.BIG_ENDIAN);
+                            // ctx.setAttachment(byteBuffer);
+                            // }
+                            // CodedOutputStream output =
+                            // CodedOutputStream.newInstance(byteBuffer);
                             byte[] body = new byte[size];
                             CodedOutputStream output = CodedOutputStream.newInstance(body);
                             output.writeEnum(3, PacketType.MESSAGES.getNumber());
@@ -160,6 +172,11 @@ public class SessionHandler extends SimpleChannelHandler {
                             }
                             output.checkNoSpaceLeft();
                             NettyUtils.write(ctx.getChannel(), body, null);
+                            
+                            // output.flush();
+                            // byteBuffer.flip();
+                            // NettyUtils.write(ctx.getChannel(), byteBuffer,
+                            // null);
                         } else {
                             Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
                             packetBuilder.setType(PacketType.MESSAGES);