|
@@ -3,11 +3,11 @@ package com.alibaba.otter.canal.deployer;
|
|
|
import java.io.FileInputStream;
|
|
|
import java.util.Properties;
|
|
|
|
|
|
-import com.alibaba.otter.canal.common.MQProperties;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import com.alibaba.otter.canal.common.MQProperties;
|
|
|
import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
|
|
|
import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
|
|
|
import com.alibaba.otter.canal.server.CanalMQStarter;
|
|
@@ -73,22 +73,7 @@ public class CanalLauncher {
|
|
|
|
|
|
if (canalMQProducer != null) {
|
|
|
CanalMQStarter canalMQStarter = new CanalMQStarter(canalMQProducer);
|
|
|
- MQProperties mqProperties = new MQProperties();
|
|
|
- mqProperties.setServers(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_SERVERS));
|
|
|
- mqProperties
|
|
|
- .setRetries(Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_RETRIES)));
|
|
|
- mqProperties
|
|
|
- .setBatchSize(Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BATCHSIZE)));
|
|
|
- mqProperties
|
|
|
- .setLingerMs(Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_LINGERMS)));
|
|
|
- mqProperties.setBufferMemory(
|
|
|
- Long.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BUFFERMEMORY)));
|
|
|
- mqProperties.setCanalBatchSize(
|
|
|
- Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALBATCHSIZE)));
|
|
|
- mqProperties.setCanalGetTimeout(
|
|
|
- Long.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALGETTIMEOUT)));
|
|
|
- mqProperties.setFlatMessage(
|
|
|
- Boolean.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_FLATMESSAGE)));
|
|
|
+ MQProperties mqProperties = buildMQPosition(properties);
|
|
|
canalMQStarter.start(mqProperties);
|
|
|
controller.setCanalMQStarter(canalMQStarter);
|
|
|
}
|
|
@@ -98,6 +83,26 @@ public class CanalLauncher {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static MQProperties buildMQPosition(Properties properties) {
|
|
|
+ MQProperties mqProperties = new MQProperties();
|
|
|
+ mqProperties.setServers(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_SERVERS));
|
|
|
+ mqProperties.setRetries(Integer.valueOf(CanalController.getProperty(properties,
|
|
|
+ CanalConstants.CANAL_MQ_RETRIES)));
|
|
|
+ mqProperties.setBatchSize(Integer.valueOf(CanalController.getProperty(properties,
|
|
|
+ CanalConstants.CANAL_MQ_BATCHSIZE)));
|
|
|
+ mqProperties.setLingerMs(Integer.valueOf(CanalController.getProperty(properties,
|
|
|
+ CanalConstants.CANAL_MQ_LINGERMS)));
|
|
|
+ mqProperties.setBufferMemory(Long.valueOf(CanalController.getProperty(properties,
|
|
|
+ CanalConstants.CANAL_MQ_BUFFERMEMORY)));
|
|
|
+ mqProperties.setCanalBatchSize(Integer.valueOf(CanalController.getProperty(properties,
|
|
|
+ CanalConstants.CANAL_MQ_CANALBATCHSIZE)));
|
|
|
+ mqProperties.setCanalGetTimeout(Long.valueOf(CanalController.getProperty(properties,
|
|
|
+ CanalConstants.CANAL_MQ_CANALGETTIMEOUT)));
|
|
|
+ mqProperties.setFlatMessage(Boolean.valueOf(CanalController.getProperty(properties,
|
|
|
+ CanalConstants.CANAL_MQ_FLATMESSAGE)));
|
|
|
+ return mqProperties;
|
|
|
+ }
|
|
|
+
|
|
|
private static void setGlobalUncaughtExceptionHandler() {
|
|
|
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
|
|
|
|