agapple 5 rokov pred
rodič
commit
c76e8ba0e4
15 zmenil súbory, kde vykonal 143 pridanie a 76 odobranie
  1. 2 2
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/controller/CanalClusterController.java
  2. 10 8
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/controller/PollingConfigController.java
  3. 1 1
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/CanalClusterService.java
  4. 3 1
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/PollingConfigService.java
  5. 2 2
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/CanalClusterServiceImpl.java
  6. 6 7
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/NodeServerServiceImpl.java
  7. 35 2
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/PollingConfigServiceImpl.java
  8. 3 1
      common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java
  9. 2 2
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java
  10. 31 13
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java
  11. 4 3
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java
  12. 29 27
      deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/ManagerInstanceConfigMonitor.java
  13. 8 6
      deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/SpringInstanceConfigMonitor.java
  14. 6 0
      deployer/src/main/resources/canal.properties
  15. 1 1
      deployer/src/main/resources/canal_local.properties

+ 2 - 2
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/controller/CanalClusterController.java

@@ -11,7 +11,7 @@ import com.alibaba.fastjson.JSONObject;
 import com.alibaba.otter.canal.admin.model.BaseModel;
 import com.alibaba.otter.canal.admin.model.CanalCluster;
 import com.alibaba.otter.canal.admin.model.NodeServer;
-import com.alibaba.otter.canal.admin.service.CanalClusterServic;
+import com.alibaba.otter.canal.admin.service.CanalClusterService;
 import com.alibaba.otter.canal.admin.service.NodeServerService;
 
 @RestController
@@ -19,7 +19,7 @@ import com.alibaba.otter.canal.admin.service.NodeServerService;
 public class CanalClusterController {
 
     @Autowired
-    CanalClusterServic canalClusterServic;
+    CanalClusterService canalClusterServic;
 
     @Autowired
     NodeServerService  nodeServerService;

+ 10 - 8
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/controller/PollingConfigController.java

@@ -15,7 +15,7 @@ import org.springframework.web.bind.annotation.RestController;
 import com.alibaba.otter.canal.admin.model.BaseModel;
 import com.alibaba.otter.canal.admin.model.CanalConfig;
 import com.alibaba.otter.canal.admin.model.CanalInstanceConfig;
-import com.alibaba.otter.canal.admin.service.PollingConfigServer;
+import com.alibaba.otter.canal.admin.service.PollingConfigService;
 import com.alibaba.otter.canal.protocol.SecurityUtil;
 
 /**
@@ -37,7 +37,7 @@ public class PollingConfigController {
     String                      passwd;
 
     @Autowired
-    PollingConfigServer         pollingConfigServer;
+    PollingConfigService        pollingConfigService;
 
     /**
      * 获取server全局配置
@@ -45,16 +45,18 @@ public class PollingConfigController {
     @GetMapping(value = "/server_polling")
     public BaseModel<CanalConfig> canalConfigPoll(@RequestHeader String user, @RequestHeader String passwd,
                                                   @RequestParam String ip, @RequestParam Integer port,
-                                                  @RequestParam String md5, @PathVariable boolean register,
-                                                  @PathVariable String cluster, @PathVariable String env) {
+                                                  @RequestParam String md5, @RequestParam boolean register,
+                                                  @RequestParam String cluster, @PathVariable String env) {
         if (!auth(user, passwd)) {
             throw new RuntimeException("auth :" + user + " is failed");
         }
 
-        if (register) {
+        if (StringUtils.isEmpty(md5) && register) {
             // do something
+            pollingConfigService.autoRegister(ip, port, cluster);
         }
-        CanalConfig canalConfig = pollingConfigServer.getChangedConfig(ip, port, md5);
+
+        CanalConfig canalConfig = pollingConfigService.getChangedConfig(ip, port, md5);
         return BaseModel.getInstance(canalConfig);
     }
 
@@ -69,7 +71,7 @@ public class PollingConfigController {
             throw new RuntimeException("auth :" + user + " is failed");
         }
 
-        CanalInstanceConfig canalInstanceConfig = pollingConfigServer.getInstanceConfig(destination, md5);
+        CanalInstanceConfig canalInstanceConfig = pollingConfigService.getInstanceConfig(destination, md5);
         return BaseModel.getInstance(canalInstanceConfig);
     }
 
@@ -84,7 +86,7 @@ public class PollingConfigController {
             throw new RuntimeException("auth :" + user + " is failed");
         }
 
-        CanalInstanceConfig canalInstanceConfig = pollingConfigServer.getInstancesConfig(ip, port, md5);
+        CanalInstanceConfig canalInstanceConfig = pollingConfigService.getInstancesConfig(ip, port, md5);
         return BaseModel.getInstance(canalInstanceConfig);
     }
 

+ 1 - 1
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/CanalClusterServic.java → canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/CanalClusterService.java

@@ -4,7 +4,7 @@ import java.util.List;
 
 import com.alibaba.otter.canal.admin.model.CanalCluster;
 
-public interface CanalClusterServic {
+public interface CanalClusterService {
 
     void save(CanalCluster canalCluster);
 

+ 3 - 1
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/PollingConfigServer.java → canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/PollingConfigService.java

@@ -3,7 +3,9 @@ package com.alibaba.otter.canal.admin.service;
 import com.alibaba.otter.canal.admin.model.CanalConfig;
 import com.alibaba.otter.canal.admin.model.CanalInstanceConfig;
 
-public interface PollingConfigServer {
+public interface PollingConfigService {
+
+    public boolean autoRegister(String ip, Integer adminPort, String cluster);
 
     CanalConfig getChangedConfig(String ip, Integer port, String md5);
 

+ 2 - 2
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/CanalClusterServiceImpl.java

@@ -10,10 +10,10 @@ import com.alibaba.otter.canal.admin.common.exception.ServiceException;
 import com.alibaba.otter.canal.admin.model.CanalCluster;
 import com.alibaba.otter.canal.admin.model.CanalInstanceConfig;
 import com.alibaba.otter.canal.admin.model.NodeServer;
-import com.alibaba.otter.canal.admin.service.CanalClusterServic;
+import com.alibaba.otter.canal.admin.service.CanalClusterService;
 
 @Service
-public class CanalClusterServiceImpl implements CanalClusterServic {
+public class CanalClusterServiceImpl implements CanalClusterService {
 
     public void save(CanalCluster canalCluster) {
         canalCluster.save();

+ 6 - 7
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/NodeServerServiceImpl.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.admin.service.impl;
 
+import io.ebean.Query;
+
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.List;
@@ -23,8 +25,6 @@ import com.alibaba.otter.canal.admin.model.Pager;
 import com.alibaba.otter.canal.admin.service.NodeServerService;
 import com.alibaba.otter.canal.protocol.SecurityUtil;
 
-import io.ebean.Query;
-
 /**
  * 节点信息业务层
  *
@@ -55,7 +55,6 @@ public class NodeServerServiceImpl implements NodeServerService {
                 String contentMd5 = SecurityUtil.md5String(canalConfig.getContent());
                 canalConfig.setContentMd5(contentMd5);
             } catch (NoSuchAlgorithmException e) {
-                e.printStackTrace();
             }
             canalConfig.save();
         }
@@ -176,8 +175,9 @@ public class NodeServerServiceImpl implements NodeServerService {
         if (nodeServer == null) {
             return "";
         }
-        return SimpleAdminConnectors
-            .execute(nodeServer.getIp(), nodeServer.getAdminPort(), adminConnector -> adminConnector.canalLog(100));
+        return SimpleAdminConnectors.execute(nodeServer.getIp(),
+            nodeServer.getAdminPort(),
+            adminConnector -> adminConnector.canalLog(100));
     }
 
     public boolean remoteOperation(Long id, String option) {
@@ -187,8 +187,7 @@ public class NodeServerServiceImpl implements NodeServerService {
         }
         Boolean result = null;
         if ("start".equals(option)) {
-            result = SimpleAdminConnectors
-                .execute(nodeServer.getIp(), nodeServer.getAdminPort(), AdminConnector::start);
+            result = SimpleAdminConnectors.execute(nodeServer.getIp(), nodeServer.getAdminPort(), AdminConnector::start);
         } else if ("stop".equals(option)) {
             result = SimpleAdminConnectors.execute(nodeServer.getIp(), nodeServer.getAdminPort(), AdminConnector::stop);
         } else {

+ 35 - 2
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/PollingConfigServiceImpl.java

@@ -5,18 +5,51 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import com.alibaba.otter.canal.admin.common.exception.ServiceException;
+import com.alibaba.otter.canal.admin.model.CanalCluster;
 import com.alibaba.otter.canal.admin.model.CanalConfig;
 import com.alibaba.otter.canal.admin.model.CanalInstanceConfig;
 import com.alibaba.otter.canal.admin.model.NodeServer;
-import com.alibaba.otter.canal.admin.service.PollingConfigServer;
+import com.alibaba.otter.canal.admin.service.CanalClusterService;
+import com.alibaba.otter.canal.admin.service.NodeServerService;
+import com.alibaba.otter.canal.admin.service.PollingConfigService;
 import com.alibaba.otter.canal.protocol.SecurityUtil;
 import com.google.common.base.Joiner;
 
 @Service
-public class PollingConfigServiceImpl implements PollingConfigServer {
+public class PollingConfigServiceImpl implements PollingConfigService {
+
+    @Autowired
+    NodeServerService   nodeServerService;
+
+    @Autowired
+    CanalClusterService canalClusterService;
+
+    public boolean autoRegister(String ip, Integer adminPort, String cluster) {
+        NodeServer server = NodeServer.find.query().where().eq("ip", ip).eq("adminPort", adminPort).findOne();
+        if (server == null) {
+            server = new NodeServer();
+            server.setName(ip);
+            server.setIp(ip);
+            server.setAdminPort(adminPort);
+            server.setTcpPort(adminPort + 1);
+            server.setMetricPort(adminPort + 2);
+            if (StringUtils.isNotEmpty(cluster)) {
+                CanalCluster clusterConfig = CanalCluster.find.query().where().eq("name", cluster).findOne();
+                if (clusterConfig == null) {
+                    throw new ServiceException("auto cluster : " + cluster + " is not found.");
+                }
+
+                server.setClusterId(clusterConfig.getId());
+            }
+            nodeServerService.save(server);
+        }
+
+        return true;
+    }
 
     public CanalConfig getChangedConfig(String ip, Integer port, String md5) {
         NodeServer server = NodeServer.find.query().where().eq("ip", ip).eq("adminPort", port).findOne();

+ 3 - 1
common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java

@@ -111,11 +111,13 @@ public class ServerRunningMonitor extends AbstractCanalLifeCycle {
 
     }
 
-    public void release() {
+    public boolean release() {
         if (zkClient != null) {
             releaseRunning(); // 尝试一下release
+            return true;
         } else {
             processActiveExit(); // 没有zk,直接退出
+            return false;
         }
     }
 

+ 2 - 2
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -23,8 +23,8 @@ public class CanalConstants {
     public static final String CANAL_ADMIN_PORT                     = ROOT + "." + "admin.port";
     public static final String CANAL_ADMIN_USER                     = ROOT + "." + "admin.user";
     public static final String CANAL_ADMIN_PASSWD                   = ROOT + "." + "admin.passwd";
-    public static final String CANAL_ADMIN_AUTO_REGISTER            = ROOT + "." + "admin.auto.register";
-    public static final String CANAL_ADMIN_AUTO_CLUSTER             = ROOT + "." + "admin.auto.cluster";
+    public static final String CANAL_ADMIN_AUTO_REGISTER            = ROOT + "." + "admin.register.auto";
+    public static final String CANAL_ADMIN_AUTO_CLUSTER             = ROOT + "." + "admin.register.cluster";
     public static final String CANAL_ZKSERVERS                      = ROOT + "." + "zkServers";
     public static final String CANAL_WITHOUT_NETTY                  = ROOT + "." + "withoutNetty";
 

+ 31 - 13
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -106,17 +106,12 @@ public class CanalController {
         // 准备canal server
         ip = getProperty(properties, CanalConstants.CANAL_IP);
         registerIp = getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
-        port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
-        adminPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_ADMIN_PORT));
+        port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT, "11111"));
+        adminPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110"));
         embededCanalServer = CanalServerWithEmbedded.instance();
         embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
-        try {
-            int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT));
-            embededCanalServer.setMetricsPort(metricsPort);
-        } catch (NumberFormatException e) {
-            logger.info("No valid metrics server port found, use default 11112.");
-            embededCanalServer.setMetricsPort(11112);
-        }
+        int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));
+        embededCanalServer.setMetricsPort(metricsPort);
 
         this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
         this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
@@ -286,7 +281,21 @@ public class CanalController {
                     if (config != null) {
                         ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                         if (runningMonitor.isStart()) {
-                            runningMonitor.release();
+                            boolean release = runningMonitor.release();
+                            if (!release) {
+                                // 如果是单机模式,则直接清除配置
+                                instanceConfigs.remove(destination);
+                                // 停掉服务
+                                runningMonitor.stop();
+                                if (instanceConfigMonitors.containsKey(InstanceConfig.InstanceMode.MANAGER)) {
+                                    ManagerInstanceConfigMonitor monitor = (ManagerInstanceConfigMonitor) instanceConfigMonitors.get(InstanceConfig.InstanceMode.MANAGER);
+                                    Map<String, InstanceAction> instanceActions = monitor.getActions();
+                                    if (instanceActions.containsKey(destination)) {
+                                        // 清除内存中的autoScan cache
+                                        monitor.release(destination);
+                                    }
+                                }
+                            }
                         }
                     }
 
@@ -297,7 +306,9 @@ public class CanalController {
             instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
 
                 public InstanceConfigMonitor apply(InstanceMode mode) {
-                    int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
+                    int scanInterval = Integer.valueOf(getProperty(properties,
+                        CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
+                        "5"));
 
                     if (mode.isSpring()) {
                         SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
@@ -322,8 +333,6 @@ public class CanalController {
                         monitor.setDefaultAction(defaultAction);
                         String managerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
                         monitor.setConfigClient(getManagerClient(managerAddress));
-                        monitor.setIp(registerIp);
-                        monitor.setPort(adminPort);
                         return monitor;
                     } else {
                         throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
@@ -444,6 +453,15 @@ public class CanalController {
         return config;
     }
 
+    public static String getProperty(Properties properties, String key, String defaultValue) {
+        String value = getProperty(properties, key);
+        if (StringUtils.isEmpty(value)) {
+            return defaultValue;
+        } else {
+            return value;
+        }
+    }
+
     public static String getProperty(Properties properties, String key) {
         key = StringUtils.trim(key);
         String value = System.getProperty(key);

+ 4 - 3
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -51,7 +51,7 @@ public class CanalLauncher {
             if (StringUtils.isNotEmpty(managerAddress)) {
                 String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
                 String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
-                String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
+                String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");
                 boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,
                     CanalConstants.CANAL_ADMIN_AUTO_REGISTER));
                 String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);
@@ -75,8 +75,9 @@ public class CanalLauncher {
                 Properties managerProperties = canalConfig.getProperties();
                 // merge local
                 managerProperties.putAll(properties);
-                int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(properties,
-                    CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
+                int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
+                    CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
+                    "5"));
                 executor.scheduleWithFixedDelay(new Runnable() {
 
                     private PlainCanal lastCanalConfig;

+ 29 - 27
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/ManagerInstanceConfigMonitor.java

@@ -28,26 +28,21 @@ import com.google.common.collect.MigrateMap;
  */
 public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle implements InstanceConfigMonitor, CanalLifeCycle {
 
-    private static final Logger         logger               = LoggerFactory
-        .getLogger(ManagerInstanceConfigMonitor.class);
+    private static final Logger         logger               = LoggerFactory.getLogger(ManagerInstanceConfigMonitor.class);
     private long                        scanIntervalInSecond = 5;
     private InstanceAction              defaultAction        = null;
     private Map<String, InstanceAction> actions              = new MapMaker().makeMap();
-    private Map<String, PlainCanal>     configs              = MigrateMap
-        .makeComputingMap(new Function<String, PlainCanal>() {
+    private Map<String, PlainCanal>     configs              = MigrateMap.makeComputingMap(new Function<String, PlainCanal>() {
 
-                                                                     public PlainCanal apply(String destination) {
-                                                                         return new PlainCanal();
-                                                                     }
-                                                                 });
+                                                                 public PlainCanal apply(String destination) {
+                                                                     return new PlainCanal();
+                                                                 }
+                                                             });
     private ScheduledExecutorService    executor             = Executors.newScheduledThreadPool(1,
-        new NamedThreadFactory("canal-instance-scan"));
+                                                                 new NamedThreadFactory("canal-instance-scan"));
 
     private volatile boolean            isFirst              = true;
     private PlainCanalConfigClient      configClient;
-    private String                      ip;
-    private int                         port;
-    private String                      lastInstanceMD5;
 
     public void start() {
         super.start();
@@ -86,7 +81,7 @@ public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle impleme
     }
 
     private void scan() {
-        String instances = configClient.findInstances(lastInstanceMD5);
+        String instances = configClient.findInstances(null);
         final List<String> is = Lists.newArrayList(StringUtils.split(instances, ','));
         List<String> start = Lists.newArrayList();
         List<String> stop = Lists.newArrayList();
@@ -117,7 +112,6 @@ public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle impleme
 
         stop.forEach(instance -> {
             notifyStop(instance);
-            configs.remove(instance);
         });
 
         restart.forEach(instance -> {
@@ -127,6 +121,7 @@ public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle impleme
         start.forEach(instance -> {
             notifyStart(instance);
         });
+
     }
 
     private void notifyStart(String destination) {
@@ -141,11 +136,14 @@ public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle impleme
 
     private void notifyStop(String destination) {
         InstanceAction action = actions.remove(destination);
-        try {
-            action.stop(destination);
-        } catch (Throwable e) {
-            logger.error(String.format("scan delete found[%s] but stop failed", destination), e);
-            actions.put(destination, action);// 再重新加回去,下一次scan时再执行删除
+        if (action != null) {
+            try {
+                action.stop(destination);
+                configs.remove(destination);
+            } catch (Throwable e) {
+                logger.error(String.format("scan delete found[%s] but stop failed", destination), e);
+                actions.put(destination, action);// 再重新加回去,下一次scan时再执行删除
+            }
         }
     }
 
@@ -160,6 +158,18 @@ public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle impleme
         }
     }
 
+    public void release(String destination) {
+        InstanceAction action = actions.remove(destination);
+        if (action != null) {
+            try {
+                configs.remove(destination);
+            } catch (Throwable e) {
+                logger.error(String.format("scan delete found[%s] but stop failed", destination), e);
+                actions.put(destination, action);// 再重新加回去,下一次scan时再执行删除
+            }
+        }
+    }
+
     public void setDefaultAction(InstanceAction defaultAction) {
         this.defaultAction = defaultAction;
     }
@@ -172,14 +182,6 @@ public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle impleme
         this.configClient = configClient;
     }
 
-    public void setIp(String ip) {
-        this.ip = ip;
-    }
-
-    public void setPort(int port) {
-        this.port = port;
-    }
-
     public Map<String, InstanceAction> getActions() {
         return actions;
     }

+ 8 - 6
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/SpringInstanceConfigMonitor.java

@@ -193,12 +193,14 @@ public class SpringInstanceConfigMonitor extends AbstractCanalLifeCycle implemen
 
     private void notifyStop(String destination) {
         InstanceAction action = actions.remove(destination);
-        try {
-            action.stop(destination);
-            lastFiles.remove(destination);
-        } catch (Throwable e) {
-            logger.error(String.format("scan delete found[%s] but stop failed", destination), e);
-            actions.put(destination, action);// 再重新加回去,下一次scan时再执行删除
+        if (action != null) {
+            try {
+                action.stop(destination);
+                lastFiles.remove(destination);
+            } catch (Throwable e) {
+                logger.error(String.format("scan delete found[%s] but stop failed", destination), e);
+                actions.put(destination, action);// 再重新加回去,下一次scan时再执行删除
+            }
         }
     }
 

+ 6 - 0
deployer/src/main/resources/canal.properties

@@ -11,6 +11,12 @@ canal.metrics.pull.port = 11112
 # canal.user = canal
 # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
 
+# canal admin config
+#canal.admin.manager = 127.0.0.1:8089
+canal.admin.port = 11110
+canal.admin.user = admin
+canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
+
 canal.zkServers =
 # flush data to zk
 canal.zookeeper.flush.period = 1000

+ 1 - 1
deployer/src/main/resources/canal_local.properties

@@ -8,4 +8,4 @@ canal.admin.user = admin
 canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
 # admin auto register
 canal.admin.register.auto = true
-canal.admin.register.cluster =
+canal.admin.register.cluster =