Jelajahi Sumber

Merge pull request #790 from wingerx/master

kafka producer 适配 row data for performance #726
agapple 6 tahun lalu
induk
melakukan
50d1ec583b

+ 20 - 7
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java

@@ -1,7 +1,9 @@
 package com.alibaba.otter.canal.kafka.producer;
 
+import java.io.IOException;
 import java.util.Properties;
 
+import com.google.protobuf.ByteString;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -49,14 +51,25 @@ public class CanalKafkaProducer {
         }
     }
 
-    public void send(Topic topic, Message message) {
+    public void send(Topic topic, Message message) throws IOException {
         boolean valid = false;
-        if (message != null && !message.getEntries().isEmpty()) {
-            for (CanalEntry.Entry entry : message.getEntries()) {
-                if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
-                    && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
-                    valid = true;
-                    break;
+        if (message != null) {
+            if (message.isRaw() && !message.getRawEntries().isEmpty()) {
+                for (ByteString byteString : message.getRawEntries()) {
+                    CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
+                    if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
+                            && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
+                        valid = true;
+                        break;
+                    }
+                }
+            } else if (!message.getEntries().isEmpty()){
+                for (CanalEntry.Entry entry : message.getEntries()) {
+                    if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
+                            && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
+                        valid = true;
+                        break;
+                    }
                 }
             }
         }

+ 13 - 10
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/MessageSerializer.java

@@ -1,14 +1,13 @@
 package com.alibaba.otter.canal.kafka.producer;
 
-import java.util.Map;
-
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalPacket;
+import com.alibaba.otter.canal.protocol.Message;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.Serializer;
 import org.springframework.util.CollectionUtils;
 
-import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.alibaba.otter.canal.protocol.CanalPacket;
-import com.alibaba.otter.canal.protocol.Message;
+import java.util.Map;
 
 /**
  * Kafka Message类的序列化
@@ -25,12 +24,15 @@ public class MessageSerializer implements Serializer<Message> {
     @Override
     public byte[] serialize(String topic, Message data) {
         try {
-            if (data == null) return null;
-            else {
+            if (data != null) {
                 CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
-                if (data.getId() != -1 && !CollectionUtils.isEmpty(data.getEntries())) {
-                    for (CanalEntry.Entry entry : data.getEntries()) {
-                        messageBuilder.addMessages(entry.toByteString());
+                if (data.getId() != -1) {
+                    if (data.isRaw() && !CollectionUtils.isEmpty(data.getRawEntries())) {
+                        messageBuilder.addAllMessages(data.getRawEntries());
+                    } else if (!CollectionUtils.isEmpty(data.getEntries())) {
+                        for (CanalEntry.Entry entry : data.getEntries()) {
+                            messageBuilder.addMessages(entry.toByteString());
+                        }
                     }
                 }
                 CanalPacket.Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
@@ -41,6 +43,7 @@ public class MessageSerializer implements Serializer<Message> {
         } catch (Exception e) {
             throw new SerializationException("Error when serializing message to byte[] ");
         }
+        return null;
     }
 
     @Override