rewerma před 6 roky
rodič
revize
5741f5a347

+ 8 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -556,4 +556,12 @@ public class CanalController {
     public void setCanalMQStarter(CanalMQStarter canalMQStarter) {
         this.canalMQStarter = canalMQStarter;
     }
+
+    public Map<InstanceMode, InstanceConfigMonitor> getInstanceConfigMonitors() {
+        return instanceConfigMonitors;
+    }
+
+    public Map<String, InstanceConfig> getInstanceConfigs() {
+        return instanceConfigs;
+    }
 }

+ 4 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java

@@ -51,6 +51,10 @@ public class CanalStater {
         this.properties = properties;
     }
 
+    public CanalController getController() {
+        return controller;
+    }
+
     /**
      * 启动方法
      *

+ 69 - 4
deployer/src/main/java/com/alibaba/otter/canal/deployer/mbean/CanalServerBean.java

@@ -2,9 +2,16 @@ package com.alibaba.otter.canal.deployer.mbean;
 
 import com.alibaba.otter.canal.deployer.CanalLauncher;
 import com.alibaba.otter.canal.deployer.CanalStater;
+import com.alibaba.otter.canal.deployer.InstanceConfig;
+import com.alibaba.otter.canal.deployer.monitor.InstanceAction;
+import com.alibaba.otter.canal.deployer.monitor.InstanceConfigMonitor;
+import com.alibaba.otter.canal.deployer.monitor.SpringInstanceConfigMonitor;
+import com.google.common.base.Joiner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 public class CanalServerBean implements CanalServerMXBean {
 
     private static final Logger logger = LoggerFactory.getLogger(CanalServerBean.class);
@@ -23,10 +30,6 @@ public class CanalServerBean implements CanalServerMXBean {
         return status;
     }
 
-    public void setStatus(int status) {
-        this.status = status;
-    }
-
     @Override
     public synchronized boolean start() {
         try {
@@ -68,4 +71,66 @@ public class CanalServerBean implements CanalServerMXBean {
         return true;
     }
 
+    @Override
+    public synchronized boolean startInstance(String destination) {
+        try {
+            InstanceAction instanceAction = getInstanceAction(destination);
+            if (instanceAction != null) {
+                instanceAction.start(destination);
+
+            }
+        } catch (Throwable e) {
+            logger.error(e.getMessage(), e);
+        }
+        return false;
+    }
+
+    @Override
+    public synchronized boolean stopInstance(String destination) {
+        try {
+            InstanceAction instanceAction = getInstanceAction(destination);
+            if (instanceAction != null) {
+                instanceAction.stop(destination);
+            }
+        } catch (Throwable e) {
+            logger.error(e.getMessage(), e);
+        }
+        return false;
+    }
+
+    @Override
+    public synchronized boolean reloadInstance(String destination) {
+        try {
+            InstanceAction instanceAction = getInstanceAction(destination);
+            if (instanceAction != null) {
+                instanceAction.reload(destination);
+            }
+        } catch (Throwable e) {
+            logger.error(e.getMessage(), e);
+        }
+        return false;
+    }
+
+    @Override
+    public synchronized String getRunningInstances() {
+        try {
+            Map<String, InstanceConfig> instanceConfigs = canalStater.getController().getInstanceConfigs();
+            if (instanceConfigs != null) {
+                return Joiner.on(",").join(instanceConfigs.keySet());
+            }
+        } catch (Throwable e) {
+            logger.error(e.getMessage(), e);
+        }
+        return "";
+    }
+
+    private InstanceAction getInstanceAction(String destination) {
+        Map<InstanceConfig.InstanceMode, InstanceConfigMonitor> monitors = canalStater.getController()
+            .getInstanceConfigMonitors();
+        SpringInstanceConfigMonitor monitor = (SpringInstanceConfigMonitor) monitors
+            .get(InstanceConfig.InstanceMode.SPRING);
+        Map<String, InstanceAction> instanceActions = monitor.getActions();
+        return instanceActions.get(destination);
+    }
+
 }

+ 8 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/mbean/CanalServerMXBean.java

@@ -11,4 +11,12 @@ public interface CanalServerMXBean {
     boolean restart();
 
     boolean exit();
+
+    boolean startInstance(String destination);
+
+    boolean stopInstance(String destination);
+
+    boolean reloadInstance(String destination);
+
+    String getRunningInstances();
 }

+ 4 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/SpringInstanceConfigMonitor.java

@@ -50,6 +50,10 @@ public class SpringInstanceConfigMonitor extends AbstractCanalLifeCycle implemen
 
     private volatile boolean                 isFirst              = true;
 
+    public Map<String, InstanceAction> getActions() {
+        return actions;
+    }
+
     public void start() {
         super.start();
         Assert.notNull(rootConf, "root conf dir is null!");