Procházet zdrojové kódy

Fixed Kafka producer performance: serialize raw entries directly via CodedOutputStream and replace per-message transaction-entry scanning with the `canal.instance.filter.transaction.entry` sink-level filter

七锋 před 6 roky
rodič
revize
26edbeaa28

+ 1 - 0
deployer/src/main/resources/canal.properties

@@ -43,6 +43,7 @@ canal.instance.filter.query.dml = false
 canal.instance.filter.query.ddl = false
 canal.instance.filter.table.error = false
 canal.instance.filter.rows = false
+canal.instance.filter.transaction.entry = false
 
 # binlog format/image check
 canal.instance.binlog.format = ROW,STATEMENT,MIXED 

+ 1 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -81,6 +81,7 @@
 	
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
 		<property name="eventStore" ref="eventStore" />
+		<property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
 	</bean>
 
 	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">

+ 1 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -67,6 +67,7 @@
 	
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
 		<property name="eventStore" ref="eventStore" />
+		<property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
 	</bean>
 
 	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">

+ 1 - 0
deployer/src/main/resources/spring/group-instance.xml

@@ -64,6 +64,7 @@
 	
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
 		<property name="eventStore" ref="eventStore" />
+		<property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
 	</bean>
 	
 	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.group.GroupEventParser">

+ 1 - 0
deployer/src/main/resources/spring/local-instance.xml

@@ -67,6 +67,7 @@
 	
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
 		<property name="eventStore" ref="eventStore" />
+		<property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
 	</bean>
 
 	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsLocalBinlogEventParser">

+ 1 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -64,6 +64,7 @@
 	
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
 		<property name="eventStore" ref="eventStore" />
+		<property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
 	</bean>
 
 	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">

+ 3 - 1
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/MessageDeserializer.java

@@ -29,7 +29,9 @@ public class MessageDeserializer implements Deserializer<Message> {
     @Override
     public Message deserialize(String topic, byte[] data) {
         try {
-            if (data == null) return null;
+            if (data == null) {
+                return null;
+            }
             else {
                 CanalPacket.Packet p = CanalPacket.Packet.parseFrom(data);
                 switch (p.getType()) {

+ 2 - 2
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/running/ClientRunningData.java

@@ -8,8 +8,8 @@ package com.alibaba.otter.canal.kafka.client.running;
  */
 public class ClientRunningData {
 
-    private String groupId;
-    private String address;
+    private String  groupId;
+    private String  address;
     private boolean active = true;
 
     public String getGroupId() {

+ 44 - 38
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/running/ClientRunningMonitor.java

@@ -1,12 +1,11 @@
 package com.alibaba.otter.canal.kafka.client.running;
 
-import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
-import com.alibaba.otter.canal.common.utils.AddressUtils;
-import com.alibaba.otter.canal.common.utils.BooleanMutex;
-import com.alibaba.otter.canal.common.utils.JsonUtils;
-import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
-import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import java.text.MessageFormat;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.exception.ZkException;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
@@ -17,11 +16,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import java.text.MessageFormat;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
+import com.alibaba.otter.canal.common.utils.AddressUtils;
+import com.alibaba.otter.canal.common.utils.BooleanMutex;
+import com.alibaba.otter.canal.common.utils.JsonUtils;
+import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
+import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+
 
 /**
  * kafka client running状态信息
@@ -31,13 +33,18 @@ import java.util.concurrent.TimeUnit;
  */
 public class ClientRunningMonitor extends AbstractCanalLifeCycle {
 
-    private static final String TOPIC_ROOT_NODE = ZookeeperPathUtils.CANAL_ROOT_NODE + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + "topics";
+    private static final String TOPIC_ROOT_NODE             = ZookeeperPathUtils.CANAL_ROOT_NODE
+                                                              + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + "topics";
 
-    private static final String TOPIC_NODE = TOPIC_ROOT_NODE + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + "{0}";
+    private static final String TOPIC_NODE                  = TOPIC_ROOT_NODE + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR
+                                                              + "{0}";
 
-    private static final String TOPIC_CLIENTID_NODE = TOPIC_NODE + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + "{1}";
+    private static final String TOPIC_CLIENTID_NODE         = TOPIC_NODE + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR
+                                                              + "{1}";
 
-    private static final String TOPIC_CLIENTID_RUNNING_NODE = TOPIC_CLIENTID_NODE + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + ZookeeperPathUtils.RUNNING_NODE;
+    private static final String TOPIC_CLIENTID_RUNNING_NODE = TOPIC_CLIENTID_NODE
+                                                              + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR
+                                                              + ZookeeperPathUtils.RUNNING_NODE;
 
     private static String getTopicClientRunning(String topic, String groupId) {
         return MessageFormat.format(TOPIC_CLIENTID_RUNNING_NODE, topic, groupId);
@@ -47,21 +54,21 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
         return MessageFormat.format(TOPIC_CLIENTID_NODE, topic, groupId);
     }
 
-    private static final Logger logger = LoggerFactory.getLogger(ClientRunningMonitor.class);
-    private ZkClientx zkClient;
-    private String topic;
-    private ClientRunningData clientData;
-    private IZkDataListener dataListener;
-    private BooleanMutex mutex = new BooleanMutex(false);
-    private volatile boolean release = false;
+    private static final Logger        logger       = LoggerFactory.getLogger(ClientRunningMonitor.class);
+    private ZkClientx                  zkClient;
+    private String                     topic;
+    private ClientRunningData          clientData;
+    private IZkDataListener            dataListener;
+    private BooleanMutex               mutex        = new BooleanMutex(false);
+    private volatile boolean           release      = false;
     private volatile ClientRunningData activeData;
-    private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
-    private ClientRunningListener listener;
-    private int delayTime = 5;
+    private ScheduledExecutorService   delayExector = Executors.newScheduledThreadPool(1);
+    private ClientRunningListener      listener;
+    private int                        delayTime    = 5;
 
-    private static Integer virtualPort;
+    private static Integer             virtualPort;
 
-    public ClientRunningMonitor() {
+    public ClientRunningMonitor(){
         if (virtualPort == null) {
             Random rand = new Random();
             virtualPort = rand.nextInt(9000) + 1000;
@@ -108,7 +115,6 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
 
     }
 
-
     public void start() {
         super.start();
 
@@ -123,7 +129,7 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
         String path = getTopicClientRunning(this.topic, clientData.getGroupId());
         zkClient.unsubscribeDataChanges(path, dataListener);
         releaseRunning(); // 尝试一下release
-        //Fix issue #697
+        // Fix issue #697
         if (delayExector != null) {
             delayExector.shutdown();
         }
@@ -159,13 +165,12 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
                 }
             }
         } catch (ZkNoNodeException e) {
-            zkClient.createPersistent(getClientIdNodePath(this.topic, clientData.getGroupId()),
-                    true); // 尝试创建父节点
+            zkClient.createPersistent(getClientIdNodePath(this.topic, clientData.getGroupId()), true); // 尝试创建父节点
             initRunning();
         } catch (Throwable t) {
             logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].",
-                    topic),
-                    t);
+                topic),
+                t);
             // 出现任何异常尝试release
             releaseRunning();
             throw new CanalClientException("something goes wrong in initRunning method. ", t);
@@ -187,7 +192,8 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
      */
     public boolean check() {
         String path = getTopicClientRunning(this.topic, clientData.getGroupId());
-        //ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
+        // ZookeeperPathUtils.getDestinationClientRunning(this.destination,
+        // clientData.getClientId());
         try {
             byte[] bytes = zkClient.readData(path);
             ClientRunningData eventData = JsonUtils.unmarshalFromByte(bytes, ClientRunningData.class);
@@ -196,8 +202,8 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
             boolean result = isMine(activeData.getAddress());
             if (!result) {
                 logger.warn("canal is running in [{}] , but not in [{}]",
-                        activeData.getAddress(),
-                        clientData.getAddress());
+                    activeData.getAddress(),
+                    clientData.getAddress());
             }
             return result;
         } catch (ZkNoNodeException e) {
@@ -235,7 +241,7 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
         if (listener != null) {
             // 触发回调
             listener.processActiveEnter();
-            this.clientData.setAddress(/*address*/AddressUtils.getHostIp() + ":" + virtualPort);
+            this.clientData.setAddress(/* address */AddressUtils.getHostIp() + ":" + virtualPort);
 
             String path = getTopicClientRunning(this.topic, clientData.getGroupId());
             // 序列化

+ 3 - 3
kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalServerStarter.java

@@ -17,9 +17,9 @@ import com.alibaba.otter.canal.deployer.CanalController;
  */
 public class CanalServerStarter {
 
-    private static final String CLASSPATH_URL_PREFIX = "classpath:";
-    private static final Logger logger               = LoggerFactory.getLogger(CanalServerStarter.class);
-    private volatile static boolean running          = false;
+    private static final String     CLASSPATH_URL_PREFIX = "classpath:";
+    private static final Logger     logger               = LoggerFactory.getLogger(CanalServerStarter.class);
+    private volatile static boolean running              = false;
 
     public static void init() {
         try {

+ 25 - 26
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java

@@ -3,7 +3,6 @@ package com.alibaba.otter.canal.kafka.producer;
 import java.io.IOException;
 import java.util.Properties;
 
-import com.google.protobuf.ByteString;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -12,7 +11,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
-import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**
@@ -52,31 +50,32 @@ public class CanalKafkaProducer {
     }
 
     public void send(Topic topic, Message message) throws IOException {
-        boolean valid = false;
-        if (message != null) {
-            if (message.isRaw() && !message.getRawEntries().isEmpty()) {
-                for (ByteString byteString : message.getRawEntries()) {
-                    CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
-                    if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
-                            && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
-                        valid = true;
-                        break;
-                    }
-                }
-            } else if (!message.getEntries().isEmpty()){
-                for (CanalEntry.Entry entry : message.getEntries()) {
-                    if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
-                            && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
-                        valid = true;
-                        break;
-                    }
-                }
-            }
-        }
-        if (!valid) {
-            return;
-        }
+        // set canal.instance.filter.transaction.entry = true
 
+        // boolean valid = false;
+        // if (message != null) {
+        // if (message.isRaw() && !message.getRawEntries().isEmpty()) {
+        // for (ByteString byteString : message.getRawEntries()) {
+        // CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
+        // if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
+        // && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
+        // valid = true;
+        // break;
+        // }
+        // }
+        // } else if (!message.getEntries().isEmpty()){
+        // for (CanalEntry.Entry entry : message.getEntries()) {
+        // if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
+        // && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
+        // valid = true;
+        // break;
+        // }
+        // }
+        // }
+        // }
+        // if (!valid) {
+        // return;
+        // }
         ProducerRecord<String, Message> record;
         if (topic.getPartition() != null) {
             record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);

+ 4 - 1
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaStarter.java

@@ -52,7 +52,10 @@ public class CanalKafkaStarter {
             // 初始化 kafka producer
             canalKafkaProducer = new CanalKafkaProducer();
             canalKafkaProducer.init(kafkaProperties);
-
+            // set filterTransactionEntry
+            if (kafkaProperties.isFilterTransactionEntry()) {
+                System.setProperty("canal.instance.filter.transaction.entry", "true");
+            }
             // 对应每个instance启动一个worker线程
             List<CanalDestination> destinations = kafkaProperties.getCanalDestinations();
 

+ 17 - 8
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/KafkaProperties.java

@@ -13,15 +13,15 @@ import java.util.Set;
  */
 public class KafkaProperties {
 
-    private String                 servers           = "localhost:6667";
-    private int                    retries           = 0;
-    private int                    batchSize         = 16384;
-    private int                    lingerMs          = 1;
-    private long                   bufferMemory      = 33554432L;
+    private String                 servers                = "localhost:6667";
+    private int                    retries                = 0;
+    private int                    batchSize              = 16384;
+    private int                    lingerMs               = 1;
+    private long                   bufferMemory           = 33554432L;
+    private boolean                filterTransactionEntry = true;
+    private int                    canalBatchSize         = 5;
 
-    private int                    canalBatchSize    = 5;
-
-    private List<CanalDestination> canalDestinations = new ArrayList<CanalDestination>();
+    private List<CanalDestination> canalDestinations      = new ArrayList<CanalDestination>();
 
     public static class CanalDestination {
 
@@ -158,4 +158,13 @@ public class KafkaProperties {
     public void setCanalDestinations(List<CanalDestination> canalDestinations) {
         this.canalDestinations = canalDestinations;
     }
+
+    public boolean isFilterTransactionEntry() {
+        return filterTransactionEntry;
+    }
+
+    public void setFilterTransactionEntry(boolean filterTransactionEntry) {
+        this.filterTransactionEntry = filterTransactionEntry;
+    }
+
 }

+ 49 - 10
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/MessageSerializer.java

@@ -1,13 +1,19 @@
 package com.alibaba.otter.canal.kafka.producer;
 
-import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.alibaba.otter.canal.protocol.CanalPacket;
-import com.alibaba.otter.canal.protocol.Message;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.Serializer;
 import org.springframework.util.CollectionUtils;
 
-import java.util.Map;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalPacket;
+import com.alibaba.otter.canal.protocol.CanalPacket.PacketType;
+import com.alibaba.otter.canal.protocol.Message;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.WireFormat;
 
 /**
  * Kafka Message类的序列化
@@ -25,20 +31,53 @@ public class MessageSerializer implements Serializer<Message> {
     public byte[] serialize(String topic, Message data) {
         try {
             if (data != null) {
-                CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
                 if (data.getId() != -1) {
                     if (data.isRaw() && !CollectionUtils.isEmpty(data.getRawEntries())) {
-                        messageBuilder.addAllMessages(data.getRawEntries());
+                        // for performance
+                        List<ByteString> rowEntries = data.getRawEntries();
+                        // message size
+                        int messageSize = 0;
+                        messageSize += com.google.protobuf.CodedOutputStream.computeInt64Size(1, data.getId());
+
+                        int dataSize = 0;
+                        for (int i = 0; i < rowEntries.size(); i++) {
+                            dataSize += com.google.protobuf.CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i));
+                        }
+                        messageSize += dataSize;
+                        messageSize += 1 * rowEntries.size();
+                        // packet size
+                        int size = 0;
+                        size += com.google.protobuf.CodedOutputStream.computeEnumSize(3,
+                            PacketType.MESSAGES.getNumber());
+                        size += com.google.protobuf.CodedOutputStream.computeTagSize(5)
+                                + com.google.protobuf.CodedOutputStream.computeRawVarint32Size(messageSize)
+                                + messageSize;
+                        // build data
+                        byte[] body = new byte[size];
+                        CodedOutputStream output = CodedOutputStream.newInstance(body);
+                        output.writeEnum(3, PacketType.MESSAGES.getNumber());
+
+                        output.writeTag(5, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+                        output.writeRawVarint32(messageSize);
+                        // message
+                        output.writeInt64(1, data.getId());
+                        for (int i = 0; i < rowEntries.size(); i++) {
+                            output.writeBytes(2, rowEntries.get(i));
+                        }
+                        output.checkNoSpaceLeft();
+                        return body;
                     } else if (!CollectionUtils.isEmpty(data.getEntries())) {
+                        CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
                         for (CanalEntry.Entry entry : data.getEntries()) {
                             messageBuilder.addMessages(entry.toByteString());
                         }
+
+                        CanalPacket.Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
+                        packetBuilder.setType(CanalPacket.PacketType.MESSAGES);
+                        packetBuilder.setBody(messageBuilder.build().toByteString());
+                        return packetBuilder.build().toByteArray();
                     }
                 }
-                CanalPacket.Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
-                packetBuilder.setType(CanalPacket.PacketType.MESSAGES);
-                packetBuilder.setBody(messageBuilder.build().toByteString());
-                return packetBuilder.build().toByteArray();
             }
         } catch (Exception e) {
             throw new SerializationException("Error when serializing message to byte[] ");

+ 1 - 0
kafka/src/main/resources/kafka.yml

@@ -5,6 +5,7 @@ lingerMs: 1
 bufferMemory: 33554432
 # canal的批次大小,单位 k
 canalBatchSize: 50
+filterTransactionEntry: true
 
 canalDestinations:
   - canalDestination: example