rewerma 5 年之前
父节点
当前提交
28da536ec8

+ 9 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/controller/NodeServerController.java

@@ -49,4 +49,13 @@ public class NodeServerController {
         return BaseModel.getInstance(nodeServerService.remoteNodeStatus(ip, port));
     }
 
+    @PutMapping(value = "/nodeServer/start/{id}")
+    public BaseModel<Boolean> start(@PathVariable Long id, @PathVariable String env) {
+        return BaseModel.getInstance(nodeServerService.remoteOperation(id, "start"));
+    }
+
+    @PutMapping(value = "/nodeServer/stop/{id}")
+    public BaseModel<Boolean> stop(@PathVariable Long id, @PathVariable String env) {
+        return BaseModel.getInstance(nodeServerService.remoteOperation(id, "stop"));
+    }
 }

+ 20 - 6
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/jmx/JMXConnection.java

@@ -12,21 +12,35 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.SocketTimeoutException;
 import java.util.concurrent.*;
+import java.util.function.Function;
 
 public class JMXConnection {
 
-    private Logger            logger = LoggerFactory.getLogger(JMXConnection.class);
+    private static final Logger logger = LoggerFactory.getLogger(JMXConnection.class);
 
-    private String            ip;
-    private Integer           port;
-    private JMXConnector      jmxc;
-    private CanalServerMXBean canalServerMXBean;
+    private String              ip;
+    private Integer             port;
+    private JMXConnector        jmxc;
+    private CanalServerMXBean   canalServerMXBean;
 
     public JMXConnection(String ip, Integer port){
         this.ip = ip;
         this.port = port;
     }
 
+    public static <R> R execute(String ip, int port, Function<CanalServerMXBean, R> function) {
+        JMXConnection jmxConnection = new JMXConnection(ip, port);
+        try {
+            CanalServerMXBean canalServerMXBean = jmxConnection.getCanalServerMXBean();
+            return function.apply(canalServerMXBean);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            jmxConnection.close();
+        }
+        return null;
+    }
+
     public void connect() {
         try {
             JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + ip + ":" + port + "/jmxrmi");
@@ -44,7 +58,7 @@ public class JMXConnection {
     }
 
     private static JMXConnector connectWithTimeout(final JMXServiceURL url, long timeout,
-                                                  TimeUnit unit) throws IOException {
+                                                   TimeUnit unit) throws IOException {
         final BlockingQueue<Object> mailbox = new ArrayBlockingQueue<>(1);
         ExecutorService executor = Executors.newSingleThreadExecutor(daemonThreadFactory);
         executor.submit(() -> {

+ 41 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/model/Model.java

@@ -4,12 +4,53 @@ import io.ebean.Ebean;
 import io.ebean.EbeanServer;
 import org.apache.commons.beanutils.PropertyUtils;
 
+import javax.persistence.Id;
 import javax.persistence.MappedSuperclass;
 import javax.persistence.OptimisticLockException;
+import java.lang.reflect.Field;
 
 @MappedSuperclass
 public abstract class Model extends io.ebean.Model {
 
+    public void init() {
+    }
+
+    public void save() {
+        init();
+        super.save();
+    }
+
+    public void insert() {
+        init();
+        super.insert();
+    }
+
+    public void saveOrUpdate() {
+        try {
+            Field idField = null;
+            // find id field
+            Field[] fields = this.getClass().getDeclaredFields();
+            for (Field field : fields) {
+                Id idAnn = field.getAnnotation(Id.class);
+                if (idAnn != null) {
+                    idField = field;
+                    break;
+                }
+            }
+            if (idField == null) {
+                return;
+            }
+            Object idVal = PropertyUtils.getProperty(this, idField.getName());
+            if (idVal == null) {
+                this.save();
+            } else {
+                this.update();
+            }
+        } catch (Exception e) {
+            throw new OptimisticLockException(e);
+        }
+    }
+
     public void update(String... propertiesNames) {
         try {
             EbeanServer ebeanServer = Ebean.getDefaultServer();

+ 2 - 2
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/model/NodeServer.java

@@ -29,11 +29,11 @@ public class NodeServer extends Model {
     private String  ip;
     private Integer port;
     private Integer port2;
-    private Integer status = -1;
+    private Integer status;
     private Date    modifiedTime;
 
     public void init() {
-
+        status = -1;
     }
 
     public Long getId() {

+ 2 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/NodeServerService.java

@@ -17,4 +17,6 @@ public interface NodeServerService {
     List<NodeServer> findList(NodeServer nodeServer);
 
     int remoteNodeStatus(String ip, Integer port);
+
+    boolean remoteOperation(Long id, String option);
 }

+ 25 - 9
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/NodeServerServiceImpl.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.admin.service.impl;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.*;
+import java.util.function.Function;
 
 import com.alibaba.otter.canal.admin.common.exception.ServiceException;
 import com.alibaba.otter.canal.admin.jmx.CanalServerMXBean;
@@ -99,15 +100,30 @@ public class NodeServerServiceImpl implements NodeServerService {
     }
 
     public int remoteNodeStatus(String ip, Integer port) {
-        JMXConnection jmxConnection = new JMXConnection(ip, port);
-        try {
-            CanalServerMXBean canalServerMXBean = jmxConnection.getCanalServerMXBean();
-            return canalServerMXBean.getStatus();
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        } finally {
-            jmxConnection.close();
+        Integer resutl = JMXConnection.execute(ip, port, CanalServerMXBean::getStatus);
+        if (resutl == null) {
+            resutl = -1;
         }
-        return -1;
+        return resutl;
+    }
+
+    public boolean remoteOperation(Long id, String option) {
+        NodeServer nodeServer = NodeServer.find.byId(id);
+        if (nodeServer == null) {
+            return false;
+        }
+        Boolean resutl = null;
+        if ("start".equals(option)) {
+            resutl = JMXConnection.execute(nodeServer.getIp(), nodeServer.getPort(), CanalServerMXBean::start);
+        } else if ("stop".equals(option)) {
+            JMXConnection.execute(nodeServer.getIp(), nodeServer.getPort(), CanalServerMXBean::stop);
+        } else {
+            return false;
+        }
+
+        if (resutl == null) {
+            resutl = false;
+        }
+        return resutl;
     }
 }

+ 14 - 0
canal-admin/canal-admin-ui/src/api/nodeServer.js

@@ -37,3 +37,17 @@ export function deleteNodeServer(id) {
     method: 'delete'
   })
 }
+
+export function startNodeServer(id) {
+  return request({
+    url: '/nodeServer/start/' + id,
+    method: 'put'
+  })
+}
+
+export function stopNodeServer(id) {
+  return request({
+    url: '/nodeServer/stop/' + id,
+    method: 'put'
+  })
+}

+ 55 - 3
canal-admin/canal-admin-ui/src/views/canalServer/nodeServer.vue

@@ -49,8 +49,8 @@
             <el-dropdown-menu slot="dropdown">
               <el-dropdown-item @click.native="handleUpdate(scope.row)">修改节点</el-dropdown-item>
               <el-dropdown-item @click.native="handleDelete(scope.row)">删除节点</el-dropdown-item>
-              <el-dropdown-item @click.native="handleResume(scope.row)">启动服务</el-dropdown-item>
-              <el-dropdown-item @click.native="handleDelete(scope.row)">停止服务</el-dropdown-item>
+              <el-dropdown-item @click.native="handleStart(scope.row)">启动服务</el-dropdown-item>
+              <el-dropdown-item @click.native="handleStop(scope.row)">停止服务</el-dropdown-item>
             </el-dropdown-menu>
           </el-dropdown>
         </template>
@@ -100,7 +100,7 @@
 </template>
 
 <script>
-import { addNodeServer, getNodeServers, updateNodeServer, deleteNodeServer } from '@/api/nodeServer'
+import { addNodeServer, getNodeServers, updateNodeServer, deleteNodeServer, startNodeServer } from '@/api/nodeServer'
 
 export default {
   filters: {
@@ -238,6 +238,58 @@ export default {
           }
         })
       })
+    },
+    handleStart(row) {
+      if (row.status !== 0) {
+        this.$message({ message: '当前节点不是停止状态,无法启动', type: 'error' })
+        return
+      }
+      this.$confirm('启动节点 Canal Server 服务', '确定启动节点服务', {
+        confirmButtonText: '确定',
+        cancelButtonText: '取消',
+        type: 'warning'
+      }).then(() => {
+        startNodeServer(row.id).then((res) => {
+          if (res.data) {
+            this.fetchData()
+            this.$message({
+              message: '启动成功',
+              type: 'success'
+            })
+          } else {
+            this.$message({
+              message: '启动节点服务出现异常',
+              type: 'error'
+            })
+          }
+        })
+      })
+    },
+    handleStop(row) {
+      if (row.status !== 1) {
+        this.$message({ message: '当前节点不是启动状态,无法停止', type: 'error' })
+        return
+      }
+      this.$confirm('停止节点 Canal Server 服务', '确定停止节点服务', {
+        confirmButtonText: '确定',
+        cancelButtonText: '取消',
+        type: 'warning'
+      }).then(() => {
+        startNodeServer(row.id).then((res) => {
+          if (res.data) {
+            this.fetchData()
+            this.$message({
+              message: '停止成功',
+              type: 'success'
+            })
+          } else {
+            this.$message({
+              message: '停止节点服务出现异常',
+              type: 'error'
+            })
+          }
+        })
+      })
     }
   }
 }