Procházet zdrojové kódy

去掉kafka server端TRANSACTIONBEGIN和TRANSACTIONEND的判断

machey před 7 roky
rodič
revize
ee5775ce05

+ 1 - 1
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -45,7 +45,7 @@ public class KafkaCanalConnector {
         properties.put("auto.offset.reset", "latest"); // 如果没有offset则从最后的offset开始读
         properties.put("request.timeout.ms", "40000"); // 必须大于session.timeout.ms的设置
         properties.put("session.timeout.ms", "30000"); // 默认为30秒
-        properties.put("max.poll.records", "1"); // 所以一次只取一条数据
+        properties.put("max.poll.records", "1"); // 一次只取一条message数据
         properties.put("key.deserializer", StringDeserializer.class.getName());
         properties.put("value.deserializer", MessageDeserializer.class.getName());
 

+ 1 - 27
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -48,33 +48,7 @@ public class CanalKafkaProducer {
         }
     }
 
-    public void send(KafkaProperties.Topic topic, Message message) throws IOException {
-        // set canal.instance.filter.transaction.entry = true
-
-        // boolean valid = false;
-        // if (message != null) {
-        // if (message.isRaw() && !message.getRawEntries().isEmpty()) {
-        // for (ByteString byteString : message.getRawEntries()) {
-        // CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
-        // if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
-        // && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
-        // valid = true;
-        // break;
-        // }
-        // }
-        // } else if (!message.getEntries().isEmpty()){
-        // for (CanalEntry.Entry entry : message.getEntries()) {
-        // if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
-        // && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
-        // valid = true;
-        // break;
-        // }
-        // }
-        // }
-        // }
-        // if (!valid) {
-        // return;
-        // }
+    public void send(KafkaProperties.Topic topic, Message message) {
         ProducerRecord<String, Message> record;
         if (topic.getPartition() != null) {
             record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);

+ 3 - 3
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaStarter.java

@@ -54,9 +54,9 @@ public class CanalKafkaStarter implements CanalServerStarter {
             canalKafkaProducer = new CanalKafkaProducer();
             canalKafkaProducer.init(kafkaProperties);
             // set filterTransactionEntry
-            if (kafkaProperties.isFilterTransactionEntry()) {
-                System.setProperty("canal.instance.filter.transaction.entry", "true");
-            }
+            // if (kafkaProperties.isFilterTransactionEntry()) {
+            //     System.setProperty("canal.instance.filter.transaction.entry", "true");
+            // }
             // 对应每个instance启动一个worker线程
             List<CanalDestination> destinations = kafkaProperties.getCanalDestinations();