1
0
rewerma 5 жил өмнө
parent
commit
40d2a8c6cc

+ 47 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/controller/CanalInstanceController.java

@@ -0,0 +1,47 @@
+package com.alibaba.otter.canal.admin.controller;
+
+import java.util.List;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.*;
+
+import com.alibaba.otter.canal.admin.model.BaseModel;
+import com.alibaba.otter.canal.admin.model.CanalInstanceConfig;
+import com.alibaba.otter.canal.admin.service.CanalInstanceService;
+
+@RestController
+@RequestMapping("/api/{env}/canal")
+public class CanalInstanceController {
+
+    @Autowired
+    CanalInstanceService canalInstanceConfigService;
+
+    @GetMapping(value = "/canalInstances")
+    public BaseModel<List<CanalInstanceConfig>> nodeServers(CanalInstanceConfig canalInstanceConfig,
+                                                            @PathVariable String env) {
+        return BaseModel.getInstance(canalInstanceConfigService.findList(canalInstanceConfig));
+    }
+
+    @PostMapping(value = "/canalInstance")
+    public BaseModel<String> save(@RequestBody CanalInstanceConfig canalInstanceConfig, @PathVariable String env) {
+        canalInstanceConfigService.save(canalInstanceConfig);
+        return BaseModel.getInstance("success");
+    }
+
+    @GetMapping(value = "/canalInstance/{id}")
+    public BaseModel<CanalInstanceConfig> detail(@PathVariable Long id, @PathVariable String env) {
+        return BaseModel.getInstance(canalInstanceConfigService.detail(id));
+    }
+
+    @PutMapping(value = "/canalInstance")
+    public BaseModel<String> update(@RequestBody  CanalInstanceConfig canalInstanceConfig, @PathVariable String env) {
+        canalInstanceConfigService.updateContent(canalInstanceConfig);
+        return BaseModel.getInstance("success");
+    }
+
+    @DeleteMapping(value = "/nodeServer/{id}")
+    public BaseModel<String> delete(@PathVariable Long id, @PathVariable String env) {
+        canalInstanceConfigService.delete(id);
+        return BaseModel.getInstance("success");
+    }
+}

+ 84 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/model/CanalInstanceConfig.java

@@ -0,0 +1,84 @@
+package com.alibaba.otter.canal.admin.model;
+
+import io.ebean.Finder;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Transient;
+import java.util.Date;
+
+@Entity
+public class CanalInstanceConfig extends Model {
+
+    public static final CanalInstanceConfigFinder find = new CanalInstanceConfigFinder();
+
+    public static class CanalInstanceConfigFinder extends Finder<Long, CanalInstanceConfig> {
+
+        /**
+         * Construct using the default EbeanServer.
+         */
+        public CanalInstanceConfigFinder(){
+            super(CanalInstanceConfig.class);
+        }
+
+    }
+
+    @Id
+    private Long       id;
+    private String     name;
+    private String     content;
+    private Date       modifiedTime;
+
+    @Transient
+    private NodeServer nodeServer;
+    @Transient
+    private Integer    status = 0;
+
+    public Long getId() {
+        return id;
+    }
+
+    public void setId(Long id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getContent() {
+        return content;
+    }
+
+    public void setContent(String content) {
+        this.content = content;
+    }
+
+    public Date getModifiedTime() {
+        return modifiedTime;
+    }
+
+    public void setModifiedTime(Date modifiedTime) {
+        this.modifiedTime = modifiedTime;
+    }
+
+    public NodeServer getNodeServer() {
+        return nodeServer;
+    }
+
+    public void setNodeServer(NodeServer nodeServer) {
+        this.nodeServer = nodeServer;
+    }
+
+    public Integer getStatus() {
+        return status;
+    }
+
+    public void setStatus(Integer status) {
+        this.status = status;
+    }
+}

+ 18 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/CanalInstanceService.java

@@ -0,0 +1,18 @@
+package com.alibaba.otter.canal.admin.service;
+
+import com.alibaba.otter.canal.admin.model.CanalInstanceConfig;
+
+import java.util.List;
+
+public interface CanalInstanceService {
+
+    List<CanalInstanceConfig> findList(CanalInstanceConfig canalInstanceConfig);
+
+    void save(CanalInstanceConfig canalInstanceConfig);
+
+    CanalInstanceConfig detail(Long id);
+
+    void updateContent(CanalInstanceConfig canalInstanceConfig);
+
+    void delete(Long id);
+}

+ 72 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/CanalInstanceServiceImpl.java

@@ -0,0 +1,72 @@
+package com.alibaba.otter.canal.admin.service.impl;
+
+import java.util.List;
+
+import com.alibaba.otter.canal.admin.service.CanalInstanceService;
+import org.apache.commons.lang.StringUtils;
+
+import com.alibaba.otter.canal.admin.jmx.CanalServerMXBean;
+import com.alibaba.otter.canal.admin.jmx.JMXConnection;
+import com.alibaba.otter.canal.admin.model.CanalInstanceConfig;
+import com.alibaba.otter.canal.admin.model.NodeServer;
+
+import io.ebean.Query;
+import org.springframework.stereotype.Service;
+
+@Service
+public class CanalInstanceServiceImpl implements CanalInstanceService {
+
+    public List<CanalInstanceConfig> findList(CanalInstanceConfig canalInstanceConfig) {
+        Query<CanalInstanceConfig> query = CanalInstanceConfig.find.query()
+            .setDisableLazyLoading(true)
+            .select("name, modifiedTime");
+        if (canalInstanceConfig != null) {
+            if (StringUtils.isNotEmpty(canalInstanceConfig.getName())) {
+                query.where().like("name", "%" + canalInstanceConfig.getName() + "%");
+            }
+        }
+        List<CanalInstanceConfig> canalInstanceConfigs = query.findList();
+
+        // check all canal instances running status
+        List<NodeServer> nodeServers = NodeServer.find.query().findList();
+        for (NodeServer nodeServer : nodeServers) {
+            String runningInstances = JMXConnection
+                .execute(nodeServer.getIp(), nodeServer.getPort(), CanalServerMXBean::getRunningInstances);
+            if (runningInstances == null) {
+                continue;
+            }
+            String[] instances = runningInstances.split(",");
+            for (String instance : instances) {
+                for (CanalInstanceConfig cig : canalInstanceConfigs) {
+                    if (instance.equals(cig.getName())) {
+                        cig.setNodeServer(nodeServer);
+                        cig.setStatus(1);
+                        break;
+                    }
+                }
+            }
+        }
+
+        return canalInstanceConfigs;
+    }
+
+    public void save(CanalInstanceConfig canalInstanceConfig) {
+        canalInstanceConfig.insert();
+    }
+
+    public CanalInstanceConfig detail(Long id) {
+        return CanalInstanceConfig.find.byId(id);
+    }
+
+    public void updateContent(CanalInstanceConfig canalInstanceConfig) {
+        canalInstanceConfig.update("content");
+    }
+
+    public void delete(Long id) {
+        CanalInstanceConfig canalInstanceConfig = CanalInstanceConfig.find.byId(id);
+        if (canalInstanceConfig != null) {
+            canalInstanceConfig.delete();
+        }
+    }
+
+}