mcy 6 years ago
parent
commit
ca41be8412

+ 13 - 2
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -144,7 +144,10 @@ public class CanalKafkaProducer implements CanalMQProducer {
             // 发送扁平数据json
             List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
             if (flatMessages != null) {
+                int idx = 0;
+                int size = flatMessages.size();
                 for (FlatMessage flatMessage : flatMessages) {
+                    idx++;
                     if (StringUtils.isEmpty(canalDestination.getPartitionHash())) {
                         try {
                             Integer partition = canalDestination.getPartition();
@@ -155,7 +158,11 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                 partition,
                                 null,
                                 JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
-                            producer2.send(record);
+                            if (idx != size) {
+                                producer2.send(record);
+                            } else {
+                                producer2.send(record).get();
+                            }
                         } catch (Exception e) {
                             logger.error(e.getMessage(), e);
                             // producer.abortTransaction();
@@ -175,7 +182,11 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                         i,
                                         null,
                                         JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
-                                    producer2.send(record);
+                                    if (idx != size) {
+                                        producer2.send(record);
+                                    } else {
+                                        producer2.send(record).get();
+                                    }
                                 } catch (Exception e) {
                                     logger.error(e.getMessage(), e);
                                     // producer.abortTransaction();