Browse Source

升级rocketmq到4.8.0,并且支持环境变量优先配置 (#3450)

zoemak 4 years ago
parent
commit
32d9e58915

+ 32 - 0
common/src/main/java/com/alibaba/otter/canal/common/utils/PropertiesUtils.java

@@ -0,0 +1,32 @@
+package com.alibaba.otter.canal.common.utils;
+
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Properties;
+
+public class PropertiesUtils {
+    public static String getProperty(Properties properties, String key, String defaultValue) {
+        String value = getProperty(properties, key);
+        if (StringUtils.isEmpty(value)) {
+            return defaultValue;
+        } else {
+            return value;
+        }
+    }
+
+    public static String getProperty(Properties properties, String key) {
+        key = StringUtils.trim(key);
+        String value = System.getProperty(key);
+
+        if (value == null) {
+            value = System.getenv(key);
+        }
+
+        if (value == null) {
+            value = properties.getProperty(key);
+        }
+
+        return StringUtils.trim(value);
+    }
+}

+ 33 - 33
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/AbstractMQProducer.java

@@ -1,16 +1,16 @@
 package com.alibaba.otter.canal.connector.core.producer;
 
-import java.util.Properties;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang.StringUtils;
-
 import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
+import com.alibaba.otter.canal.common.utils.PropertiesUtils;
 import com.alibaba.otter.canal.connector.core.config.CanalConstants;
 import com.alibaba.otter.canal.connector.core.config.MQProperties;
 import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * MQ producer 抽象类
@@ -20,7 +20,7 @@ import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer;
  */
 public abstract class AbstractMQProducer implements CanalMQProducer {
 
-    protected MQProperties       mqProperties;
+    protected MQProperties mqProperties;
 
     protected ThreadPoolExecutor sendExecutor;
     protected ThreadPoolExecutor buildExecutor;
@@ -32,21 +32,21 @@ public abstract class AbstractMQProducer implements CanalMQProducer {
 
         int parallelBuildThreadSize = mqProperties.getParallelBuildThreadSize();
         buildExecutor = new ThreadPoolExecutor(parallelBuildThreadSize,
-            parallelBuildThreadSize,
-            0,
-            TimeUnit.SECONDS,
-            new ArrayBlockingQueue<>(parallelBuildThreadSize * 2),
-            new NamedThreadFactory("MQ-Parallel-Builder"),
-            new ThreadPoolExecutor.CallerRunsPolicy());
+                parallelBuildThreadSize,
+                0,
+                TimeUnit.SECONDS,
+                new ArrayBlockingQueue<>(parallelBuildThreadSize * 2),
+                new NamedThreadFactory("MQ-Parallel-Builder"),
+                new ThreadPoolExecutor.CallerRunsPolicy());
 
         int parallelSendThreadSize = mqProperties.getParallelSendThreadSize();
         sendExecutor = new ThreadPoolExecutor(parallelSendThreadSize,
-            parallelSendThreadSize,
-            0,
-            TimeUnit.SECONDS,
-            new ArrayBlockingQueue<>(parallelSendThreadSize * 2),
-            new NamedThreadFactory("MQ-Parallel-Sender"),
-            new ThreadPoolExecutor.CallerRunsPolicy());
+                parallelSendThreadSize,
+                0,
+                TimeUnit.SECONDS,
+                new ArrayBlockingQueue<>(parallelSendThreadSize * 2),
+                new NamedThreadFactory("MQ-Parallel-Sender"),
+                new ThreadPoolExecutor.CallerRunsPolicy());
     }
 
     @Override
@@ -77,52 +77,52 @@ public abstract class AbstractMQProducer implements CanalMQProducer {
      * canal.mq.timeout = 100 <br/>
      * canal.mq.access.channel = local <br/>
      * </p>
-     * 
+     *
      * @param properties 总配置对象
      */
     private void loadCanalMqProperties(Properties properties) {
-        String flatMessage = properties.getProperty(CanalConstants.CANAL_MQ_FLAT_MESSAGE);
+        String flatMessage = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_MQ_FLAT_MESSAGE);
         if (!StringUtils.isEmpty(flatMessage)) {
             mqProperties.setFlatMessage(Boolean.parseBoolean(flatMessage));
         }
 
-        String databaseHash = properties.getProperty(CanalConstants.CANAL_MQ_DATABASE_HASH);
+        String databaseHash = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_MQ_DATABASE_HASH);
         if (!StringUtils.isEmpty(databaseHash)) {
             mqProperties.setDatabaseHash(Boolean.parseBoolean(databaseHash));
         }
-        String filterTranEntry = properties.getProperty(CanalConstants.CANAL_FILTER_TRANSACTION_ENTRY);
+        String filterTranEntry = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_FILTER_TRANSACTION_ENTRY);
         if (!StringUtils.isEmpty(filterTranEntry)) {
             mqProperties.setFilterTransactionEntry(Boolean.parseBoolean(filterTranEntry));
         }
-        String parallelBuildThreadSize = properties.getProperty(CanalConstants.CANAL_MQ_BUILD_THREAD_SIZE);
+        String parallelBuildThreadSize = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_MQ_BUILD_THREAD_SIZE);
         if (!StringUtils.isEmpty(parallelBuildThreadSize)) {
             mqProperties.setParallelBuildThreadSize(Integer.parseInt(parallelBuildThreadSize));
         }
-        String parallelSendThreadSize = properties.getProperty(CanalConstants.CANAL_MQ_SEND_THREAD_SIZE);
+        String parallelSendThreadSize = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_MQ_SEND_THREAD_SIZE);
         if (!StringUtils.isEmpty(parallelSendThreadSize)) {
             mqProperties.setParallelSendThreadSize(Integer.parseInt(parallelSendThreadSize));
         }
-        String batchSize = properties.getProperty(CanalConstants.CANAL_MQ_CANAL_BATCH_SIZE);
+        String batchSize = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_MQ_CANAL_BATCH_SIZE);
         if (!StringUtils.isEmpty(batchSize)) {
             mqProperties.setBatchSize(Integer.parseInt(batchSize));
         }
-        String timeOut = properties.getProperty(CanalConstants.CANAL_MQ_CANAL_GET_TIMEOUT);
+        String timeOut = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_MQ_CANAL_GET_TIMEOUT);
         if (!StringUtils.isEmpty(timeOut)) {
             mqProperties.setFetchTimeout(Integer.parseInt(timeOut));
         }
-        String accessChannel = properties.getProperty(CanalConstants.CANAL_MQ_ACCESS_CHANNEL);
+        String accessChannel = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_MQ_ACCESS_CHANNEL);
         if (!StringUtils.isEmpty(accessChannel)) {
             mqProperties.setAccessChannel(accessChannel);
         }
-        String aliyunAccessKey = properties.getProperty(CanalConstants.CANAL_ALIYUN_ACCESS_KEY);
+        String aliyunAccessKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESS_KEY);
         if (!StringUtils.isEmpty(aliyunAccessKey)) {
             mqProperties.setAliyunAccessKey(aliyunAccessKey);
         }
-        String aliyunSecretKey = properties.getProperty(CanalConstants.CANAL_ALIYUN_SECRET_KEY);
+        String aliyunSecretKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_SECRET_KEY);
         if (!StringUtils.isEmpty(aliyunSecretKey)) {
             mqProperties.setAliyunAccessKey(aliyunSecretKey);
         }
-        String aliyunUid = properties.getProperty(CanalConstants.CANAL_ALIYUN_UID);
+        String aliyunUid = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_UID);
         if (!StringUtils.isEmpty(aliyunUid)) {
             mqProperties.setAliyunUid(Integer.parseInt(aliyunUid));
         }
@@ -132,7 +132,7 @@ public abstract class AbstractMQProducer implements CanalMQProducer {
      * 兼容下<=1.1.4的mq配置项
      */
     protected void doMoreCompatibleConvert(String oldKey, String newKey, Properties properties) {
-        String value = properties.getProperty(oldKey);
+        String value = PropertiesUtils.getProperty(properties, oldKey);
         if (StringUtils.isNotEmpty(value)) {
             properties.setProperty(newKey, value);
         }

+ 4 - 3
connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java

@@ -9,6 +9,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import com.alibaba.otter.canal.common.utils.PropertiesUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -105,15 +106,15 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
                 kafkaProperties.put(key, value);
             }
         }
-        String kerberosEnabled = properties.getProperty(KafkaConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
+        String kerberosEnabled = PropertiesUtils.getProperty(properties, KafkaConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
         if (!StringUtils.isEmpty(kerberosEnabled)) {
             kafkaProducerConfig.setKerberosEnabled(Boolean.parseBoolean(kerberosEnabled));
         }
-        String krb5File = properties.getProperty(KafkaConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5_FILE);
+        String krb5File = PropertiesUtils.getProperty(properties, KafkaConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5_FILE);
         if (!StringUtils.isEmpty(krb5File)) {
             kafkaProducerConfig.setKrb5File(krb5File);
         }
-        String jaasFile = properties.getProperty(KafkaConstants.CANAL_MQ_KAFKA_KERBEROS_JAAS_FILE);
+        String jaasFile = PropertiesUtils.getProperty(properties, KafkaConstants.CANAL_MQ_KAFKA_KERBEROS_JAAS_FILE);
         if (!StringUtils.isEmpty(jaasFile)) {
             kafkaProducerConfig.setJaasFile(jaasFile);
         }

+ 7 - 6
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/consumer/CanalRabbitMQConsumer.java

@@ -8,6 +8,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import com.alibaba.otter.canal.common.utils.PropertiesUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,13 +67,13 @@ public class CanalRabbitMQConsumer implements CanalMsgConsumer {
 
     @Override
     public void init(Properties properties, String topic, String groupId) {
-        this.nameServer = properties.getProperty("rabbitmq.host");
-        this.vhost = properties.getProperty("rabbitmq.virtual.host");
+        this.nameServer = PropertiesUtils.getProperty(properties, "rabbitmq.host");
+        this.vhost = PropertiesUtils.getProperty(properties, "rabbitmq.virtual.host");
         this.queueName = topic;
-        this.accessKey = properties.getProperty(CanalConstants.CANAL_ALIYUN_ACCESS_KEY);
-        this.secretKey = properties.getProperty(CanalConstants.CANAL_ALIYUN_SECRET_KEY);
-        this.username = properties.getProperty(RabbitMQConstants.RABBITMQ_USERNAME);
-        this.password = properties.getProperty(RabbitMQConstants.RABBITMQ_PASSWORD);
+        this.accessKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESS_KEY);
+        this.secretKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_SECRET_KEY);
+        this.username = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_USERNAME);
+        this.password = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_PASSWORD);
         Long resourceOwnerIdPro = (Long) properties.get(RabbitMQConstants.RABBITMQ_RESOURCE_OWNERID);
         if (resourceOwnerIdPro != null) {
             this.resourceOwnerId = resourceOwnerIdPro;

+ 6 - 5
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java

@@ -6,6 +6,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
+import com.alibaba.otter.canal.common.utils.PropertiesUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,23 +85,23 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
         // 兼容下<=1.1.4的mq配置
         doMoreCompatibleConvert("canal.mq.servers", "rabbitmq.host", properties);
 
-        String host = properties.getProperty(RabbitMQConstants.RABBITMQ_HOST);
+        String host = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_HOST);
         if (!StringUtils.isEmpty(host)) {
             rabbitMQProperties.setHost(host);
         }
-        String vhost = properties.getProperty(RabbitMQConstants.RABBITMQ_VIRTUAL_HOST);
+        String vhost = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_VIRTUAL_HOST);
         if (!StringUtils.isEmpty(vhost)) {
             rabbitMQProperties.setVirtualHost(vhost);
         }
-        String exchange = properties.getProperty(RabbitMQConstants.RABBITMQ_EXCHANGE);
+        String exchange = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_EXCHANGE);
         if (!StringUtils.isEmpty(exchange)) {
             rabbitMQProperties.setExchange(exchange);
         }
-        String username = properties.getProperty(RabbitMQConstants.RABBITMQ_USERNAME);
+        String username = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_USERNAME);
         if (!StringUtils.isEmpty(username)) {
             rabbitMQProperties.setUsername(username);
         }
-        String password = properties.getProperty(RabbitMQConstants.RABBITMQ_PASSWORD);
+        String password = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_PASSWORD);
         if (!StringUtils.isEmpty(password)) {
             rabbitMQProperties.setPassword(password);
         }

+ 1 - 1
connector/rocketmq-connector/pom.xml

@@ -16,7 +16,7 @@
     <name>canal connector rocketMQ module for otter ${project.version}</name>
 
     <properties>
-        <rocketmq_version>4.5.2</rocketmq_version>
+        <rocketmq_version>4.8.0</rocketmq_version>
     </properties>
 
     <dependencies>

+ 8 - 7
connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java

@@ -6,6 +6,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import com.alibaba.otter.canal.common.utils.PropertiesUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
@@ -94,31 +95,31 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
         doMoreCompatibleConvert("canal.mq.namespace", "rocketmq.namespace", properties);
         doMoreCompatibleConvert("canal.mq.retries", "rocketmq.retry.times.when.send.failed", properties);
 
-        String producerGroup = properties.getProperty(RocketMQConstants.ROCKETMQ_PRODUCER_GROUP);
+        String producerGroup = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_PRODUCER_GROUP);
         if (!StringUtils.isEmpty(producerGroup)) {
             rocketMQProperties.setProducerGroup(producerGroup);
         }
-        String enableMessageTrace = properties.getProperty(RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
+        String enableMessageTrace = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
         if (!StringUtils.isEmpty(enableMessageTrace)) {
             rocketMQProperties.setEnableMessageTrace(Boolean.parseBoolean(enableMessageTrace));
         }
-        String customizedTraceTopic = properties.getProperty(RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
+        String customizedTraceTopic = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
         if (!StringUtils.isEmpty(customizedTraceTopic)) {
             rocketMQProperties.setCustomizedTraceTopic(customizedTraceTopic);
         }
-        String namespace = properties.getProperty(RocketMQConstants.ROCKETMQ_NAMESPACE);
+        String namespace = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_NAMESPACE);
         if (!StringUtils.isEmpty(namespace)) {
             rocketMQProperties.setNamespace(namespace);
         }
-        String namesrvAddr = properties.getProperty(RocketMQConstants.ROCKETMQ_NAMESRV_ADDR);
+        String namesrvAddr = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_NAMESRV_ADDR);
         if (!StringUtils.isEmpty(namesrvAddr)) {
             rocketMQProperties.setNamesrvAddr(namesrvAddr);
         }
-        String retry = properties.getProperty(RocketMQConstants.ROCKETMQ_RETRY_TIMES_WHEN_SEND_FAILED);
+        String retry = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_RETRY_TIMES_WHEN_SEND_FAILED);
         if (!StringUtils.isEmpty(retry)) {
             rocketMQProperties.setRetryTimesWhenSendFailed(Integer.parseInt(retry));
         }
-        String vipChannelEnabled = properties.getProperty(RocketMQConstants.ROCKETMQ_VIP_CHANNEL_ENABLED);
+        String vipChannelEnabled = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_VIP_CHANNEL_ENABLED);
         if (!StringUtils.isEmpty(vipChannelEnabled)) {
             rocketMQProperties.setVipChannelEnabled(Boolean.parseBoolean(vipChannelEnabled));
         }