Explorar o código

bug fix (#2122)

* admin端口调整完成

* server端适配调整

* 调整部分功能

* 单机/集群admin调试完成

* 重新发布前端页面

* bug fix

* 解决server端admin配置会被远程覆盖的问题
解决default-instance.xml无法加载远程zkServers配置的问题

* 集群模式下取活动实例的问题
rewerma %!s(int64=6) %!d(string=hai) anos
pai
achega
274e91f8aa

+ 12 - 5
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/connector/AdminConnector.java

@@ -5,7 +5,7 @@ import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 
 /**
  * canal数据操作客户端
- * 
+ *
  * @author zebin.xuzb @ 2012-6-19
  * @author jianghang
  * @version 1.0.0
@@ -14,14 +14,14 @@ public interface AdminConnector {
 
     /**
      * 链接对应的canal server
-     * 
+     *
      * @throws CanalClientException
      */
     void connect() throws ServiceException;
 
     /**
      * 释放链接
-     * 
+     *
      * @throws CanalClientException
      */
     void disconnect() throws ServiceException;
@@ -54,6 +54,13 @@ public interface AdminConnector {
      */
     boolean restart();
 
+    /**
+     * 获取所有当前节点下所有实例
+     *
+     * @return 实例信息
+     */
+    String getInstances();
+
     /**
      * 获取所有当前节点下运行中的实例
      *
@@ -63,7 +70,7 @@ public interface AdminConnector {
 
     /**
      * 通过实例名检查
-     * 
+     *
      * @param destination
      * @return
      */
@@ -109,7 +116,7 @@ public interface AdminConnector {
 
     /**
      * 获取Instance的机器日志列表
-     * 
+     *
      * @param destination
      */
     String listInstanceLog(String destination);

+ 7 - 5
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/connector/SimpleAdminConnector.java

@@ -156,6 +156,11 @@ public class SimpleAdminConnector implements AdminConnector {
         return BooleanUtils.toBoolean(Integer.parseInt(doServerAdmin("restart")));
     }
 
+    @Override
+    public String getInstances() {
+        return doServerAdmin("instances");
+    }
+
     @Override
     public String getRunningInstances() {
         return doServerAdmin("list");
@@ -227,11 +232,8 @@ public class SimpleAdminConnector implements AdminConnector {
         try {
             writeWithHeader(Packet.newBuilder()
                 .setType(PacketType.INSTANCE)
-                .setBody(InstanceAdmin.newBuilder()
-                    .setDestination(destination)
-                    .setAction(action)
-                    .build()
-                    .toByteString())
+                .setBody(
+                    InstanceAdmin.newBuilder().setDestination(destination).setAction(action).build().toByteString())
                 .build()
                 .toByteArray());
 

+ 1 - 1
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/controller/CanalInstanceController.java

@@ -162,6 +162,6 @@ public class CanalInstanceController {
      */
     @GetMapping(value = "/active/instances/{serverId}")
     public BaseModel<List<CanalInstanceConfig>> activeInstances(@PathVariable Long serverId, @PathVariable String env) {
-        return BaseModel.getInstance(canalInstanceConfigService.findActiveInstaceByServerId(serverId));
+        return BaseModel.getInstance(canalInstanceConfigService.findActiveInstanceByServerId(serverId));
     }
 }

+ 1 - 3
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/CanalInstanceService.java

@@ -20,8 +20,6 @@ public interface CanalInstanceService {
 
     CanalInstanceConfig detail(Long id);
 
-    CanalInstanceConfig findOne(String name);
-
     void updateContent(CanalInstanceConfig canalInstanceConfig);
 
     void delete(Long id);
@@ -32,5 +30,5 @@ public interface CanalInstanceService {
 
     boolean instanceOperation(Long id, String option);
 
-    List<CanalInstanceConfig> findActiveInstaceByServerId(Long serverId);
+    List<CanalInstanceConfig> findActiveInstanceByServerId(Long serverId);
 }

+ 8 - 1
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/CanalClusterServiceImpl.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.admin.service.impl;
 import java.util.List;
 
 import com.alibaba.otter.canal.admin.common.exception.ServiceException;
+import com.alibaba.otter.canal.admin.model.CanalInstanceConfig;
 import com.alibaba.otter.canal.admin.model.NodeServer;
 import org.springframework.stereotype.Service;
 
@@ -33,7 +34,13 @@ public class CanalClusterServiceImpl implements CanalClusterServic {
             throw new ServiceException("Servers exist, delete failed");
         }
 
-        CanalCluster canalCluster = CanalCluster.find.byId(id);
+        // 判断集群下是否存在instance信息
+        int instanceCnt = CanalInstanceConfig.find.query().where().eq("clusterId", id).findCount();
+        if (instanceCnt > 0) {
+            throw new ServiceException("Instances exist, delete failed");
+        }
+
+      CanalCluster canalCluster = CanalCluster.find.byId(id);
         if (canalCluster != null) {
             canalCluster.delete();
         }

+ 13 - 13
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/CanalInstanceServiceImpl.java

@@ -119,13 +119,23 @@ public class CanalInstanceServiceImpl implements CanalInstanceService {
      *
      * @param serverId server id
      */
-    public List<CanalInstanceConfig> findActiveInstaceByServerId(Long serverId) {
+    public List<CanalInstanceConfig> findActiveInstanceByServerId(Long serverId) {
         NodeServer nodeServer = NodeServer.find.byId(serverId);
         if (nodeServer == null) {
             return null;
         }
-        String runningInstances = SimpleAdminConnectors
-            .execute(nodeServer.getIp(), nodeServer.getAdminPort(), AdminConnector::getRunningInstances);
+
+      String runningInstances = null;
+        if (nodeServer.getClusterId() != null) {// 集群模式
+            // 只取活动的instances
+            runningInstances = SimpleAdminConnectors
+                .execute(nodeServer.getIp(), nodeServer.getAdminPort(), AdminConnector::getRunningInstances);
+        } else {
+            // 取所属所有instances
+            runningInstances = SimpleAdminConnectors
+                .execute(nodeServer.getIp(), nodeServer.getAdminPort(), AdminConnector::getInstances);
+        }
+
         if (runningInstances == null) {
             return null;
         }
@@ -227,16 +237,6 @@ public class CanalInstanceServiceImpl implements CanalInstanceService {
         }
     }
 
-    @Override
-    public CanalInstanceConfig findOne(String name) {
-        CanalInstanceConfig config = CanalInstanceConfig.find.query()
-            .setDisableLazyLoading(true)
-            .where()
-            .eq("name", name)
-            .findOne();
-        return config;
-    }
-
     public Map<String, String> remoteInstanceLog(Long id, Long nodeId) {
         Map<String, String> result = new HashMap<>();
 

+ 9 - 3
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -46,7 +46,7 @@ import com.google.common.collect.MigrateMap;
  */
 public class CanalController {
 
-    private static final Logger                      logger   = LoggerFactory.getLogger(CanalController.class);
+    private static final Logger                      logger           = LoggerFactory.getLogger(CanalController.class);
     private Long                                     cid;
     private String                                   ip;
     private String                                   registerIp;
@@ -54,10 +54,11 @@ public class CanalController {
     private int                                      adminPort;
     // 默认使用spring的方式载入
     private Map<String, InstanceConfig>              instanceConfigs;
+    private Map<String, String>                      runningInstances = new MapMaker().makeMap();
     private InstanceConfig                           globalInstanceConfig;
     private Map<String, PlainCanalConfigClient>      managerClients;
     // 监听instance config的变化
-    private boolean                                  autoScan = true;
+    private boolean                                  autoScan         = true;
     private InstanceAction                           defaultAction;
     private Map<InstanceMode, InstanceConfigMonitor> instanceConfigMonitors;
     private CanalServerWithEmbedded                  embededCanalServer;
@@ -169,6 +170,7 @@ public class CanalController {
                                 if (canalMQStarter != null) {
                                     canalMQStarter.startDestination(destination);
                                 }
+                                runningInstances.put(destination, destination);
                             } finally {
                                 MDC.remove(CanalConstants.MDC_DESTINATION);
                             }
@@ -181,6 +183,7 @@ public class CanalController {
                                     canalMQStarter.stopDestination(destination);
                                 }
                                 embededCanalServer.stop(destination);
+                                runningInstances.remove(destination);
                             } finally {
                                 MDC.remove(CanalConstants.MDC_DESTINATION);
                             }
@@ -363,7 +366,7 @@ public class CanalController {
                 }
 
                 if (config.getMode().isManager()) {
-                    PlainCanalInstanceGenerator instanceGenerator = new PlainCanalInstanceGenerator();
+                    PlainCanalInstanceGenerator instanceGenerator = new PlainCanalInstanceGenerator(properties);
                     instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
                     instanceGenerator.setSpringXml(config.getSpringXml());
                     return instanceGenerator.generate(destination);
@@ -592,4 +595,7 @@ public class CanalController {
         return instanceConfigs;
     }
 
+    public Map<String, String> getRunningInstances() {
+        return runningInstances;
+    }
 }

+ 39 - 30
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -49,45 +49,55 @@ public class CanalLauncher {
             if (StringUtils.isNotEmpty(managerAddress)) {
                 String user = properties.getProperty(CanalConstants.CANAL_ADMIN_USER);
                 String passwd = properties.getProperty(CanalConstants.CANAL_ADMIN_PASSWD);
-                String adminPort = properties.getProperty(CanalConstants.CANAL_ADMIN_PORT);
-                if (StringUtils.isEmpty(adminPort)) {
-                    adminPort = "11110";
-                }
+                String adminPort = properties.getProperty(CanalConstants.CANAL_ADMIN_PORT, "11110");
                 final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
                     user,
                     passwd,
                     "",
                     Integer.parseInt(adminPort));
                 PlainCanal canalConfig = configClient.findServer(null);
-                properties = canalConfig.getProperties();
-                int scanIntervalInSecond = Integer
-                    .valueOf(properties.getProperty(CanalConstants.CANAL_AUTO_SCAN_INTERVAL, "5"));
-                executor.scheduleWithFixedDelay(new Runnable() {
-
-                    private PlainCanal lastCanalConfig;
-
-                    public void run() {
-                        try {
-                            if (lastCanalConfig == null) {
-                                lastCanalConfig = configClient.findServer(null);
-                            } else {
-                                PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
-                                if (newCanalConfig != null) {
-                                    // 远程配置canal.properties修改重新加载整个应用
-                                    canalStater.stop();
-                                    canalStater.setProperties(newCanalConfig.getProperties());
-                                    canalStater.start();
-
-                                    lastCanalConfig = newCanalConfig;
+                if (canalConfig != null) {
+                    properties = canalConfig.getProperties();
+                    // 用本地配置覆盖
+                    properties.put(CanalConstants.CANAL_ADMIN_MANAGER, managerAddress);
+                    properties.put(CanalConstants.CANAL_ADMIN_USER, user);
+                    properties.put(CanalConstants.CANAL_ADMIN_PASSWD, passwd);
+                    properties.put(CanalConstants.CANAL_ADMIN_PORT, adminPort);
+                    int scanIntervalInSecond = Integer
+                        .parseInt(properties.getProperty(CanalConstants.CANAL_AUTO_SCAN_INTERVAL, "5"));
+                    executor.scheduleWithFixedDelay(new Runnable() {
+
+                        private PlainCanal lastCanalConfig;
+
+                        public void run() {
+                            try {
+                                if (lastCanalConfig == null) {
+                                    lastCanalConfig = configClient.findServer(null);
+                                } else {
+                                    PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
+                                    if (newCanalConfig != null) {
+                                        // 远程配置canal.properties修改重新加载整个应用
+                                        canalStater.stop();
+                                        Properties properties1 = newCanalConfig.getProperties();
+                                        // 用本地配置覆盖
+                                        properties1.put(CanalConstants.CANAL_ADMIN_MANAGER, managerAddress);
+                                        properties1.put(CanalConstants.CANAL_ADMIN_USER, user);
+                                        properties1.put(CanalConstants.CANAL_ADMIN_PASSWD, passwd);
+                                        properties1.put(CanalConstants.CANAL_ADMIN_PORT, adminPort);
+                                        canalStater.setProperties(properties1);
+                                        canalStater.start();
+
+                                        lastCanalConfig = newCanalConfig;
+                                    }
                                 }
-                            }
 
-                        } catch (Throwable e) {
-                            logger.error("scan failed", e);
+                            } catch (Throwable e) {
+                                logger.error("scan failed", e);
+                            }
                         }
-                    }
 
-                }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
+                    }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
+                }
             }
 
             canalStater.setProperties(properties);
@@ -108,5 +118,4 @@ public class CanalLauncher {
             }
         });
     }
-
 }

+ 18 - 2
deployer/src/main/java/com/alibaba/otter/canal/deployer/admin/CanalAdminController.java

@@ -28,7 +28,7 @@ import com.google.common.base.Joiner;
 
 /**
  * 提供canal admin的管理操作
- * 
+ *
  * @author agapple 2019年8月24日 下午11:39:01
  * @since 1.1.4
  */
@@ -103,7 +103,7 @@ public class CanalAdminController implements CanalAdmin {
     }
 
     @Override
-    public String getRunningInstances() {
+    public String getInstances() {
         try {
             CanalController controller = canalStater.getController();
             if (controller != null) {
@@ -118,6 +118,22 @@ public class CanalAdminController implements CanalAdmin {
         return "";
     }
 
+    @Override
+    public String getRunningInstances() {
+        try {
+            CanalController controller = canalStater.getController();
+            if (controller != null) {
+                Map<String, String> instanceConfigs = controller.getRunningInstances();
+                if (instanceConfigs != null) {
+                    return Joiner.on(",").join(instanceConfigs.keySet());
+                }
+            }
+        } catch (Throwable e) {
+            logger.error(e.getMessage(), e);
+        }
+        return "";
+    }
+
     @Override
     public boolean checkInstance(String destination) {
         Map<String, CanalInstance> instances = CanalServerWithEmbedded.instance().getCanalInstances();

+ 12 - 2
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/PlainCanalInstanceGenerator.java

@@ -13,9 +13,11 @@ import com.alibaba.otter.canal.instance.manager.plain.PlainCanal;
 import com.alibaba.otter.canal.instance.manager.plain.PlainCanalConfigClient;
 import com.alibaba.otter.canal.instance.spring.SpringCanalInstanceGenerator;
 
+import java.util.Properties;
+
 /**
  * 基于manager生成对应的{@linkplain CanalInstance}
- * 
+ *
  * @author jianghang 2012-7-12 下午05:37:09
  * @version 1.0.0
  */
@@ -26,6 +28,11 @@ public class PlainCanalInstanceGenerator implements CanalInstanceGenerator {
     private PlainCanalConfigClient canalConfigClient;
     private String                 defaultName = "instance";
     private BeanFactory            beanFactory;
+    private Properties             canalConfig;
+
+    public PlainCanalInstanceGenerator(Properties canalConfig){
+        this.canalConfig = canalConfig;
+    }
 
     public CanalInstance generate(String destination) {
         synchronized (CanalInstanceGenerator.class) {
@@ -34,8 +41,11 @@ public class PlainCanalInstanceGenerator implements CanalInstanceGenerator {
                 if (canal == null) {
                     throw new CanalException("instance : " + destination + " config is not found");
                 }
+                Properties properties = canal.getProperties();
+                properties.putAll(canalConfig);
                 // 设置动态properties,替换掉本地properties
-                com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer.propertiesLocal.set(canal.getProperties());
+                com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer.propertiesLocal
+                    .set(properties);
                 // 设置当前正在加载的通道,加载spring查找文件时会用到该变量
                 System.setProperty("canal.instance.destination", destination);
                 this.beanFactory = getBeanFactory(springXml);

+ 10 - 3
server/src/main/java/com/alibaba/otter/canal/admin/CanalAdmin.java

@@ -2,7 +2,7 @@ package com.alibaba.otter.canal.admin;
 
 /**
  * Canal Admin动态管理接口
- * 
+ *
  * @author agapple 2019年8月24日 下午9:45:49
  * @since 1.1.4
  */
@@ -41,6 +41,13 @@ public interface CanalAdmin {
      */
     boolean restart();
 
+    /**
+     * 获取所有当前节点下所有实例
+     *
+     * @return 实例信息
+     */
+    String getInstances();
+
     /**
      * 获取所有当前节点下运行中的实例
      *
@@ -50,7 +57,7 @@ public interface CanalAdmin {
 
     /**
      * 通过实例名检查
-     * 
+     *
      * @param destination
      * @return
      */
@@ -96,7 +103,7 @@ public interface CanalAdmin {
 
     /**
      * 获取Instance的机器日志列表
-     * 
+     *
      * @param destination
      */
     String listInstanceLog(String destination);

+ 8 - 3
server/src/main/java/com/alibaba/otter/canal/admin/handler/SessionHandler.java

@@ -58,6 +58,9 @@ public class SessionHandler extends SimpleChannelHandler {
                         case "list":
                             message = canalAdmin.getRunningInstances();
                             break;
+                        case "instances":
+                            message = canalAdmin.getInstances();
+                            break;
                         default:
                             byte[] errorBytes = AdminNettyUtils.errorPacket(301,
                                 MessageFormatter.format("ServerAdmin action={} is unknown", action).getMessage());
@@ -129,9 +132,11 @@ public class SessionHandler extends SimpleChannelHandler {
             }
         } catch (Throwable exception) {
             byte[] errorBytes = AdminNettyUtils.errorPacket(400,
-                MessageFormatter.format("something goes wrong with channel:{}, exception={}",
-                    ctx.getChannel(),
-                    ExceptionUtils.getStackTrace(exception)).getMessage());
+                MessageFormatter
+                    .format("something goes wrong with channel:{}, exception={}",
+                        ctx.getChannel(),
+                        ExceptionUtils.getStackTrace(exception))
+                    .getMessage());
             AdminNettyUtils.write(ctx.getChannel(), errorBytes);
         }
     }