Browse Source

增加kafka自定义properties

mcy 6 years ago
parent
commit
ad62275cd7

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -51,6 +51,7 @@ public class CanalConstants {
     public static final String CANAL_MQ_TRANSACTION              = ROOT + "." + "mq.transaction";
     public static final String CANAL_ALIYUN_ACCESSKEY            = ROOT + "." + "aliyun.accessKey";
     public static final String CANAL_ALIYUN_SECRETKEY            = ROOT + "." + "aliyun.secretKey";
+    public static final String CANAL_MQ_PROPERTIES               = ROOT + "." + "mq.properties";
 
     public static String getInstanceModeKey(String destination) {
         return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);

+ 11 - 1
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.deployer;
 
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.lang.StringUtils;
@@ -162,7 +163,16 @@ public class CanalStater {
         if (!StringUtils.isEmpty(transaction)) {
             mqProperties.setTransaction(Boolean.valueOf(transaction));
         }
+
+        for (Object key : properties.keySet()) {
+            key = StringUtils.trim(key.toString());
+            if (((String) key).startsWith(CanalConstants.CANAL_MQ_PROPERTIES)) {
+                String value = CanalController.getProperty(properties, (String) key);
+                String subKey = ((String) key).substring(CanalConstants.CANAL_MQ_PROPERTIES.length() + 1);
+                mqProperties.getProperties().put(subKey, value);
+            }
+        }
+
         return mqProperties;
     }
-
 }

+ 2 - 1
deployer/src/main/resources/canal.properties

@@ -116,4 +116,5 @@ canal.mq.flatMessage = true
 canal.mq.compressionType = none
 canal.mq.acks = all
 # use transaction for kafka flatMessage batch produce
-canal.mq.transaction = false
+canal.mq.transaction = false
+#canal.mq.properties. =

+ 27 - 16
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.common;
 
+import java.util.Properties;
+
 /**
  * kafka 配置项
  *
@@ -8,22 +10,23 @@ package com.alibaba.otter.canal.common;
  */
 public class MQProperties {
 
-    private String  servers                = "127.0.0.1:6667";
-    private int     retries                = 0;
-    private int     batchSize              = 16384;
-    private int     lingerMs               = 1;
-    private int     maxRequestSize         = 1048576;
-    private long    bufferMemory           = 33554432L;
-    private boolean filterTransactionEntry = true;
-    private String  producerGroup          = "Canal-Producer";
-    private int     canalBatchSize         = 50;
-    private Long    canalGetTimeout        = 100L;
-    private boolean flatMessage            = true;
-    private String  compressionType        = "none";
-    private String  acks                   = "all";
-    private String  aliyunAccessKey        = "";
-    private String  aliyunSecretKey        = "";
-    private boolean transaction            = false;           // 是否开启事务
+    private String     servers                = "127.0.0.1:6667";
+    private int        retries                = 0;
+    private int        batchSize              = 16384;
+    private int        lingerMs               = 1;
+    private int        maxRequestSize         = 1048576;
+    private long       bufferMemory           = 33554432L;
+    private boolean    filterTransactionEntry = true;
+    private String     producerGroup          = "Canal-Producer";
+    private int        canalBatchSize         = 50;
+    private Long       canalGetTimeout        = 100L;
+    private boolean    flatMessage            = true;
+    private String     compressionType        = "none";
+    private String     acks                   = "all";
+    private String     aliyunAccessKey        = "";
+    private String     aliyunSecretKey        = "";
+    private boolean    transaction            = false;           // 是否开启事务
+    private Properties properties             = new Properties();
 
     public static class CanalDestination {
 
@@ -210,4 +213,12 @@ public class MQProperties {
     public void setTransaction(boolean transaction) {
         this.transaction = transaction;
     }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
 }

+ 6 - 1
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -49,7 +49,12 @@ public class CanalKafkaProducer implements CanalMQProducer {
         properties.put("max.request.size", kafkaProperties.getMaxRequestSize());
         properties.put("buffer.memory", kafkaProperties.getBufferMemory());
         properties.put("key.serializer", StringSerializer.class.getName());
-        if(kafkaProperties.getTransaction()){
+
+        if (!kafkaProperties.getProperties().isEmpty()) {
+            properties.putAll(kafkaProperties.getProperties());
+        }
+
+        if (kafkaProperties.getTransaction()) {
             properties.put("transactional.id", "canal-transactional-id");
         } else {
             properties.put("retries", kafkaProperties.getRetries());