瀏覽代碼

增加对message的验证,过滤transaction begin end 类型

rewerma 7 年之前
父節點
當前提交
6faa3fd586
共有 2 個文件被更改,包括 16 次插入7 次删除
  1. 5 5
      kafka/pom.xml
  2. 11 2
      kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java

+ 5 - 5
kafka/pom.xml

@@ -27,11 +27,11 @@
         </dependency>
 
         <!-- Kafka -->
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>0.10.0.1</version>
-        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>org.apache.kafka</groupId>-->
+            <!--<artifactId>kafka-clients</artifactId>-->
+            <!--<version>0.10.0.1</version>-->
+        <!--</dependency>-->
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.11</artifactId>

+ 11 - 2
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java

@@ -10,7 +10,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
 import java.util.Properties;
 
 /**
@@ -49,10 +48,20 @@ public class CanalKafkaProducer {
     }
 
     public void send(Topic topic, Message message) {
-        if (message == null || message.getEntries().isEmpty()) {
+        boolean valid = false;
+        if (message != null && !message.getEntries().isEmpty()) {
+            for (CanalEntry.Entry entry : message.getEntries()) {
+                if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
+                    valid = true;
+                    break;
+                }
+            }
+        }
+        if (!valid) {
             return;
         }
 
+
         ProducerRecord<String, Message> record;
         if (topic.getPartition() != null) {
             record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);