فهرست منبع

fixed issue #1940 #2088 , add canal.register.ip support docker

agapple 6 سال پیش
والد
کامیت
3a74f6124a

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -14,6 +14,7 @@ public class CanalConstants {
     public static final String ROOT                                 = "canal";
     public static final String CANAL_ID                             = ROOT + "." + "id";
     public static final String CANAL_IP                             = ROOT + "." + "ip";
+    public static final String CANAL_REGISTER_IP                    = ROOT + "." + "register.ip";
     public static final String CANAL_PORT                           = ROOT + "." + "port";
     public static final String CANAL_METRICS_PULL_PORT              = ROOT + "." + "metrics.pull.port";
     public static final String CANAL_ADMIN_JMX_PORT                 = ROOT + "." + "admin.jmx.port";

+ 85 - 78
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -54,6 +54,7 @@ public class CanalController {
     private static final Logger                      logger   = LoggerFactory.getLogger(CanalController.class);
     private Long                                     cid;
     private String                                   ip;
+    private String                                   registerIp;
     private int                                      port;
     // 默认使用spring的方式载入
     private Map<String, InstanceConfig>              instanceConfigs;
@@ -108,6 +109,7 @@ public class CanalController {
         // 准备canal server
         cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
         ip = getProperty(properties, CanalConstants.CANAL_IP);
+        registerIp = getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
         port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
         embededCanalServer = CanalServerWithEmbedded.instance();
         embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
@@ -127,9 +129,17 @@ public class CanalController {
         }
 
         // 处理下ip为空,默认使用hostIp暴露到zk中
+        if (StringUtils.isEmpty(ip) && StringUtils.isEmpty(registerIp)) {
+            ip = registerIp = AddressUtils.getHostIp();
+        }
+
         if (StringUtils.isEmpty(ip)) {
             ip = AddressUtils.getHostIp();
         }
+
+        if (StringUtils.isEmpty(registerIp)) {
+            registerIp = ip; // 兼容以前配置
+        }
         final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
         if (StringUtils.isNotEmpty(zkServers)) {
             zkclientx = ZkClientx.getZkClient(zkServers);
@@ -138,89 +148,88 @@ public class CanalController {
             zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
         }
 
-        final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
+        final ServerRunningData serverData = new ServerRunningData(cid, registerIp + ":" + port);
         ServerRunningMonitors.setServerData(serverData);
-        ServerRunningMonitors
-            .setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
-
-                public ServerRunningMonitor apply(final String destination) {
-                    ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
-                    runningMonitor.setDestination(destination);
-                    runningMonitor.setListener(new ServerRunningListener() {
-
-                        public void processActiveEnter() {
-                            try {
-                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                                embededCanalServer.start(destination);
-                                if (canalMQStarter != null) {
-                                    canalMQStarter.startDestination(destination);
-                                }
-                            } finally {
-                                MDC.remove(CanalConstants.MDC_DESTINATION);
+        ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
+
+            public ServerRunningMonitor apply(final String destination) {
+                ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
+                runningMonitor.setDestination(destination);
+                runningMonitor.setListener(new ServerRunningListener() {
+
+                    public void processActiveEnter() {
+                        try {
+                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                            embededCanalServer.start(destination);
+                            if (canalMQStarter != null) {
+                                canalMQStarter.startDestination(destination);
                             }
+                        } finally {
+                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
+                    }
 
-                        public void processActiveExit() {
-                            try {
-                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                                if (canalMQStarter != null) {
-                                    canalMQStarter.stopDestination(destination);
-                                }
-                                embededCanalServer.stop(destination);
-                            } finally {
-                                MDC.remove(CanalConstants.MDC_DESTINATION);
+                    public void processActiveExit() {
+                        try {
+                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                            if (canalMQStarter != null) {
+                                canalMQStarter.stopDestination(destination);
                             }
+                            embededCanalServer.stop(destination);
+                        } finally {
+                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
+                    }
 
-                        public void processStart() {
-                            try {
-                                if (zkclientx != null) {
-                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
-                                        ip + ":" + port);
-                                    initCid(path);
-                                    zkclientx.subscribeStateChanges(new IZkStateListener() {
-
-                                        public void handleStateChanged(KeeperState state) throws Exception {
-
-                                        }
-
-                                        public void handleNewSession() throws Exception {
-                                            initCid(path);
-                                        }
-
-                                        @Override
-                                        public void handleSessionEstablishmentError(Throwable error) throws Exception {
-                                            logger.error("failed to connect to zookeeper", error);
-                                        }
-                                    });
-                                }
-                            } finally {
-                                MDC.remove(CanalConstants.MDC_DESTINATION);
+                    public void processStart() {
+                        try {
+                            if (zkclientx != null) {
+                                final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
+                                    registerIp + ":" + port);
+                                initCid(path);
+                                zkclientx.subscribeStateChanges(new IZkStateListener() {
+
+                                    public void handleStateChanged(KeeperState state) throws Exception {
+
+                                    }
+
+                                    public void handleNewSession() throws Exception {
+                                        initCid(path);
+                                    }
+
+                                    @Override
+                                    public void handleSessionEstablishmentError(Throwable error) throws Exception {
+                                        logger.error("failed to connect to zookeeper", error);
+                                    }
+                                });
                             }
+                        } finally {
+                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
+                    }
 
-                        public void processStop() {
-                            try {
-                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                                if (zkclientx != null) {
-                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
-                                        ip + ":" + port);
-                                    releaseCid(path);
-                                }
-                            } finally {
-                                MDC.remove(CanalConstants.MDC_DESTINATION);
+                    public void processStop() {
+                        try {
+                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                            if (zkclientx != null) {
+                                final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
+                                    registerIp + ":" + port);
+                                releaseCid(path);
                             }
+                        } finally {
+                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
-
-                    });
-                    if (zkclientx != null) {
-                        runningMonitor.setZkClient(zkclientx);
                     }
-                    // 触发创建一下cid节点
-                    runningMonitor.init();
-                    return runningMonitor;
+
+                });
+                if (zkclientx != null) {
+                    runningMonitor.setZkClient(zkclientx);
                 }
-            }));
+                // 触发创建一下cid节点
+                runningMonitor.init();
+                return runningMonitor;
+            }
+        }));
 
         // 初始化monitor机制
         autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
@@ -266,8 +275,7 @@ public class CanalController {
             instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
 
                 public InstanceConfigMonitor apply(InstanceMode mode) {
-                    int scanInterval = Integer
-                        .valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
+                    int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
 
                     if (mode.isSpring()) {
                         SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
@@ -375,8 +383,7 @@ public class CanalController {
             InstanceConfig oldConfig = instanceConfigs.put(destination, config);
 
             if (oldConfig != null) {
-                logger
-                    .warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
+                logger.warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
             }
         }
     }
@@ -424,9 +431,9 @@ public class CanalController {
     }
 
     public void start() throws Throwable {
-        logger.info("## start the canal server[{}:{}]", ip, port);
+        logger.info("## start the canal server[{}({}):{}]", ip, registerIp, port);
         // 创建整个canal的工作节点
-        final String path = ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port);
+        final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
         initCid(path);
         if (zkclientx != null) {
             this.zkclientx.subscribeStateChanges(new IZkStateListener() {
@@ -501,14 +508,14 @@ public class CanalController {
         }
 
         // 释放canal的工作节点
-        releaseCid(ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port));
-        logger.info("## stop the canal server[{}:{}]", ip, port);
+        releaseCid(ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port));
+        logger.info("## stop the canal server[{}({}):{}]", ip, registerIp, port);
 
         if (zkclientx != null) {
             zkclientx.close();
         }
 
-        //关闭时清理缓存
+        // 关闭时清理缓存
         if (instanceConfigs != null) {
             instanceConfigs.clear();
         }

+ 10 - 5
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -4,16 +4,16 @@ import java.io.FileInputStream;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 
-import com.alibaba.otter.canal.deployer.mbean.CanalServerAgent;
-import com.alibaba.otter.canal.deployer.mbean.CanalServerMXBean;
-import com.alibaba.otter.canal.deployer.mbean.CanalServerBean;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.deployer.mbean.CanalServerAgent;
+import com.alibaba.otter.canal.deployer.mbean.CanalServerBean;
+import com.alibaba.otter.canal.deployer.mbean.CanalServerMXBean;
+import com.alibaba.otter.canal.deployer.monitor.remote.RemoteCanalConfigMonitor;
 import com.alibaba.otter.canal.deployer.monitor.remote.RemoteConfigLoader;
 import com.alibaba.otter.canal.deployer.monitor.remote.RemoteConfigLoaderFactory;
-import com.alibaba.otter.canal.deployer.monitor.remote.RemoteCanalConfigMonitor;
 
 /**
  * canal独立版本启动的入口类
@@ -80,8 +80,13 @@ public class CanalLauncher {
             String jmxPort = properties.getProperty(CanalConstants.CANAL_ADMIN_JMX_PORT);
             if (StringUtils.isNotEmpty(jmxPort)) {
                 String ip = properties.getProperty(CanalConstants.CANAL_IP);
+                String registerIp = properties.getProperty(CanalConstants.CANAL_REGISTER_IP);
+                if (StringUtils.isEmpty(registerIp)) {
+                    // 兼容老的配置
+                    registerIp = ip;
+                }
                 CanalServerMXBean canalServerMBean = new CanalServerBean(canalStater);
-                canalServerAgent = new CanalServerAgent(ip, Integer.parseInt(jmxPort), canalServerMBean);
+                canalServerAgent = new CanalServerAgent(registerIp, Integer.parseInt(jmxPort), canalServerMBean);
                 Thread agentThread = new Thread(canalServerAgent::start);
                 agentThread.start();
             }

+ 3 - 0
deployer/src/main/resources/canal.properties

@@ -5,7 +5,10 @@
 #canal.manager.jdbc.username=root
 #canal.manager.jdbc.password=121212
 canal.id = 1
+# tcp bind ip
 canal.ip =
+# register ip to zookeeper
+canal.register.ip =
 canal.port = 11111
 canal.metrics.pull.port = 11112
 canal.admin.jmx.port = 11113