
fixed issue #1826, remove kafka transaction send

agapple, 5 years ago
Parent commit: a847660a45

+ 43 - 44
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -10,56 +10,55 @@ import java.text.MessageFormat;
  */
 public class CanalConstants {
 
-    public static final String MDC_DESTINATION                       = "destination";
-    public static final String ROOT                                  = "canal";
-    public static final String CANAL_ID                              = ROOT + "." + "id";
-    public static final String CANAL_IP                              = ROOT + "." + "ip";
-    public static final String CANAL_PORT                            = ROOT + "." + "port";
-    public static final String CANAL_METRICS_PULL_PORT               = ROOT + "." + "metrics.pull.port";
-    public static final String CANAL_ZKSERVERS                       = ROOT + "." + "zkServers";
-    public static final String CANAL_WITHOUT_NETTY                   = ROOT + "." + "withoutNetty";
+    public static final String MDC_DESTINATION                      = "destination";
+    public static final String ROOT                                 = "canal";
+    public static final String CANAL_ID                             = ROOT + "." + "id";
+    public static final String CANAL_IP                             = ROOT + "." + "ip";
+    public static final String CANAL_PORT                           = ROOT + "." + "port";
+    public static final String CANAL_METRICS_PULL_PORT              = ROOT + "." + "metrics.pull.port";
+    public static final String CANAL_ZKSERVERS                      = ROOT + "." + "zkServers";
+    public static final String CANAL_WITHOUT_NETTY                  = ROOT + "." + "withoutNetty";
 
-    public static final String CANAL_DESTINATIONS                    = ROOT + "." + "destinations";
-    public static final String CANAL_AUTO_SCAN                       = ROOT + "." + "auto.scan";
-    public static final String CANAL_AUTO_SCAN_INTERVAL              = ROOT + "." + "auto.scan.interval";
-    public static final String CANAL_CONF_DIR                        = ROOT + "." + "conf.dir";
-    public static final String CANAL_SERVER_MODE                     = ROOT + "." + "serverMode";
+    public static final String CANAL_DESTINATIONS                   = ROOT + "." + "destinations";
+    public static final String CANAL_AUTO_SCAN                      = ROOT + "." + "auto.scan";
+    public static final String CANAL_AUTO_SCAN_INTERVAL             = ROOT + "." + "auto.scan.interval";
+    public static final String CANAL_CONF_DIR                       = ROOT + "." + "conf.dir";
+    public static final String CANAL_SERVER_MODE                    = ROOT + "." + "serverMode";
 
-    public static final String CANAL_DESTINATION_SPLIT               = ",";
-    public static final String GLOBAL_NAME                           = "global";
+    public static final String CANAL_DESTINATION_SPLIT              = ",";
+    public static final String GLOBAL_NAME                          = "global";
 
-    public static final String INSTANCE_MODE_TEMPLATE                = ROOT + "." + "instance.{0}.mode";
-    public static final String INSTANCE_LAZY_TEMPLATE                = ROOT + "." + "instance.{0}.lazy";
-    public static final String INSTANCE_MANAGER_ADDRESS_TEMPLATE     = ROOT + "." + "instance.{0}.manager.address";
-    public static final String INSTANCE_SPRING_XML_TEMPLATE          = ROOT + "." + "instance.{0}.spring.xml";
+    public static final String INSTANCE_MODE_TEMPLATE               = ROOT + "." + "instance.{0}.mode";
+    public static final String INSTANCE_LAZY_TEMPLATE               = ROOT + "." + "instance.{0}.lazy";
+    public static final String INSTANCE_MANAGER_ADDRESS_TEMPLATE    = ROOT + "." + "instance.{0}.manager.address";
+    public static final String INSTANCE_SPRING_XML_TEMPLATE         = ROOT + "." + "instance.{0}.spring.xml";
 
-    public static final String CANAL_DESTINATION_PROPERTY            = ROOT + ".instance.destination";
+    public static final String CANAL_DESTINATION_PROPERTY           = ROOT + ".instance.destination";
 
-    public static final String CANAL_SOCKETCHANNEL                   = ROOT + "." + "socketChannel";
+    public static final String CANAL_SOCKETCHANNEL                  = ROOT + "." + "socketChannel";
 
-    public static final String CANAL_MQ_SERVERS                      = ROOT + "." + "mq.servers";
-    public static final String CANAL_MQ_RETRIES                      = ROOT + "." + "mq.retries";
-    public static final String CANAL_MQ_BATCHSIZE                    = ROOT + "." + "mq.batchSize";
-    public static final String CANAL_MQ_LINGERMS                     = ROOT + "." + "mq.lingerMs";
-    public static final String CANAL_MQ_MAXREQUESTSIZE               = ROOT + "." + "mq.maxRequestSize";
-    public static final String CANAL_MQ_BUFFERMEMORY                 = ROOT + "." + "mq.bufferMemory";
-    public static final String CANAL_MQ_CANALBATCHSIZE               = ROOT + "." + "mq.canalBatchSize";
-    public static final String CANAL_MQ_CANALGETTIMEOUT              = ROOT + "." + "mq.canalGetTimeout";
-    public static final String CANAL_MQ_FLATMESSAGE                  = ROOT + "." + "mq.flatMessage";
-    public static final String CANAL_MQ_COMPRESSION_TYPE             = ROOT + "." + "mq.compressionType";
-    public static final String CANAL_MQ_ACKS                         = ROOT + "." + "mq.acks";
-    public static final String CANAL_MQ_TRANSACTION                  = ROOT + "." + "mq.transaction";
-    public static final String CANAL_MQ_PRODUCERGROUP                = ROOT + "." + "mq.producerGroup";
-    public static final String CANAL_ALIYUN_ACCESSKEY                = ROOT + "." + "aliyun.accessKey";
-    public static final String CANAL_ALIYUN_SECRETKEY                = ROOT + "." + "aliyun.secretKey";
-    public static final String CANAL_MQ_PROPERTIES                   = ROOT + "." + "mq.properties";
-    public static final String CANAL_MQ_ENABLE_MESSAGE_TRACE         = ROOT + "." + "mq.enableMessageTrace";
-    public static final String CANAL_MQ_ACCESS_CHANNEL               = ROOT + "." + "mq.accessChannel";
-    public static final String CANAL_MQ_CUSTOMIZED_TRACE_TOPIC       = ROOT + "." + "mq.customizedTraceTopic";
-    public static final String CANAL_MQ_NAMESPACE                    = ROOT + "." + "mq.namespace";
-    public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE        = ROOT + "." + "mq.kafka.kerberos.enable";
-    public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH  = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
-    public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH  = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";
+    public static final String CANAL_MQ_SERVERS                     = ROOT + "." + "mq.servers";
+    public static final String CANAL_MQ_RETRIES                     = ROOT + "." + "mq.retries";
+    public static final String CANAL_MQ_BATCHSIZE                   = ROOT + "." + "mq.batchSize";
+    public static final String CANAL_MQ_LINGERMS                    = ROOT + "." + "mq.lingerMs";
+    public static final String CANAL_MQ_MAXREQUESTSIZE              = ROOT + "." + "mq.maxRequestSize";
+    public static final String CANAL_MQ_BUFFERMEMORY                = ROOT + "." + "mq.bufferMemory";
+    public static final String CANAL_MQ_CANALBATCHSIZE              = ROOT + "." + "mq.canalBatchSize";
+    public static final String CANAL_MQ_CANALGETTIMEOUT             = ROOT + "." + "mq.canalGetTimeout";
+    public static final String CANAL_MQ_FLATMESSAGE                 = ROOT + "." + "mq.flatMessage";
+    public static final String CANAL_MQ_COMPRESSION_TYPE            = ROOT + "." + "mq.compressionType";
+    public static final String CANAL_MQ_ACKS                        = ROOT + "." + "mq.acks";
+    public static final String CANAL_MQ_PRODUCERGROUP               = ROOT + "." + "mq.producerGroup";
+    public static final String CANAL_ALIYUN_ACCESSKEY               = ROOT + "." + "aliyun.accessKey";
+    public static final String CANAL_ALIYUN_SECRETKEY               = ROOT + "." + "aliyun.secretKey";
+    public static final String CANAL_MQ_PROPERTIES                  = ROOT + "." + "mq.properties";
+    public static final String CANAL_MQ_ENABLE_MESSAGE_TRACE        = ROOT + "." + "mq.enableMessageTrace";
+    public static final String CANAL_MQ_ACCESS_CHANNEL              = ROOT + "." + "mq.accessChannel";
+    public static final String CANAL_MQ_CUSTOMIZED_TRACE_TOPIC      = ROOT + "." + "mq.customizedTraceTopic";
+    public static final String CANAL_MQ_NAMESPACE                   = ROOT + "." + "mq.namespace";
+    public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE       = ROOT + "." + "mq.kafka.kerberos.enable";
+    public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
+    public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";
 
     public static String getInstanceModeKey(String destination) {
         return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);

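The instance templates above are resolved through java.text.MessageFormat, as the getInstanceModeKey helper at the end of the class shows. A minimal standalone sketch of that lookup; the class name and the "example" destination are illustrative, only the key pattern comes from CanalConstants:

    import java.text.MessageFormat;

    public class InstanceKeyDemo {

        // same "{0}" placeholder pattern as CanalConstants.INSTANCE_MODE_TEMPLATE
        private static final String INSTANCE_MODE_TEMPLATE = "canal" + "." + "instance.{0}.mode";

        public static void main(String[] args) {
            // prints "canal.instance.example.mode"
            System.out.println(MessageFormat.format(INSTANCE_MODE_TEMPLATE, "example"));
        }
    }
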
+ 22 - 20
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java

@@ -1,5 +1,15 @@
 package com.alibaba.otter.canal.deployer;
 
+import java.io.File;
+import java.io.FileFilter;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
 import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
@@ -8,15 +18,6 @@ import com.alibaba.otter.canal.spi.CanalMQProducer;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
 
 /**
  * Canal server startup class
@@ -64,8 +65,8 @@ public class CanalStater {
 
                         public boolean accept(File pathname) {
                             String filename = pathname.getName();
-                            return pathname.isDirectory() && !"spring".equalsIgnoreCase(filename) &&
-                                    !"metrics".equalsIgnoreCase(filename);
+                            return pathname.isDirectory() && !"spring".equalsIgnoreCase(filename)
+                                   && !"metrics".equalsIgnoreCase(filename);
                         }
                     });
                     if (instanceDirs != null && instanceDirs.length > 0) {
@@ -196,17 +197,14 @@ public class CanalStater {
         if (!StringUtils.isEmpty(aliyunSecretKey)) {
             mqProperties.setAliyunSecretKey(aliyunSecretKey);
         }
-        String transaction = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_TRANSACTION);
-        if (!StringUtils.isEmpty(transaction)) {
-            mqProperties.setTransaction(Boolean.valueOf(transaction));
-        }
 
         String producerGroup = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_PRODUCERGROUP);
         if (!StringUtils.isEmpty(producerGroup)) {
             mqProperties.setProducerGroup(producerGroup);
         }
 
-        String enableMessageTrace = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_ENABLE_MESSAGE_TRACE);
+        String enableMessageTrace = CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_ENABLE_MESSAGE_TRACE);
         if (!StringUtils.isEmpty(enableMessageTrace)) {
             mqProperties.setEnableMessageTrace(Boolean.valueOf(enableMessageTrace));
         }
@@ -216,7 +214,8 @@ public class CanalStater {
             mqProperties.setAccessChannel(accessChannel);
         }
 
-        String customizedTraceTopic = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CUSTOMIZED_TRACE_TOPIC);
+        String customizedTraceTopic = CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_CUSTOMIZED_TRACE_TOPIC);
         if (!StringUtils.isEmpty(customizedTraceTopic)) {
             mqProperties.setCustomizedTraceTopic(customizedTraceTopic);
         }
@@ -226,17 +225,20 @@ public class CanalStater {
             mqProperties.setNamespace(namespace);
         }
 
-        String kafkaKerberosEnable = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
+        String kafkaKerberosEnable = CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
         if (!StringUtils.isEmpty(kafkaKerberosEnable)) {
             mqProperties.setKerberosEnable(Boolean.valueOf(kafkaKerberosEnable));
         }
 
-        String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH);
+        String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH);
         if (!StringUtils.isEmpty(kafkaKerberosKrb5Filepath)) {
             mqProperties.setKerberosKrb5FilePath(kafkaKerberosKrb5Filepath);
         }
 
-        String kafkaKerberosJaasFilepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH);
+        String kafkaKerberosJaasFilepath = CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH);
         if (!StringUtils.isEmpty(kafkaKerberosJaasFilepath)) {
             mqProperties.setKerberosJaasFilePath(kafkaKerberosJaasFilepath);
         }

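Every MQ setting kept by CanalStater follows the same read, blank-check, coerce pattern visible in the hunks above. A self-contained sketch of that pattern; the getProperty helper below is only a stand-in for CanalController.getProperty, whose implementation is not part of this diff:

    import java.util.Properties;

    public class MQPropertyBindingSketch {

        // stand-in for CanalController.getProperty(properties, key); assumed behavior only
        static String getProperty(Properties properties, String key) {
            String value = properties.getProperty(key);
            return value != null ? value.trim() : null;
        }

        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.setProperty("canal.mq.enableMessageTrace", "true");

            String enableMessageTrace = getProperty(properties, "canal.mq.enableMessageTrace");
            if (enableMessageTrace != null && !enableMessageTrace.isEmpty()) {
                // same coercion the starter applies before mqProperties.setEnableMessageTrace(...)
                boolean trace = Boolean.valueOf(enableMessageTrace);
                System.out.println("enableMessageTrace = " + trace);
            }
        }
    }
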
+ 0 - 2
deployer/src/main/resources/canal.properties

@@ -115,8 +115,6 @@ canal.mq.canalGetTimeout = 100
 canal.mq.flatMessage = true
 canal.mq.compressionType = none
 canal.mq.acks = all
-# use transaction for kafka flatMessage batch produce
-canal.mq.transaction = true
 #canal.mq.properties. =
 canal.mq.producerGroup = test
 # Set this value to "cloud", if you want open message trace feature in aliyun.

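Deployments that carry an older canal.properties forward may still contain the removed canal.mq.transaction key, which canal now simply ignores since both the constant and its handling are gone. A small sketch for flagging the stale key at startup; the file path and the warning wording are assumptions for illustration:

    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.util.Properties;

    public class StaleKeyCheck {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // "conf/canal.properties" is an assumed path; adjust to your deployment layout
            try (InputStream in = new FileInputStream("conf/canal.properties")) {
                props.load(in);
            }
            if (props.containsKey("canal.mq.transaction")) {
                System.err.println("canal.mq.transaction was removed in this commit; kafka sends are now batched and flushed");
            }
        }
    }
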
+ 15 - 39
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -25,15 +25,14 @@ public class MQProperties {
     private String     acks                   = "all";
     private String     aliyunAccessKey        = "";
     private String     aliyunSecretKey        = "";
-    private boolean    transaction            = false;           // whether transactions are enabled
     private Properties properties             = new Properties();
     private boolean    enableMessageTrace     = false;
     private String     accessChannel          = null;
     private String     customizedTraceTopic   = null;
     private String     namespace              = "";
-    private boolean    kerberosEnable         = false;           //whether Kerberos authentication is enabled on the kafka cluster
-    private String     kerberosKrb5FilePath   = "";              //path to the krb5.conf file when Kerberos authentication is enabled
-    private String     kerberosJaasFilePath   = "";              //path to the jaas.conf file when Kerberos authentication is enabled
+    private boolean    kerberosEnable         = false;           // whether Kerberos authentication is enabled on the kafka cluster
+    private String     kerberosKrb5FilePath   = "";              // path to the krb5.conf file when Kerberos authentication is enabled
+    private String     kerberosJaasFilePath   = "";              // path to the jaas.conf file when Kerberos authentication is enabled
 
     public static class CanalDestination {
 
@@ -213,14 +212,6 @@ public class MQProperties {
         this.maxRequestSize = maxRequestSize;
     }
 
-    public boolean getTransaction() {
-        return transaction;
-    }
-
-    public void setTransaction(boolean transaction) {
-        this.transaction = transaction;
-    }
-
     public Properties getProperties() {
         return properties;
     }
@@ -285,32 +276,17 @@ public class MQProperties {
         this.kerberosJaasFilePath = kerberosJaasFilePath;
     }
 
-    @Override public String toString() {
-        return "MQProperties{" +
-            "servers='" + servers + '\'' +
-            ", retries=" + retries +
-            ", batchSize=" + batchSize +
-            ", lingerMs=" + lingerMs +
-            ", maxRequestSize=" + maxRequestSize +
-            ", bufferMemory=" + bufferMemory +
-            ", filterTransactionEntry=" + filterTransactionEntry +
-            ", producerGroup='" + producerGroup + '\'' +
-            ", canalBatchSize=" + canalBatchSize +
-            ", canalGetTimeout=" + canalGetTimeout +
-            ", flatMessage=" + flatMessage +
-            ", compressionType='" + compressionType + '\'' +
-            ", acks='" + acks + '\'' +
-            ", aliyunAccessKey='" + aliyunAccessKey + '\'' +
-            ", aliyunSecretKey='" + aliyunSecretKey + '\'' +
-            ", transaction=" + transaction +
-            ", properties=" + properties +
-            ", enableMessageTrace=" + enableMessageTrace +
-            ", accessChannel='" + accessChannel + '\'' +
-            ", customizedTraceTopic='" + customizedTraceTopic + '\'' +
-            ", namespace='" + namespace + '\'' +
-            ", kerberosEnable='" + kerberosEnable + '\'' +
-            ", kerberosKrb5FilePath='" + kerberosKrb5FilePath + '\'' +
-            ", kerberosJaasFilePath='" + kerberosJaasFilePath + '\'' +
-            '}';
+    @Override
+    public String toString() {
+        return "MQProperties{" + "servers='" + servers + '\'' + ", retries=" + retries + ", batchSize=" + batchSize
+               + ", lingerMs=" + lingerMs + ", maxRequestSize=" + maxRequestSize + ", bufferMemory=" + bufferMemory
+               + ", filterTransactionEntry=" + filterTransactionEntry + ", producerGroup='" + producerGroup + '\''
+               + ", canalBatchSize=" + canalBatchSize + ", canalGetTimeout=" + canalGetTimeout + ", flatMessage="
+               + flatMessage + ", compressionType='" + compressionType + '\'' + ", acks='" + acks + '\''
+               + ", aliyunAccessKey='" + aliyunAccessKey + '\'' + ", aliyunSecretKey='" + aliyunSecretKey + '\''
+               + ", properties=" + properties + ", enableMessageTrace=" + enableMessageTrace + ", accessChannel='"
+               + accessChannel + '\'' + ", customizedTraceTopic='" + customizedTraceTopic + '\'' + ", namespace='"
+               + namespace + '\'' + ", kerberosEnable='" + kerberosEnable + '\'' + ", kerberosKrb5FilePath='"
+               + kerberosKrb5FilePath + '\'' + ", kerberosJaasFilePath='" + kerberosJaasFilePath + '\'' + '}';
     }
 }

+ 66 - 82
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,12 +1,13 @@
 package com.alibaba.otter.canal.kafka;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.alibaba.otter.canal.common.MQMessageUtils;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -15,12 +16,13 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.MQMessageUtils;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 
 /**
  * kafka producer main operation class
@@ -53,26 +55,18 @@ public class CanalKafkaProducer implements CanalMQProducer {
         if (!kafkaProperties.getProperties().isEmpty()) {
             properties.putAll(kafkaProperties.getProperties());
         }
-
-        if (kafkaProperties.getTransaction()) {
-            properties.put("transactional.id", "canal-transactional-id");
-        } else {
-            properties.put("retries", kafkaProperties.getRetries());
-        }
-
-        if (kafkaProperties.isKerberosEnable()){
+        properties.put("retries", kafkaProperties.getRetries());
+        if (kafkaProperties.isKerberosEnable()) {
             File krb5File = new File(kafkaProperties.getKerberosKrb5FilePath());
             File jaasFile = new File(kafkaProperties.getKerberosJaasFilePath());
-            if(krb5File.exists() && jaasFile.exists()){
-                //configure kerberos authentication; absolute paths are required
-                System.setProperty("java.security.krb5.conf",
-                        krb5File.getAbsolutePath());
-                System.setProperty("java.security.auth.login.config",
-                        jaasFile.getAbsolutePath());
+            if (krb5File.exists() && jaasFile.exists()) {
+                // configure kerberos authentication; absolute paths are required
+                System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
+                System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath());
                 System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
                 properties.put("security.protocol", "SASL_PLAINTEXT");
                 properties.put("sasl.kerberos.service.name", "kafka");
-            }else{
+            } else {
                 String errorMsg = "ERROR # The kafka kerberos configuration file does not exist! please check it";
                 logger.error(errorMsg);
                 throw new RuntimeException(errorMsg);
@@ -86,13 +80,6 @@ public class CanalKafkaProducer implements CanalMQProducer {
             properties.put("value.serializer", StringSerializer.class.getName());
             producer2 = new KafkaProducer<String, String>(properties);
         }
-        if (kafkaProperties.getTransaction()) {
-            if (!kafkaProperties.getFlatMessage()) {
-                producer.initTransactions();
-            } else {
-                producer2.initTransactions();
-            }
-        }
     }
 
     @Override
@@ -114,18 +101,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
 
     @Override
     public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) {
-        // enabling transactions requires kafka version support
-        Producer producerTmp;
-        if (!kafkaProperties.getFlatMessage()) {
-            producerTmp = producer;
-        } else {
-            producerTmp = producer2;
-        }
-
         try {
-            if (kafkaProperties.getTransaction()) {
-                producerTmp.beginTransaction();
-            }
             if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
                 // dynamic topic
                 Map<String, Message> messageMap = MQMessageUtils.messageTopics(message,
@@ -143,19 +119,9 @@ public class CanalKafkaProducer implements CanalMQProducer {
             } else {
                 send(canalDestination, canalDestination.getTopic(), message);
             }
-            if (kafkaProperties.getTransaction()) {
-                producerTmp.commitTransaction();
-            }
             callback.commit();
         } catch (Throwable e) {
             logger.error(e.getMessage(), e);
-            if (kafkaProperties.getTransaction()) {
-                try {
-                    producerTmp.abortTransaction();
-                } catch (Exception e1) {
-                    logger.error(e1.getMessage(), e1);
-                }
-            }
             callback.rollback();
         }
     }
@@ -163,7 +129,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
     private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message)
                                                                                                         throws Exception {
         if (!kafkaProperties.getFlatMessage()) {
-            List<ProducerRecord<String, Message>> records = new ArrayList<ProducerRecord<String, Message>>();
+            List<ProducerRecord> records = new ArrayList<ProducerRecord>();
             if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                 Message[] messages = MQMessageUtils.messagePartition(message,
                     canalDestination.getPartitionsNum(),
@@ -180,18 +146,11 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 records.add(new ProducerRecord<String, Message>(topicName, partition, null, message));
             }
 
-            if (!records.isEmpty()) {
-                for (ProducerRecord<String, Message> record : records) {
-                    producer.send(record).get();
-                }
-
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, message.toString());
-                }
-            }
+            produce(topicName, records, false);
         } else {
             // send flat json data
             List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
+            List<ProducerRecord> records = new ArrayList<ProducerRecord>();
             if (flatMessages != null) {
                 for (FlatMessage flatMessage : flatMessages) {
                     if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
@@ -202,34 +161,59 @@ public class CanalKafkaProducer implements CanalMQProducer {
                         for (int i = 0; i < length; i++) {
                             FlatMessage flatMessagePart = partitionFlatMessage[i];
                             if (flatMessagePart != null) {
-                                produce(topicName, i, flatMessagePart);
+                                records.add(new ProducerRecord<String, String>(topicName,
+                                    i,
+                                    null,
+                                    JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue)));
                             }
                         }
                     } else {
                         final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
-                        produce(topicName, partition, flatMessage);
+                        records.add(new ProducerRecord<String, String>(topicName,
+                            partition,
+                            null,
+                            JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue)));
                     }
 
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Send flat message to kafka topic: [{}], packet: {}",
-                            topicName,
-                            JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
-                    }
+                    // a flush is needed for each record
+                    produce(topicName, records, true);
                 }
             }
         }
     }
 
-    private void produce(String topicName, int partition, FlatMessage flatMessage) throws ExecutionException,
-                                                                                  InterruptedException {
-        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
-            partition,
-            null,
-            JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
-        if (kafkaProperties.getTransaction()) {
-            producer2.send(record);
+    private void produce(String topicName, List<ProducerRecord> records, boolean flatMessage) {
+
+        Producer producerTmp = null;
+        if (flatMessage) {
+            producerTmp = producer2;
         } else {
-            producer2.send(record).get();
+            producerTmp = producer;
+        }
+
+        List<Future> futures = new ArrayList<Future>();
+        try {
+            // send asynchronously: messages were already merged per partition during partition hashing, so ordering within a single partition is not a concern at this point
+            for (ProducerRecord record : records) {
+                futures.add(producerTmp.send(record));
+            }
+        } finally {
+            if (logger.isDebugEnabled()) {
+                for (ProducerRecord record : records) {
+                    logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, record.toString());
+                }
+            }
+            // flush the batch
+            producerTmp.flush();
+
+            // the flush itself can still hide send failures, so check each async send result here and trigger a rollback when one fails
+            for (Future future : futures) {
+                try {
+                    future.get();
+                } catch (InterruptedException | ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+            }
         }
     }
 

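The reworked produce method drops the Kafka transaction and instead sends records asynchronously, flushes, and then checks every returned Future so a failed send still reaches the rollback path. A standalone sketch of that send, flush, then check pattern against the plain kafka-clients API; the broker address, topic, and payloads are placeholders rather than canal values:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class BatchSendSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "127.0.0.1:9092"); // placeholder broker
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            props.put("retries", 0);

            try (Producer<String, String> producer = new KafkaProducer<String, String>(props)) {
                List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>();
                // asynchronous sends; results are collected and checked after the flush
                for (int i = 0; i < 3; i++) {
                    futures.add(producer.send(new ProducerRecord<String, String>("example-topic", 0, null, "payload-" + i)));
                }
                // flush the batch, then inspect every result so a failure is not silently dropped
                producer.flush();
                for (Future<RecordMetadata> future : futures) {
                    try {
                        future.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }
    }
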
+ 25 - 20
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -18,6 +18,7 @@ import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.common.CanalMessageSerializer;
@@ -29,7 +30,7 @@ import com.alibaba.otter.canal.spi.CanalMQProducer;
 
 public class CanalRocketMQProducer implements CanalMQProducer {
 
-    private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
+    private static final Logger logger               = LoggerFactory.getLogger(CanalRocketMQProducer.class);
     private DefaultMQProducer   defaultMQProducer;
     private MQProperties        mqProperties;
     private static final String CLOUD_ACCESS_CHANNEL = "cloud";
@@ -46,11 +47,14 @@ public class CanalRocketMQProducer implements CanalMQProducer {
             rpcHook = new AclClientRPCHook(sessionCredentials);
         }
 
-        defaultMQProducer = new DefaultMQProducer(rocketMQProperties.getProducerGroup(), rpcHook, mqProperties.isEnableMessageTrace(), mqProperties.getCustomizedTraceTopic());
-        if (CLOUD_ACCESS_CHANNEL.equals(rocketMQProperties.getAccessChannel())){
+        defaultMQProducer = new DefaultMQProducer(rocketMQProperties.getProducerGroup(),
+            rpcHook,
+            mqProperties.isEnableMessageTrace(),
+            mqProperties.getCustomizedTraceTopic());
+        if (CLOUD_ACCESS_CHANNEL.equals(rocketMQProperties.getAccessChannel())) {
             defaultMQProducer.setAccessChannel(AccessChannel.CLOUD);
         }
-        if (!StringUtils.isEmpty(mqProperties.getNamespace())){
+        if (!StringUtils.isEmpty(mqProperties.getNamespace())) {
             defaultMQProducer.setNamespace(mqProperties.getNamespace());
         }
         defaultMQProducer.setNamesrvAddr(rocketMQProperties.getServers());
@@ -88,22 +92,6 @@ public class CanalRocketMQProducer implements CanalMQProducer {
         }
     }
 
-    private void sendMessage(Message message, int partition) throws Exception{
-        SendResult sendResult = this.defaultMQProducer.send(message, new MessageQueueSelector() {
-            @Override
-            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                if (partition > mqs.size()) {
-                    return mqs.get(partition % mqs.size());
-                } else {
-                    return mqs.get(partition);
-                }
-            }
-        }, null);
-        if (logger.isDebugEnabled()) {
-            logger.debug("Send Message Result: {}", sendResult);
-        }
-    }
-
     public void send(final MQProperties.CanalDestination destination, String topicName,
                      com.alibaba.otter.canal.protocol.Message data) throws Exception {
         if (!mqProperties.getFlatMessage()) {
@@ -203,6 +191,23 @@ public class CanalRocketMQProducer implements CanalMQProducer {
         }
     }
 
+    private void sendMessage(Message message, int partition) throws Exception {
+        SendResult sendResult = this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
+            @Override
+            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                if (partition > mqs.size()) {
+                    return mqs.get(partition % mqs.size());
+                } else {
+                    return mqs.get(partition);
+                }
+            }
+        }, null);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Send Message Result: {}", sendResult);
+        }
+    }
+
     @Override
     public void stop() {
         logger.info("## Stop RocketMQ producer##");

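The relocated sendMessage helper selects a RocketMQ queue from the canal partition index, wrapping with a modulo when the index exceeds the queue count. A plain-Java sketch of the same branch structure, without the RocketMQ dependency; queue and partition values are made up for illustration:

    import java.util.Arrays;
    import java.util.List;

    public class QueueSelectionSketch {

        // same branch structure as the MessageQueueSelector above
        static int select(List<Integer> queues, int partition) {
            if (partition > queues.size()) {
                return queues.get(partition % queues.size());
            } else {
                return queues.get(partition);
            }
        }

        public static void main(String[] args) {
            List<Integer> queues = Arrays.asList(0, 1, 2, 3);
            System.out.println(select(queues, 2)); // index 2 used directly
            System.out.println(select(queues, 6)); // 6 wraps to 6 % 4 = 2
        }
    }
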
+ 2 - 1
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -10,6 +10,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.instance.core.CanalInstance;
@@ -137,7 +138,7 @@ public class CanalMQStarter {
         }
 
         logger.info("## start the MQ producer: {}.", destination);
-
+        MDC.put("destination", destination);
         final ClientIdentity clientIdentity = new ClientIdentity(destination, (short) 1001, "");
         while (running && destinationRunning.get()) {
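
The added MDC.put call tags the worker thread's log output with its destination, matching the MDC_DESTINATION key ("destination") defined in CanalConstants. A small slf4j sketch of that pattern; the logger and destination value are placeholders:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.slf4j.MDC;

    public class MdcSketch {

        private static final Logger logger = LoggerFactory.getLogger(MdcSketch.class);

        public static void main(String[] args) {
            MDC.put("destination", "example");
            try {
                // with a log pattern such as %X{destination}, this line carries the destination
                logger.info("## start the MQ producer: {}.", "example");
            } finally {
                MDC.remove("destination");
            }
        }
    }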
             try {