瀏覽代碼

整理代码

mcy 6 年之前
父節點
當前提交
e566160f1e

+ 6 - 4
deployer/src/main/resources/kafka.yml

@@ -12,9 +12,11 @@ flatMessage: true
 
 canalDestinations:
   - canalDestination: example
-    topic: expample
-    #partitionsNum: 3
-    #partitionHash:
-    #  mytest.person: id
+    topic: example
+#    #对应topic分区数量
+#    partitionsNum: 3
+#    partitionHash:
+#      #库名.表名: 唯一主键
+#      mytest.person: id
 
 

+ 8 - 9
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -99,16 +99,15 @@ public class CanalKafkaProducer {
                                     canalDestination.getPartitionHash());
                                 int length = partitionFlatMessage.length;
                                 for (int i = 0; i < length; i++) {
-                                    FlatMessage flatMessage1 = partitionFlatMessage[i];
-                                    if (flatMessage1 == null) {
-                                        continue;
+                                    FlatMessage flatMessagePart = partitionFlatMessage[i];
+                                    if (flatMessagePart != null) {
+                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                                canalDestination.getTopic(),
+                                                i,
+                                                null,
+                                                JSON.toJSONString(flatMessagePart));
+                                        producer2.send(record);
                                     }
-                                    ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                        canalDestination.getTopic(),
-                                        i,
-                                        null,
-                                        JSON.toJSONString(partitionFlatMessage[i]));
-                                    producer2.send(record);
                                 }
                             } else {
                                 ProducerRecord<String, String> record = new ProducerRecord<String, String>(