Bladeren bron

fixed issue #726, 优化SessionHandler里直接拼byte[],绕过protobuf的多次拷贝

七锋 6 jaren geleden
bovenliggende
commit
5582a38930

+ 2 - 1
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -319,7 +319,8 @@ public class SimpleCanalConnector implements CanalConnector {
     }
 
     private Message receiveMessages() throws IOException {
-        Packet p = Packet.parseFrom(readNextPacket());
+        byte[] data = readNextPacket();
+        Packet p = Packet.parseFrom(data);
         switch (p.getType()) {
             case MESSAGES: {
                 if (!p.getCompression().equals(Compression.NONE)) {

+ 5 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -331,6 +331,11 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         stopHeartBeat(); // 先停止心跳
         parseThread.interrupt(); // 尝试中断
         eventSink.interrupt();
+
+        if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) {
+            multiStageCoprocessor.stop();
+        }
+
         try {
             parseThread.join();// 等待其结束
         } catch (InterruptedException e) {
@@ -343,10 +348,6 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         if (transactionBuffer.isStart()) {
             transactionBuffer.stop();
         }
-
-        if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) {
-            multiStageCoprocessor.stop();
-        }
     }
 
     protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException,

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/MultiStageCoprocessor.java

@@ -21,9 +21,9 @@ public interface MultiStageCoprocessor extends CanalLifeCycle {
     /**
      * 网络数据投递
      */
-    public void publish(LogBuffer buffer);
+    public boolean publish(LogBuffer buffer);
 
-    public void publish(LogBuffer buffer, String binlogFileName);
+    public boolean publish(LogBuffer buffer, String binlogFileName);
 
     public void reset();
 }

+ 5 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -221,7 +221,11 @@ public class LocalBinLogConnection implements ErosaConnection {
                 while (fetcher.fetch()) {
                     LogBuffer buffer = fetcher.duplicate();
                     fetcher.consume(fetcher.limit());
-                    coprocessor.publish(buffer, binlogfilename); // set filename
+                    // set filename
+                    if (!coprocessor.publish(buffer, binlogfilename)) {
+                        needContinue = false;
+                        break;
+                    }
                 }
 
                 if (needContinue) {// 读取下一个

+ 6 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -206,7 +206,9 @@ public class MysqlConnection implements ErosaConnection {
             while (fetcher.fetch()) {
                 LogBuffer buffer = fetcher.duplicate();
                 fetcher.consume(fetcher.limit());
-                coprocessor.publish(buffer);
+                if (!coprocessor.publish(buffer)) {
+                    break;
+                }
             }
         } finally {
             fetcher.close();
@@ -230,7 +232,9 @@ public class MysqlConnection implements ErosaConnection {
             while (fetcher.fetch()) {
                 LogBuffer buffer = fetcher.duplicate();
                 fetcher.consume(fetcher.limit());
-                coprocessor.publish(buffer);
+                if (!coprocessor.publish(buffer)) {
+                    break;
+                }
             }
         } finally {
             fetcher.close();

+ 10 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -135,20 +135,22 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         } catch (Throwable e) {
             // ignore
         }
-        disruptorMsgBuffer = null;
         super.stop();
     }
 
     /**
      * 网络数据投递
      */
-    public void publish(LogBuffer buffer) {
-        publish(buffer, null);
+    public boolean publish(LogBuffer buffer) {
+        return publish(buffer, null);
     }
 
-    public void publish(LogBuffer buffer, String binlogFileName) {
-        if (!isStart() && exception != null) {
-            throw exception;
+    public boolean publish(LogBuffer buffer, String binlogFileName) {
+        if (!isStart()) {
+            if (exception != null) {
+                throw exception;
+            }
+            return false;
         }
 
         boolean interupted = false;
@@ -172,6 +174,8 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         if (exception != null) {
             throw exception;
         }
+
+        return isStart();
     }
 
     @Override

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/NettyUtils.java

@@ -19,7 +19,7 @@ import com.alibaba.otter.canal.protocol.CanalPacket.Packet;
 public class NettyUtils {
 
     private static final Logger logger           = LoggerFactory.getLogger(NettyUtils.class);
-    private static int          HEADER_LENGTH    = 4;
+    public static int           HEADER_LENGTH    = 4;
     public static Timer         hashedWheelTimer = new HashedWheelTimer();
 
     public static void write(Channel channel, byte[] body, ChannelFutureListener channelFutureListner) {

+ 46 - 19
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -33,13 +33,10 @@ import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
 import com.alibaba.otter.canal.server.netty.NettyUtils;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.WireFormat;
 
-/**
- * 处理具体的客户端请求
- * 
- * @author jianghang 2012-10-24 下午02:21:13
- * @version 1.0.0
- */
 public class SessionHandler extends SimpleChannelHandler {
 
     private static final Logger     logger = LoggerFactory.getLogger(SessionHandler.class);
@@ -130,27 +127,57 @@ public class SessionHandler extends SimpleChannelHandler {
                         }
                         // }
 
-                        Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
-                        packetBuilder.setType(PacketType.MESSAGES);
+                        if (message.getId() != -1 && message.isRaw()) {
+                            List<ByteString> rowEntries = message.getRawEntries();
+                            // message size
+                            int messageSize = 0;
+                            messageSize += com.google.protobuf.CodedOutputStream.computeInt64Size(1, message.getId());
 
-                        Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
-                        messageBuilder.setBatchId(message.getId());
-                        if (message.getId() != -1) {
-                            if (message.isRaw()) {
-                                // for performance
-                                if (!CollectionUtils.isEmpty(message.getRawEntries())) {
+                            int dataSize = 0;
+                            for (int i = 0; i < rowEntries.size(); i++) {
+                                dataSize += com.google.protobuf.CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i));
+                            }
+                            messageSize += dataSize;
+                            messageSize += 1 * rowEntries.size();
+                            // packet size
+                            int size = 0;
+                            size += com.google.protobuf.CodedOutputStream.computeEnumSize(3,
+                                PacketType.MESSAGES.getNumber());
+                            size += com.google.protobuf.CodedOutputStream.computeTagSize(5)
+                                    + com.google.protobuf.CodedOutputStream.computeRawVarint32Size(messageSize)
+                                    + messageSize;
+                            // TODO recyle bytes[]
+                            byte[] body = new byte[size];
+                            CodedOutputStream output = CodedOutputStream.newInstance(body);
+                            output.writeEnum(3, PacketType.MESSAGES.getNumber());
+
+                            output.writeTag(5, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+                            output.writeRawVarint32(messageSize);
+                            // message
+                            output.writeInt64(1, message.getId());
+                            for (int i = 0; i < rowEntries.size(); i++) {
+                                output.writeBytes(2, rowEntries.get(i));
+                            }
+                            output.checkNoSpaceLeft();
+                            NettyUtils.write(ctx.getChannel(), body, null);
+                        } else {
+                            Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
+                            packetBuilder.setType(PacketType.MESSAGES);
+
+                            Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
+                            messageBuilder.setBatchId(message.getId());
+                            if (message.getId() != -1) {
+                                if (message.isRaw() && !CollectionUtils.isEmpty(message.getRawEntries())) {
                                     messageBuilder.addAllMessages(message.getRawEntries());
-                                }
-                            } else {
-                                if (!CollectionUtils.isEmpty(message.getEntries())) {
+                                } else if (!CollectionUtils.isEmpty(message.getEntries())) {
                                     for (Entry entry : message.getEntries()) {
                                         messageBuilder.addMessages(entry.toByteString());
                                     }
                                 }
                             }
+                            packetBuilder.setBody(messageBuilder.build().toByteString());
+                            NettyUtils.write(ctx.getChannel(), packetBuilder.build().toByteArray(), null);// 输出数据
                         }
-                        packetBuilder.setBody(messageBuilder.build().toByteString());
-                        NettyUtils.write(ctx.getChannel(), packetBuilder.build().toByteArray(), null);// 输出数据
                     } else {
                         NettyUtils.error(401,
                             MessageFormatter.format("destination or clientId is null", get.toString()).getMessage(),

+ 92 - 0
server/src/test/java/com/alibaba/otter/canal/server/ProtocolTest.java

@@ -0,0 +1,92 @@
+package com.alibaba.otter.canal.server;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
+import com.alibaba.otter.canal.protocol.CanalEntry.Header;
+import com.alibaba.otter.canal.protocol.CanalPacket.Compression;
+import com.alibaba.otter.canal.protocol.CanalPacket.Messages;
+import com.alibaba.otter.canal.protocol.CanalPacket.Packet;
+import com.alibaba.otter.canal.protocol.CanalPacket.PacketType;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.WireFormat;
+
+public class ProtocolTest {
+
+    @Test
+    public void testSimple() throws IOException {
+        Header.Builder headerBuilder = Header.newBuilder();
+        headerBuilder.setLogfileName("mysql-bin.000001");
+        headerBuilder.setLogfileOffset(1024);
+        headerBuilder.setExecuteTime(1024);
+        Entry.Builder entryBuilder = Entry.newBuilder();
+        entryBuilder.setHeader(headerBuilder.build());
+        entryBuilder.setEntryType(EntryType.ROWDATA);
+        Entry entry = entryBuilder.build();
+        Message message = new Message(3, true, Arrays.asList(entry.toByteString()));
+
+        byte[] body = buildData(message);
+        Packet packet = Packet.parseFrom(body);
+        switch (packet.getType()) {
+            case MESSAGES: {
+                if (!packet.getCompression().equals(Compression.NONE)) {
+                    throw new CanalClientException("compression is not supported in this connector");
+                }
+
+                Messages messages = Messages.parseFrom(packet.getBody());
+                Message result = new Message(messages.getBatchId());
+                for (ByteString byteString : messages.getMessagesList()) {
+                    result.addEntry(Entry.parseFrom(byteString));
+                }
+
+                System.out.println(result);
+                break;
+            }
+            default: {
+                throw new CanalClientException("unexpected packet type: " + packet.getType());
+            }
+        }
+    }
+
+    private byte[] buildData(Message message) throws IOException {
+        List<ByteString> rowEntries = message.getRawEntries();
+        // message size
+        int messageSize = 0;
+        messageSize += com.google.protobuf.CodedOutputStream.computeInt64Size(1, message.getId());
+
+        int dataSize = 0;
+        for (int i = 0; i < rowEntries.size(); i++) {
+            dataSize += com.google.protobuf.CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i));
+        }
+        messageSize += dataSize;
+        messageSize += 1 * rowEntries.size();
+        // packet size
+        int size = 0;
+        size += com.google.protobuf.CodedOutputStream.computeEnumSize(3, PacketType.MESSAGES.getNumber());
+        size += com.google.protobuf.CodedOutputStream.computeTagSize(5)
+                + com.google.protobuf.CodedOutputStream.computeRawVarint32Size(messageSize) + messageSize;
+        // TODO recyle bytes[]
+        byte[] body = new byte[size];
+        CodedOutputStream output = CodedOutputStream.newInstance(body);
+        output.writeEnum(3, PacketType.MESSAGES.getNumber());
+
+        output.writeTag(5, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+        output.writeRawVarint32(messageSize);
+        // message
+        output.writeInt64(1, message.getId());
+        for (int i = 0; i < rowEntries.size(); i++) {
+            output.writeBytes(2, rowEntries.get(i));
+        }
+        output.checkNoSpaceLeft();
+
+        return body;
+    }
+}