Browse Source

fixed issue #1075, support message transactor filter

七锋 6 years ago
parent
commit
dd8243a56d

+ 7 - 1
server/src/main/java/com/alibaba/otter/canal/common/CanalMessageSerializer.java

@@ -16,7 +16,7 @@ import com.google.protobuf.WireFormat;
 public class CanalMessageSerializer {
 
     @SuppressWarnings("deprecation")
-    public static byte[] serializer(Message data) {
+    public static byte[] serializer(Message data, boolean filterTransactionEntry) {
         try {
             if (data != null) {
                 if (data.getId() != -1) {
@@ -53,8 +53,14 @@ public class CanalMessageSerializer {
                         output.checkNoSpaceLeft();
                         return body;
                     } else if (!CollectionUtils.isEmpty(data.getEntries())) {
+                        // mq模式只会走到非rowEntry模式
                         CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
                         for (CanalEntry.Entry entry : data.getEntries()) {
+                            if (filterTransactionEntry
+                                && (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND)) {
+                                continue;
+                            }
+
                             messageBuilder.addMessages(entry.toByteString());
                         }
 

+ 9 - 1
server/src/main/java/com/alibaba/otter/canal/kafka/MessageSerializer.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.kafka;
 
 import java.util.Map;
 
+import org.apache.commons.lang.BooleanUtils;
 import org.apache.kafka.common.serialization.Serializer;
 
 import com.alibaba.otter.canal.common.CanalMessageSerializer;
@@ -15,13 +16,20 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class MessageSerializer implements Serializer<Message> {
 
+    private boolean filterTransactionEntry = false;
+
+    public MessageSerializer(){
+        this.filterTransactionEntry = BooleanUtils.toBoolean(System.getProperty("canal.instance.filter.transaction.entry",
+            "false"));
+    }
+
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
     }
 
     @Override
     public byte[] serialize(String topic, Message data) {
-        return CanalMessageSerializer.serializer(data);
+        return CanalMessageSerializer.serializer(data, filterTransactionEntry);
     }
 
     @Override

+ 2 - 1
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -47,7 +47,8 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                      Callback callback) {
         if (!mqProperties.getFlatMessage()) {
             try {
-                Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data));
+                Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data,
+                    mqProperties.isFilterTransactionEntry()));
                 logger.debug("send message:{} to destination:{}, partition: {}",
                     message,
                     destination.getCanalDestination(),