Quellcode durchsuchen

add autoRegister and cluster

agapple vor 5 Jahren
Ursprung
Commit
456945fe3a

+ 13 - 4
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/controller/PollingConfigController.java

@@ -5,7 +5,12 @@ import java.security.NoSuchAlgorithmException;
 import org.apache.commons.lang.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.web.bind.annotation.*;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestHeader;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
 
 import com.alibaba.otter.canal.admin.model.BaseModel;
 import com.alibaba.otter.canal.admin.model.CanalConfig;
@@ -40,11 +45,15 @@ public class PollingConfigController {
     @GetMapping(value = "/server_polling")
     public BaseModel<CanalConfig> canalConfigPoll(@RequestHeader String user, @RequestHeader String passwd,
                                                   @RequestParam String ip, @RequestParam Integer port,
-                                                  @RequestParam String md5, @PathVariable String env) {
+                                                  @RequestParam String md5, @PathVariable boolean register,
+                                                  @PathVariable String cluster, @PathVariable String env) {
         if (!auth(user, passwd)) {
             throw new RuntimeException("auth :" + user + " is failed");
         }
 
+        if (register) {
+            // do something
+        }
         CanalConfig canalConfig = pollingConfigServer.getChangedConfig(ip, port, md5);
         return BaseModel.getInstance(canalConfig);
     }
@@ -54,8 +63,8 @@ public class PollingConfigController {
      */
     @GetMapping(value = "/instance_polling/{destination}")
     public BaseModel<CanalInstanceConfig> instanceConfigPoll(@RequestHeader String user, @RequestHeader String passwd,
-                                                             @PathVariable String env, @PathVariable String destination,
-                                                             @RequestParam String md5) {
+                                                             @PathVariable String env,
+                                                             @PathVariable String destination, @RequestParam String md5) {
         if (!auth(user, passwd)) {
             throw new RuntimeException("auth :" + user + " is failed");
         }

+ 2 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -23,6 +23,8 @@ public class CanalConstants {
     public static final String CANAL_ADMIN_PORT                     = ROOT + "." + "admin.port";
     public static final String CANAL_ADMIN_USER                     = ROOT + "." + "admin.user";
     public static final String CANAL_ADMIN_PASSWD                   = ROOT + "." + "admin.passwd";
+    public static final String CANAL_ADMIN_AUTO_REGISTER            = ROOT + "." + "admin.auto.register";
+    public static final String CANAL_ADMIN_AUTO_CLUSTER             = ROOT + "." + "admin.auto.cluster";
     public static final String CANAL_ZKSERVERS                      = ROOT + "." + "zkServers";
     public static final String CANAL_WITHOUT_NETTY                  = ROOT + "." + "withoutNetty";
 

+ 0 - 1
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -47,7 +47,6 @@ import com.google.common.collect.MigrateMap;
 public class CanalController {
 
     private static final Logger                      logger   = LoggerFactory.getLogger(CanalController.class);
-    private Long                                     cid;
     private String                                   ip;
     private String                                   registerIp;
     private int                                      port;

+ 14 - 8
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -7,6 +7,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,12 +47,15 @@ public class CanalLauncher {
             }
 
             final CanalStater canalStater = new CanalStater(properties);
-            String managerAddress = properties.getProperty(CanalConstants.CANAL_ADMIN_MANAGER);
+            String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
             if (StringUtils.isNotEmpty(managerAddress)) {
-                String user = properties.getProperty(CanalConstants.CANAL_ADMIN_USER);
-                String passwd = properties.getProperty(CanalConstants.CANAL_ADMIN_PASSWD);
-                String adminPort = properties.getProperty(CanalConstants.CANAL_ADMIN_PORT, "11110");
-                String registerIp = properties.getProperty(CanalConstants.CANAL_REGISTER_IP);
+                String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
+                String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
+                String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
+                boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,
+                    CanalConstants.CANAL_ADMIN_AUTO_REGISTER));
+                String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);
+                String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
                 if (StringUtils.isEmpty(registerIp)) {
                     registerIp = AddressUtils.getHostIp();
                 }
@@ -59,7 +63,9 @@ public class CanalLauncher {
                     user,
                     passwd,
                     registerIp,
-                    Integer.parseInt(adminPort));
+                    Integer.parseInt(adminPort),
+                    autoRegister,
+                    autoCluster);
                 PlainCanal canalConfig = configClient.findServer(null);
                 if (canalConfig == null) {
                     throw new IllegalArgumentException("managerAddress:" + managerAddress
@@ -69,8 +75,8 @@ public class CanalLauncher {
                 Properties managerProperties = canalConfig.getProperties();
                 // merge local
                 managerProperties.putAll(properties);
-                int scanIntervalInSecond = Integer.valueOf(properties.getProperty(CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
-                    "5"));
+                int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(properties,
+                    CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
                 executor.scheduleWithFixedDelay(new Runnable() {
 
                     private PlainCanal lastCanalConfig;

+ 4 - 1
deployer/src/main/resources/canal_local.properties

@@ -5,4 +5,7 @@ canal.register.ip =
 canal.admin.manager = 127.0.0.1:8089
 canal.admin.port = 11110
 canal.admin.user = admin
-canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
+canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
+# admin auto register
+canal.admin.register.auto = true
+canal.admin.register.cluster =

+ 11 - 1
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/plain/PlainCanalConfigClient.java

@@ -33,6 +33,15 @@ public class PlainCanalConfigClient extends AbstractCanalLifeCycle implements Ca
     private HttpHelper           httpHelper;
     private String               localIp;
     private int                  adminPort;
+    private boolean              autoRegister;
+    private String               autoCluster;
+
+    public PlainCanalConfigClient(String configURL, String user, String passwd, String localIp, int adminPort,
+                                  boolean autoRegister, String autoCluster){
+        this(configURL, user, passwd, localIp, adminPort);
+        this.autoCluster = autoCluster;
+        this.autoRegister = autoRegister;
+    }
 
     public PlainCanalConfigClient(String configURL, String user, String passwd, String localIp, int adminPort){
         this.configURL = configURL;
@@ -61,7 +70,8 @@ public class PlainCanalConfigClient extends AbstractCanalLifeCycle implements Ca
         if (StringUtils.isEmpty(md5)) {
             md5 = "";
         }
-        String url = configURL + "/api/v1/config/server_polling?ip=" + localIp + "&port=" + adminPort + "&md5=" + md5;
+        String url = configURL + "/api/v1/config/server_polling?ip=" + localIp + "&port=" + adminPort + "&md5=" + md5
+                     + "&register=" + (autoRegister ? 1 : 0) + "&cluster=" + autoCluster;
         return queryConfig(url);
     }