Pārlūkot izejas kodu

Merge branch 'feature/mgr'

machey 6 gadi atpakaļ
vecāks
revīzija
100b43a5db

Failā izmaiņas netiks attēlotas, jo tās ir par lielu
+ 20 - 0
deployer/manager_ddl.sql


+ 47 - 101
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -7,11 +7,7 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
-import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
-import com.alibaba.otter.canal.server.CanalMQStarter;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
+import com.alibaba.otter.canal.deployer.monitor.ManagerDbConfigMonitor;
 
 /**
  * canal独立版本启动的入口类
@@ -21,17 +17,20 @@ import com.alibaba.otter.canal.spi.CanalMQProducer;
  */
 public class CanalLauncher {
 
-    private static final String CLASSPATH_URL_PREFIX = "classpath:";
-    private static final Logger logger               = LoggerFactory.getLogger(CanalLauncher.class);
+    private static final String    CLASSPATH_URL_PREFIX = "classpath:";
+    private static final Logger    logger               = LoggerFactory.getLogger(CanalLauncher.class);
+    public static volatile boolean running              = false;
 
-    public static void main(String[] args) throws Throwable {
+    public static void main(String[] args) {
         try {
+            running = true;
             logger.info("## set default uncaught exception handler");
             setGlobalUncaughtExceptionHandler();
 
             logger.info("## load canal configurations");
             String conf = System.getProperty("canal.conf", "classpath:canal.properties");
             Properties properties = new Properties();
+            ManagerDbConfigMonitor managerDbConfigMonitor = null;
             if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                 conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
                 properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
@@ -39,110 +38,56 @@ public class CanalLauncher {
                 properties.load(new FileInputStream(conf));
             }
 
-            CanalMQProducer canalMQProducer = null;
-            String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
-            if (serverMode.equalsIgnoreCase("kafka")) {
-                canalMQProducer = new CanalKafkaProducer();
-            } else if (serverMode.equalsIgnoreCase("rocketmq")) {
-                canalMQProducer = new CanalRocketMQProducer();
-            }
-
-            if (canalMQProducer != null) {
-                // disable netty
-                System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
-                System.setProperty(CanalConstants.CANAL_DESTINATIONS,
-                    properties.getProperty(CanalConstants.CANAL_DESTINATIONS));
+            String jdbcUrl = properties.getProperty("canal.manager.jdbc.url");
+            if (!StringUtils.isEmpty(jdbcUrl)) {
+                logger.info("## load remote canal configurations");
+                // load remote config
+                String jdbcUsername = properties.getProperty("canal.manager.jdbc.username");
+                String jdbcPassword = properties.getProperty("canal.manager.jdbc.password");
+                managerDbConfigMonitor = new ManagerDbConfigMonitor(jdbcUrl, jdbcUsername, jdbcPassword);
+                // 加载远程canal.properties
+                Properties remoteConfig = managerDbConfigMonitor.loadRemoteConfig();
+                // 加载remote instance配置
+                managerDbConfigMonitor.loadRemoteInstanceConfigs();
+                if (remoteConfig != null) {
+                    properties = remoteConfig;
+                } else {
+                    managerDbConfigMonitor = null;
+                }
+            } else {
+                logger.info("## load canal configurations");
             }
 
-            logger.info("## start the canal server.");
-            final CanalController controller = new CanalController(properties);
-            controller.start();
-            logger.info("## the canal server is running now ......");
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-
-                public void run() {
-                    try {
-                        logger.info("## stop the canal server");
-                        controller.stop();
-                    } catch (Throwable e) {
-                        logger.warn("##something goes wrong when stopping canal Server:", e);
-                    } finally {
-                        logger.info("## canal server is down.");
+            final CanalStater canalStater = new CanalStater();
+            canalStater.start(properties);
+
+            if (managerDbConfigMonitor != null) {
+                managerDbConfigMonitor.start(new ManagerDbConfigMonitor.Listener<Properties>() {
+
+                    @Override
+                    public void onChange(Properties properties) {
+                        try {
+                            // 远程配置canal.properties修改重新启动
+                            canalStater.destroy();
+                            canalStater.start(properties);
+                        } catch (Throwable throwable) {
+                            logger.error(throwable.getMessage(), throwable);
+                        }
                     }
-                }
+                });
+            }
 
-            });
+            while (running)
+                ;
 
-            if (canalMQProducer != null) {
-                CanalMQStarter canalMQStarter = new CanalMQStarter(canalMQProducer);
-                MQProperties mqProperties = buildMQPosition(properties);
-                canalMQStarter.start(mqProperties);
-                controller.setCanalMQStarter(canalMQStarter);
+            if (managerDbConfigMonitor != null) {
+                managerDbConfigMonitor.destroy();
             }
         } catch (Throwable e) {
             logger.error("## Something goes wrong when starting up the canal Server:", e);
-            System.exit(0);
         }
     }
 
-    private static MQProperties buildMQPosition(Properties properties) {
-        MQProperties mqProperties = new MQProperties();
-        String servers = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_SERVERS);
-        if (!StringUtils.isEmpty(servers)) {
-            mqProperties.setServers(servers);
-        }
-        String retires = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_RETRIES);
-        if (!StringUtils.isEmpty(retires)) {
-            mqProperties.setRetries(Integer.valueOf(retires));
-        }
-        String batchSize = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BATCHSIZE);
-        if (!StringUtils.isEmpty(batchSize)) {
-            mqProperties.setBatchSize(Integer.valueOf(batchSize));
-        }
-        String lingerMs = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_LINGERMS);
-        if (!StringUtils.isEmpty(lingerMs)) {
-            mqProperties.setLingerMs(Integer.valueOf(lingerMs));
-        }
-        String maxRequestSize = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_MAXREQUESTSIZE);
-        if (!StringUtils.isEmpty(maxRequestSize)) {
-            mqProperties.setMaxRequestSize(Integer.valueOf(maxRequestSize));
-        }
-        String bufferMemory = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BUFFERMEMORY);
-        if (!StringUtils.isEmpty(bufferMemory)) {
-            mqProperties.setBufferMemory(Long.valueOf(bufferMemory));
-        }
-        String canalBatchSize = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALBATCHSIZE);
-        if (!StringUtils.isEmpty(canalBatchSize)) {
-            mqProperties.setCanalBatchSize(Integer.valueOf(canalBatchSize));
-        }
-        String canalGetTimeout = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALGETTIMEOUT);
-        if (!StringUtils.isEmpty(canalGetTimeout)) {
-            mqProperties.setCanalGetTimeout(Long.valueOf(canalGetTimeout));
-        }
-        String flatMessage = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_FLATMESSAGE);
-        if (!StringUtils.isEmpty(flatMessage)) {
-            mqProperties.setFlatMessage(Boolean.valueOf(flatMessage));
-        }
-        String compressionType = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_COMPRESSION_TYPE);
-        if (!StringUtils.isEmpty(compressionType)) {
-            mqProperties.setCompressionType(compressionType);
-        }
-        String acks = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_ACKS);
-        if (!StringUtils.isEmpty(acks)) {
-            mqProperties.setAcks(acks);
-        }
-
-        String aliyunAccessKey = CanalController.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESSKEY);
-        if (!StringUtils.isEmpty(aliyunAccessKey)) {
-            mqProperties.setAliyunAccessKey(aliyunAccessKey);
-        }
-        String aliyunSecretKey = CanalController.getProperty(properties, CanalConstants.CANAL_ALIYUN_SECRETKEY);
-        if (!StringUtils.isEmpty(aliyunSecretKey)) {
-            mqProperties.setAliyunSecretKey(aliyunSecretKey);
-        }
-        return mqProperties;
-    }
-
     private static void setGlobalUncaughtExceptionHandler() {
         Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
 
@@ -152,4 +97,5 @@ public class CanalLauncher {
             }
         });
     }
+
 }

+ 142 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java

@@ -0,0 +1,142 @@
+package com.alibaba.otter.canal.deployer;
+
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
+import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
+import com.alibaba.otter.canal.server.CanalMQStarter;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
+
+public class CanalStater {
+
+    private static final Logger logger          = LoggerFactory.getLogger(CanalStater.class);
+
+    private CanalController     controller      = null;
+    private CanalMQProducer     canalMQProducer = null;
+    private Thread              shutdownThread  = null;
+    private CanalMQStarter      canalMQStarter  = null;
+
+    synchronized void start(Properties properties) throws Throwable {
+        String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
+        if (serverMode.equalsIgnoreCase("kafka")) {
+            canalMQProducer = new CanalKafkaProducer();
+        } else if (serverMode.equalsIgnoreCase("rocketmq")) {
+            canalMQProducer = new CanalRocketMQProducer();
+        }
+
+        if (canalMQProducer != null) {
+            // disable netty
+            System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
+            System.setProperty(CanalConstants.CANAL_DESTINATIONS,
+                properties.getProperty(CanalConstants.CANAL_DESTINATIONS));
+        }
+
+        logger.info("## start the canal server.");
+        controller = new CanalController(properties);
+        controller.start();
+        logger.info("## the canal server is running now ......");
+        shutdownThread = new Thread() {
+
+            public void run() {
+                try {
+                    logger.info("## stop the canal server");
+                    controller.stop();
+                    CanalLauncher.running = false;
+                } catch (Throwable e) {
+                    logger.warn("##something goes wrong when stopping canal Server:", e);
+                } finally {
+                    logger.info("## canal server is down.");
+                }
+            }
+
+        };
+        Runtime.getRuntime().addShutdownHook(shutdownThread);
+
+        if (canalMQProducer != null) {
+            canalMQStarter = new CanalMQStarter(canalMQProducer);
+            MQProperties mqProperties = buildMQPosition(properties);
+            canalMQStarter.start(mqProperties);
+            controller.setCanalMQStarter(canalMQStarter);
+        }
+    }
+
+    synchronized void destroy() throws Throwable {
+        if (controller != null) {
+            controller.stop();
+            controller = null;
+        }
+        if (shutdownThread != null) {
+            Runtime.getRuntime().removeShutdownHook(shutdownThread);
+            shutdownThread = null;
+        }
+        if (canalMQProducer != null && canalMQStarter != null) {
+            canalMQStarter.destroy();
+            canalMQStarter = null;
+            canalMQProducer = null;
+        }
+    }
+
+    private static MQProperties buildMQPosition(Properties properties) {
+        MQProperties mqProperties = new MQProperties();
+        String servers = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_SERVERS);
+        if (!StringUtils.isEmpty(servers)) {
+            mqProperties.setServers(servers);
+        }
+        String retires = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_RETRIES);
+        if (!StringUtils.isEmpty(retires)) {
+            mqProperties.setRetries(Integer.valueOf(retires));
+        }
+        String batchSize = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BATCHSIZE);
+        if (!StringUtils.isEmpty(batchSize)) {
+            mqProperties.setBatchSize(Integer.valueOf(batchSize));
+        }
+        String lingerMs = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_LINGERMS);
+        if (!StringUtils.isEmpty(lingerMs)) {
+            mqProperties.setLingerMs(Integer.valueOf(lingerMs));
+        }
+        String maxRequestSize = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_MAXREQUESTSIZE);
+        if (!StringUtils.isEmpty(maxRequestSize)) {
+            mqProperties.setMaxRequestSize(Integer.valueOf(maxRequestSize));
+        }
+        String bufferMemory = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BUFFERMEMORY);
+        if (!StringUtils.isEmpty(bufferMemory)) {
+            mqProperties.setBufferMemory(Long.valueOf(bufferMemory));
+        }
+        String canalBatchSize = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALBATCHSIZE);
+        if (!StringUtils.isEmpty(canalBatchSize)) {
+            mqProperties.setCanalBatchSize(Integer.valueOf(canalBatchSize));
+        }
+        String canalGetTimeout = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALGETTIMEOUT);
+        if (!StringUtils.isEmpty(canalGetTimeout)) {
+            mqProperties.setCanalGetTimeout(Long.valueOf(canalGetTimeout));
+        }
+        String flatMessage = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_FLATMESSAGE);
+        if (!StringUtils.isEmpty(flatMessage)) {
+            mqProperties.setFlatMessage(Boolean.valueOf(flatMessage));
+        }
+        String compressionType = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_COMPRESSION_TYPE);
+        if (!StringUtils.isEmpty(compressionType)) {
+            mqProperties.setCompressionType(compressionType);
+        }
+        String acks = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_ACKS);
+        if (!StringUtils.isEmpty(acks)) {
+            mqProperties.setAcks(acks);
+        }
+
+        String aliyunAccessKey = CanalController.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESSKEY);
+        if (!StringUtils.isEmpty(aliyunAccessKey)) {
+            mqProperties.setAliyunAccessKey(aliyunAccessKey);
+        }
+        String aliyunSecretKey = CanalController.getProperty(properties, CanalConstants.CANAL_ALIYUN_SECRETKEY);
+        if (!StringUtils.isEmpty(aliyunSecretKey)) {
+            mqProperties.setAliyunSecretKey(aliyunSecretKey);
+        }
+        return mqProperties;
+    }
+
+}

+ 308 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/ManagerDbConfigMonitor.java

@@ -0,0 +1,308 @@
+package com.alibaba.otter.canal.deployer.monitor;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.nio.charset.StandardCharsets;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
+import com.alibaba.otter.canal.deployer.CanalConstants;
+import com.google.common.base.Joiner;
+import com.google.common.collect.MapMaker;
+
+public class ManagerDbConfigMonitor {
+
+    private static final Logger      logger                 = LoggerFactory.getLogger(ManagerDbConfigMonitor.class);
+
+    private Map<String, ConfigItem>  remoteInstanceConfigs  = new MapMaker().makeMap();
+
+    private Connection               conn;
+    private String                   jdbcUrl;
+    private String                   jdbcUsername;
+    private String                   jdbcPassword;
+
+    private long                     currentConfigTimestamp = 0;
+
+    private long                     scanIntervalInSecond   = 5;
+    private ScheduledExecutorService executor               = Executors.newScheduledThreadPool(2,
+        new NamedThreadFactory("remote-canal-config-scan"));
+
+    public ManagerDbConfigMonitor(String jdbcUrl, String jdbcUsername, String jdbcPassword){
+        this.jdbcUrl = jdbcUrl;
+        this.jdbcUsername = jdbcUsername;
+        this.jdbcPassword = jdbcPassword;
+    }
+
+    public void setScanIntervalInSecond(long scanIntervalInSecond) {
+        this.scanIntervalInSecond = scanIntervalInSecond;
+    }
+
+    private Connection getConn() throws Exception {
+        if (conn == null || conn.isClosed()) {
+            Class.forName("com.mysql.jdbc.Driver");
+            conn = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword);
+        }
+        return conn;
+    }
+
+    public Properties loadRemoteConfig() {
+        Properties properties = null;
+        try {
+            // 加载远程canal配置
+            ConfigItem configItem = getRemoteCanalConfig();
+            if (configItem != null) {
+                if (configItem.getModifiedTime() != currentConfigTimestamp) {
+                    currentConfigTimestamp = configItem.getModifiedTime();
+                    overrideLocalCanalConfig(configItem.getContent());
+                    properties = new Properties();
+                    properties.load(new ByteArrayInputStream(configItem.getContent().getBytes(StandardCharsets.UTF_8)));
+                    scanIntervalInSecond = Integer
+                        .valueOf(properties.getProperty(CanalConstants.CANAL_AUTO_SCAN_INTERVAL, "5"));
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+        return properties;
+    }
+
+    public void loadRemoteInstanceConfigs() {
+        try {
+            // 加载远程instance配置
+            Map<String, ConfigItem>[] modifiedConfigs = getModifiedInstanceConfigs();
+            if (modifiedConfigs != null) {
+                overrideLocalInstanceConfigs(modifiedConfigs);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private ConfigItem getRemoteCanalConfig() {
+        String sql = "select name, content, modified_time from canal_config where id=1";
+        try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
+            if (rs.next()) {
+                ConfigItem configItem = new ConfigItem();
+                configItem.setId(1L);
+                configItem.setName(rs.getString("name"));
+                configItem.setContent(rs.getString("content"));
+                configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
+                return configItem;
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+        return null;
+    }
+
+    private void overrideLocalCanalConfig(String content) {
+        try (FileWriter writer = new FileWriter(getConfPath() + "canal.properties")) {
+            writer.write(content);
+            writer.flush();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    public void start(final Listener<Properties> listener) {
+        // 监听canal.properties变化
+        executor.scheduleWithFixedDelay(new Runnable() {
+
+            public void run() {
+                try {
+                    Properties properties = loadRemoteConfig();
+                    if (properties != null) {
+                        // 重启整个进程
+                        listener.onChange(properties);
+                    }
+                } catch (Throwable e) {
+                    logger.error("scan failed", e);
+                }
+            }
+
+        }, 10, scanIntervalInSecond, TimeUnit.SECONDS);
+
+        // 监听instance变化
+        executor.scheduleWithFixedDelay(new Runnable() {
+
+            public void run() {
+                try {
+                    loadRemoteInstanceConfigs();
+                } catch (Throwable e) {
+                    logger.error("scan failed", e);
+                }
+            }
+
+        }, 10, 3, TimeUnit.SECONDS);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, ConfigItem>[] getModifiedInstanceConfigs() {
+        Map<String, ConfigItem>[] res = new Map[2];
+        Map<String, ConfigItem> remoteConfigStatus = new HashMap<>();
+        String sql = "select id, name, modified_time from canal_instance_config";
+        try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
+            while (rs.next()) {
+                ConfigItem configItem = new ConfigItem();
+                configItem.setId(rs.getLong("id"));
+                configItem.setName(rs.getString("name"));
+                configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
+                remoteConfigStatus.put(configItem.getName(), configItem);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return null;
+        }
+
+        if (!remoteConfigStatus.isEmpty()) {
+            List<Long> changedIds = new ArrayList<>();
+
+            for (ConfigItem remoteConfigStat : remoteConfigStatus.values()) {
+                ConfigItem currentConfig = remoteInstanceConfigs.get(remoteConfigStat.getName());
+                if (currentConfig == null) {
+                    // 新增
+                    changedIds.add(remoteConfigStat.getId());
+                } else {
+                    // 修改
+                    if (currentConfig.getModifiedTime() != remoteConfigStat.getModifiedTime()) {
+                        changedIds.add(remoteConfigStat.getId());
+                    }
+                }
+            }
+            if (!changedIds.isEmpty()) {
+                Map<String, ConfigItem> changedInstanceConfig = new HashMap<>();
+                String contentsSql = "select id, name, content, modified_time from canal_instance_config  where id in ("
+                                     + Joiner.on(",").join(changedIds) + ")";
+                try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(contentsSql)) {
+                    while (rs.next()) {
+                        ConfigItem configItemNew = new ConfigItem();
+                        configItemNew.setId(rs.getLong("id"));
+                        configItemNew.setName(rs.getString("name"));
+                        configItemNew.setContent(rs.getString("content"));
+                        configItemNew.setModifiedTime(rs.getTimestamp("modified_time").getTime());
+
+                        remoteInstanceConfigs.put(configItemNew.getName(), configItemNew);
+                        changedInstanceConfig.put(configItemNew.getName(), configItemNew);
+                    }
+
+                    res[0] = changedInstanceConfig.isEmpty() ? null : changedInstanceConfig;
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+
+        Map<String, ConfigItem> removedInstanceConfig = new HashMap<>();
+        for (String name : remoteInstanceConfigs.keySet()) {
+            if (!remoteConfigStatus.containsKey(name)) {
+                // 删除
+                removedInstanceConfig.put(name, null);
+            }
+        }
+        res[1] = removedInstanceConfig.isEmpty() ? null : removedInstanceConfig;
+
+        if (res[0] == null && res[1] == null) {
+            return null;
+        } else {
+            return res;
+        }
+    }
+
+    private void overrideLocalInstanceConfigs(Map<String, ConfigItem>[] modifiedInstanceConfigs) {
+        Map<String, ConfigItem> changedInstanceConfigs = modifiedInstanceConfigs[0];
+        if (changedInstanceConfigs != null) {
+            for (ConfigItem configItem : changedInstanceConfigs.values()) {
+                try (FileWriter writer = new FileWriter(
+                    getConfPath() + configItem.getName() + "/instance.properties")) {
+                    writer.write(configItem.getContent());
+                    writer.flush();
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+        Map<String, ConfigItem> removedInstanceConfigs = modifiedInstanceConfigs[1];
+        if (removedInstanceConfigs != null) {
+            for (String name : removedInstanceConfigs.keySet()) {
+                File file = new File(getConfPath() + name + "/");
+                if (file.exists()) {
+                    file.delete();
+                }
+            }
+        }
+    }
+
+    public void destroy() {
+        executor.shutdownNow();
+        if (conn != null) {
+            try {
+                conn.close();
+            } catch (SQLException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    private String getConfPath() {
+        String classpath = this.getClass().getResource("/").getPath();
+        String confPath = classpath + ".." + File.separator + "conf" + File.separator;
+        if (new File(confPath).exists()) {
+            return confPath;
+        } else {
+            return classpath;
+        }
+    }
+
+    public static class ConfigItem {
+
+        private Long   id;
+        private String name;
+        private String content;
+        private long   modifiedTime;
+
+        public Long getId() {
+            return id;
+        }
+
+        public void setId(Long id) {
+            this.id = id;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public String getContent() {
+            return content;
+        }
+
+        public void setContent(String content) {
+            this.content = content;
+        }
+
+        public long getModifiedTime() {
+            return modifiedTime;
+        }
+
+        public void setModifiedTime(long modifiedTime) {
+            this.modifiedTime = modifiedTime;
+        }
+    }
+
+    public interface Listener<Properties> {
+
+        void onChange(Properties properties);
+    }
+}

+ 3 - 0
deployer/src/main/resources/canal.properties

@@ -1,6 +1,9 @@
 #################################################
 ######### 		common argument		############# 
 #################################################
+#canal.manager.jdbc.url=jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
+#canal.manager.jdbc.username=root
+#canal.manager.jdbc.password=121212
 canal.id = 1
 canal.ip =
 canal.port = 11111

+ 23 - 6
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -21,9 +21,9 @@ import com.alibaba.otter.canal.spi.CanalMQProducer;
 
 public class CanalMQStarter {
 
-    private static final Logger          logger       = LoggerFactory.getLogger(CanalMQStarter.class);
+    private static final Logger          logger         = LoggerFactory.getLogger(CanalMQStarter.class);
 
-    private volatile boolean             running      = false;
+    private volatile boolean             running        = false;
 
     private ExecutorService              executorService;
 
@@ -33,7 +33,9 @@ public class CanalMQStarter {
 
     private CanalServerWithEmbedded      canalServer;
 
-    private Map<String, CanalMQRunnable> canalMQWorks = new ConcurrentHashMap<>();
+    private Map<String, CanalMQRunnable> canalMQWorks   = new ConcurrentHashMap<>();
+
+    private static Thread                shutdownThread = null;
 
     public CanalMQStarter(CanalMQProducer canalMQProducer){
         this.canalMQProducer = canalMQProducer;
@@ -72,7 +74,8 @@ public class CanalMQStarter {
 
             running = true;
             logger.info("## the MQ workers is running now ......");
-            Runtime.getRuntime().addShutdownHook(new Thread() {
+
+            shutdownThread = new Thread() {
 
                 public void run() {
                     try {
@@ -87,11 +90,25 @@ public class CanalMQStarter {
                     }
                 }
 
-            });
+            };
 
+            Runtime.getRuntime().addShutdownHook(shutdownThread);
         } catch (Throwable e) {
             logger.error("## Something goes wrong when starting up the canal MQ workers:", e);
-            System.exit(0);
+        }
+    }
+
+    public synchronized void destroy() {
+        running = false;
+        if (executorService != null) {
+            executorService.shutdown();
+        }
+        if (canalMQProducer != null) {
+            canalMQProducer.stop();
+        }
+        if (shutdownThread != null) {
+            Runtime.getRuntime().removeShutdownHook(shutdownThread);
+            shutdownThread = null;
         }
     }
 

Daži faili netika attēloti, jo izmaiņu fails ir pārāk liels