浏览代码

Merge pull request #1143 from rewerma/master

配置整理
agapple 6 年之前
父节点
当前提交
6f3cae5ddf

+ 10 - 10
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -17,13 +17,13 @@ public class CanalClientConfig {
 
     private String              zookeeperHosts;     // 集群模式下的zk地址, 如果配置了单机地址则以单机为准!!
 
-    private String              bootstrapServers;   // kafka or rocket mq 地址
+    private String              mqServers;          // kafka or rocket mq 地址
 
     private Boolean             flatMessage = true; // 是否已flatMessage模式传输, 只适用于mq模式
 
     private Integer             batchSize;          // 批大小
 
-    private Integer             retry;              // 重试次数
+    private Integer             retries;            // 重试次数
 
     private Long                timeout;            // 消费超时时间
 
@@ -47,12 +47,12 @@ public class CanalClientConfig {
         this.zookeeperHosts = zookeeperHosts;
     }
 
-    public String getBootstrapServers() {
-        return bootstrapServers;
+    public String getMqServers() {
+        return mqServers;
     }
 
-    public void setBootstrapServers(String bootstrapServers) {
-        this.bootstrapServers = bootstrapServers;
+    public void setMqServers(String mqServers) {
+        this.mqServers = mqServers;
     }
 
     public List<MQTopic> getMqTopics() {
@@ -79,12 +79,12 @@ public class CanalClientConfig {
         this.batchSize = batchSize;
     }
 
-    public Integer getRetry() {
-        return retry;
+    public Integer getRetries() {
+        return retries;
     }
 
-    public void setRetry(Integer retry) {
-        this.retry = retry;
+    public void setRetries(Integer retries) {
+        this.retries = retries;
     }
 
     public Long getTimeout() {

+ 0 - 3
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java

@@ -180,9 +180,6 @@ public class ExtensionLoader<T> {
 
     @SuppressWarnings("unchecked")
     private T createExtension(String name, String key) {
-        // System.out.println("xxxxxxxxxxxxx");
-        // getExtensionClasses().forEach((k, v) -> logger.info("fffff: " + k +
-        // " " + v.getName()));
         Class<?> clazz = getExtensionClasses().get(name);
         if (clazz == null) {
             throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type

+ 2 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -45,7 +45,8 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
         while (!running)
             ;
         ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
-        int retry = canalClientConfig.getRetry() == null ? 1 : canalClientConfig.getRetry();
+        int retry = canalClientConfig.getRetries() == null
+                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
 
         while (running) {

+ 2 - 4
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -33,8 +33,6 @@ public class CanalAdapterLoader {
 
     private Map<String, AbstractCanalAdapterWorker> canalMQWorker = new HashMap<>();
 
-    private Map<String, OuterAdapter>               outerAdapters = new HashMap<>();                                  // 配置文件对应adapter
-
     private ExtensionLoader<OuterAdapter>           loader;
 
     public CanalAdapterLoader(CanalClientConfig canalClientConfig){
@@ -97,7 +95,7 @@ public class CanalAdapterLoader {
                     canalOuterAdapterGroups.add(canalOuterAdapters);
                     if (StringUtils.isBlank(topic.getMqMode()) || "rocketmq".equalsIgnoreCase(topic.getMqMode())) {
                         CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig,
-                            canalClientConfig.getBootstrapServers(),
+                            canalClientConfig.getMqServers(),
                             topic.getTopic(),
                             group.getGroupId(),
                             canalOuterAdapterGroups,
@@ -106,7 +104,7 @@ public class CanalAdapterLoader {
                         rocketMQWorker.start();
                     } else if ("kafka".equalsIgnoreCase(topic.getMqMode())) {
                         CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig,
-                            canalClientConfig.getBootstrapServers(),
+                            canalClientConfig.getMqServers(),
                             topic.getTopic(),
                             group.getGroupId(),
                             canalOuterAdapterGroups,

+ 2 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -43,7 +43,8 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
             ;
 
         ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
-        int retry = canalClientConfig.getRetry() == null ? 1 : canalClientConfig.getRetry();
+        int retry = canalClientConfig.getRetries() == null
+                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
 
         while (running) {

+ 2 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -64,7 +64,8 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
             ; // waiting until running == true
 
         ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
-        int retry = canalClientConfig.getRetry() == null ? 1 : canalClientConfig.getRetry();
+        int retry = canalClientConfig.getRetries() == null
+                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 300000 : canalClientConfig.getTimeout(); // 默认超时5分钟
         Integer batchSize = canalClientConfig.getBatchSize();
         if (batchSize == null) {

+ 8 - 6
client-adapter/launcher/src/main/resources/application.yml

@@ -14,18 +14,20 @@ spring:
 canal.conf:
   canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
-#  bootstrapServers: slave1:6667 #or rocketmq
+#  mqServers: slave1:6667 #or rocketmq
 #  flatMessage: true
+  retries: 0
+  timeout:
 #  srcDataSources:
 #    defaultDS:
 #      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
 #      username: root
 #      password: 121212
-  canalInstances:
-  - instance: example
-    groups:
-    - outAdapters:
-      - name: logger
+#  canalInstances:
+#  - instance: example
+#    groups:
+#    - outAdapters:
+#      - name: logger
 #      - name: rdb
 #        key: oracle1
 #        properties:

+ 40 - 17
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -85,23 +85,46 @@ public class CanalLauncher {
 
     private static MQProperties buildMQPosition(Properties properties) {
         MQProperties mqProperties = new MQProperties();
-        mqProperties.setServers(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_SERVERS));
-        mqProperties.setRetries(Integer.valueOf(CanalController.getProperty(properties,
-            CanalConstants.CANAL_MQ_RETRIES)));
-        mqProperties.setBatchSize(Integer.valueOf(CanalController.getProperty(properties,
-            CanalConstants.CANAL_MQ_BATCHSIZE)));
-        mqProperties.setLingerMs(Integer.valueOf(CanalController.getProperty(properties,
-            CanalConstants.CANAL_MQ_LINGERMS)));
-        mqProperties.setBufferMemory(Long.valueOf(CanalController.getProperty(properties,
-            CanalConstants.CANAL_MQ_BUFFERMEMORY)));
-        mqProperties.setCanalBatchSize(Integer.valueOf(CanalController.getProperty(properties,
-            CanalConstants.CANAL_MQ_CANALBATCHSIZE)));
-        mqProperties.setCanalGetTimeout(Long.valueOf(CanalController.getProperty(properties,
-            CanalConstants.CANAL_MQ_CANALGETTIMEOUT)));
-        mqProperties.setFlatMessage(Boolean.valueOf(CanalController.getProperty(properties,
-            CanalConstants.CANAL_MQ_FLATMESSAGE)));
-        mqProperties.setCompressionType(CanalController.getProperty(properties,CanalConstants.CANAL_MQ_COMPRESSION_TYPE));
-        mqProperties.setAcks(CanalController.getProperty(properties,CanalConstants.CANAL_MQ_ACKS));
+        String servers = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_SERVERS);
+        if (!StringUtils.isEmpty(servers)) {
+            mqProperties.setServers(servers);
+        }
+        String retires = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_RETRIES);
+        if (!StringUtils.isEmpty(retires)) {
+            mqProperties.setRetries(Integer.valueOf(retires));
+        }
+        String batchSize = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BATCHSIZE);
+        if (!StringUtils.isEmpty(batchSize)) {
+            mqProperties.setBatchSize(Integer.valueOf(batchSize));
+        }
+        String lingerMs = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_LINGERMS);
+        if (!StringUtils.isEmpty(lingerMs)) {
+            mqProperties.setLingerMs(Integer.valueOf(lingerMs));
+        }
+        String bufferMemory = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BUFFERMEMORY);
+        if (!StringUtils.isEmpty(bufferMemory)) {
+            mqProperties.setBufferMemory(Long.valueOf(bufferMemory));
+        }
+        String canalBatchSize = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALBATCHSIZE);
+        if (!StringUtils.isEmpty(canalBatchSize)) {
+            mqProperties.setCanalBatchSize(Integer.valueOf(canalBatchSize));
+        }
+        String canalGetTimeout = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALGETTIMEOUT);
+        if (!StringUtils.isEmpty(canalGetTimeout)) {
+            mqProperties.setCanalGetTimeout(Long.valueOf(canalGetTimeout));
+        }
+        String flatMessage = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_FLATMESSAGE);
+        if (!StringUtils.isEmpty(flatMessage)) {
+            mqProperties.setFlatMessage(Boolean.valueOf(flatMessage));
+        }
+        String compressionType = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_COMPRESSION_TYPE);
+        if (!StringUtils.isEmpty(compressionType)) {
+            mqProperties.setCompressionType(compressionType);
+        }
+        String acks = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_ACKS);
+        if (!StringUtils.isEmpty(acks)) {
+            mqProperties.setAcks(acks);
+        }
         return mqProperties;
     }
 

+ 3 - 1
deployer/src/main/resources/canal.properties

@@ -108,4 +108,6 @@ canal.mq.lingerMs=1
 canal.mq.bufferMemory=33554432
 canal.mq.canalBatchSize=50
 canal.mq.canalGetTimeout=100
-canal.mq.flatMessage=true
+canal.mq.flatMessage=true
+canal.mq.compressionType=
+canal.mq.acks=