|
@@ -1,8 +1,12 @@
|
|
|
package com.alibaba.otter.canal.deployer;
|
|
|
|
|
|
-import java.util.Map;
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileFilter;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.List;
|
|
|
import java.util.Properties;
|
|
|
|
|
|
+import com.google.common.base.Joiner;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -12,6 +16,8 @@ import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
|
|
|
import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
|
|
|
import com.alibaba.otter.canal.server.CanalMQStarter;
|
|
|
import com.alibaba.otter.canal.spi.CanalMQProducer;
|
|
|
+import com.google.common.base.Function;
|
|
|
+import com.google.common.collect.Lists;
|
|
|
|
|
|
/**
|
|
|
* Canal server 启动类
|
|
@@ -30,7 +36,7 @@ public class CanalStater {
|
|
|
|
|
|
/**
|
|
|
* 启动方法
|
|
|
- *
|
|
|
+ *
|
|
|
* @param properties canal.properties 配置
|
|
|
* @throws Throwable
|
|
|
*/
|
|
@@ -45,8 +51,38 @@ public class CanalStater {
|
|
|
if (canalMQProducer != null) {
|
|
|
// disable netty
|
|
|
System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
|
|
|
- System.setProperty(CanalConstants.CANAL_DESTINATIONS,
|
|
|
- properties.getProperty(CanalConstants.CANAL_DESTINATIONS));
|
|
|
+
|
|
|
+ String autoScan = CanalController.getProperty(properties, CanalConstants.CANAL_AUTO_SCAN);
|
|
|
+ if ("true".equals(autoScan)) {
|
|
|
+ String rootDir = CanalController.getProperty(properties, CanalConstants.CANAL_CONF_DIR);
|
|
|
+ if (StringUtils.isEmpty(rootDir)) {
|
|
|
+ rootDir = "../conf";
|
|
|
+ }
|
|
|
+ File rootdir = new File(rootDir);
|
|
|
+ if (rootdir.exists()) {
|
|
|
+ File[] instanceDirs = rootdir.listFiles(new FileFilter() {
|
|
|
+
|
|
|
+ public boolean accept(File pathname) {
|
|
|
+ String filename = pathname.getName();
|
|
|
+ return pathname.isDirectory() && !"spring".equalsIgnoreCase(filename);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ if (instanceDirs != null && instanceDirs.length > 0) {
|
|
|
+ List<String> instances = Lists.transform(Arrays.asList(instanceDirs),
|
|
|
+ new Function<File, String>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String apply(File instanceDir) {
|
|
|
+ return instanceDir.getName();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ System.setProperty(CanalConstants.CANAL_DESTINATIONS, Joiner.on(",").join(instances));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
|
|
|
+ System.setProperty(CanalConstants.CANAL_DESTINATIONS, destinations);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
logger.info("## start the canal server.");
|
|
@@ -80,7 +116,7 @@ public class CanalStater {
|
|
|
|
|
|
/**
|
|
|
* 销毁方法,远程配置变更时调用
|
|
|
- *
|
|
|
+ *
|
|
|
* @throws Throwable
|
|
|
*/
|
|
|
synchronized void destroy() throws Throwable {
|
|
@@ -101,7 +137,7 @@ public class CanalStater {
|
|
|
|
|
|
/**
|
|
|
* 构造MQ对应的配置
|
|
|
- *
|
|
|
+ *
|
|
|
* @param properties canal.properties 配置
|
|
|
* @return
|
|
|
*/
|