Browse Source

jmx 客户端

mcy 5 years ago
parent
commit
7044e172f5
20 changed files with 663 additions and 79 deletions
  1. 30 0
      canal-admin/canal-admin-server/pom.xml
  2. 5 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/CanalAdminApplication.java
  3. 10 2
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/config/Config.java
  4. 34 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/config/EbeanConfig.java
  5. 32 32
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/config/WebConfig.java
  6. 45 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/controller/NodeServerController.java
  7. 0 35
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/dao/CanalConfigDao.java
  8. 32 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/handler/CustomExceptionHandler.java
  9. 22 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/jmx/CanalServerMXBean.java
  10. 115 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/jmx/JMXConnection.java
  11. 21 1
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/model/CanalConfig.java
  12. 35 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/model/Model.java
  13. 94 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/model/NodeServer.java
  14. 20 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/NodeServerService.java
  15. 3 9
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/CanalConfigServiceImpl.java
  16. 114 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/NodeServerServiceImpl.java
  17. 11 0
      canal-admin/canal-admin-server/src/test/java/com/alibaba/otter/canal/admin/BaseTest.java
  18. 7 0
      canal-admin/canal-admin-server/src/test/java/com/alibaba/otter/canal/admin/TestApplication.java
  19. 21 0
      canal-admin/canal-admin-server/src/test/java/com/alibaba/otter/canal/admin/service/NodeServerServiceTest.java
  20. 12 0
      canal-admin/canal-admin-server/src/test/resources/application.yml

+ 30 - 0
canal-admin/canal-admin-server/pom.xml

@@ -20,6 +20,10 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-jdbc</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>mysql</groupId>
@@ -33,6 +37,21 @@
             <groupId>com.github.ben-manes.caffeine</groupId>
             <artifactId>caffeine</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>io.ebean</groupId>
+            <artifactId>ebean</artifactId>
+            <version>11.41.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-beanutils</groupId>
+            <artifactId>commons-beanutils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
@@ -87,6 +106,17 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>io.repaint.maven</groupId>
+                <artifactId>tiles-maven-plugin</artifactId>
+                <version>2.12</version>
+                <extensions>true</extensions>
+                <configuration>
+                    <tiles>
+                        <tile>io.ebean.tile:enhancement:11.41.1</tile>
+                    </tiles>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 

+ 5 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/CanalAdminApplication.java

@@ -18,4 +18,9 @@ public class CanalAdminApplication {
         application.setBannerMode(Banner.Mode.OFF);
         application.run(args);
     }
+
+    // public static void main(String[] args) throws Exception {
+    // UserAgent userAgent = new UserAgent();
+    //// HelloAgent helloAgent = new HelloAgent
+    // }
 }

+ 10 - 2
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/config/Config.java

@@ -1,15 +1,23 @@
 package com.alibaba.otter.canal.admin.config;
 
+import javax.sql.DataSource;
+
+import io.ebean.EbeanServer;
 import org.apache.commons.dbutils.QueryRunner;
+import org.springframework.beans.factory.FactoryBean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import javax.sql.DataSource;
-
 @Configuration
 public class Config {
+
     @Bean
     public QueryRunner queryRunner(DataSource dataSource) {
         return new QueryRunner(dataSource);
     }
+
+//    @Bean("ebeanServer")
+//    public FactoryBean<EbeanServer> ebeanServer(DataSource dataSource) {
+//        return new EbeanServerFactory(dataSource);
+//    }
 }

+ 34 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/config/EbeanConfig.java

@@ -0,0 +1,34 @@
+package com.alibaba.otter.canal.admin.config;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import io.ebean.EbeanServer;
+import io.ebean.EbeanServerFactory;
+import io.ebean.config.ServerConfig;
+import io.ebean.config.UnderscoreNamingConvention;
+
+@Configuration
+public class EbeanConfig {
+
+    @Bean("ebeanServer")
+    public EbeanServer ebeanServer(DataSource dataSource) {
+        ServerConfig serverConfig = new ServerConfig();
+        serverConfig.setDefaultServer(true);
+        serverConfig.setNamingConvention(new UnderscoreNamingConvention());
+        List<String> packages = new ArrayList<>();
+        packages.add("com.alibaba.otter.canal.admin.model");
+        serverConfig.setPackages(packages);
+        serverConfig.setName("ebeanServer");
+        serverConfig.setDataSource(dataSource);
+        serverConfig.setDatabaseSequenceBatchSize(1);
+        serverConfig.setDdlGenerate(false);
+        serverConfig.setDdlRun(false);
+        return EbeanServerFactory.create(serverConfig);
+    }
+}

+ 32 - 32
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/config/WebConfig.java

@@ -43,37 +43,37 @@ public class WebConfig implements WebMvcConfigurer {
             }
         }).addPathPatterns("/api/**");
 
-        registry.addInterceptor(new HandlerInterceptor() {
-
-            @Override
-            public boolean preHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse,
-                                     Object o) throws Exception {
-                String token = httpServletRequest.getHeader("X-Token");
-                boolean valid = false;
-                if (token != null) {
-                    User user = UserController.loginUsers.getIfPresent(token);
-                    if (user != null) {
-                        valid = true;
-                    }
-                }
-                if (!valid) {
-                    BaseModel baseModel = BaseModel.getInstance(null);
-                    baseModel.setCode(50014);
-                    baseModel.setMessage("Expired token");
-                    ObjectMapper mapper = new ObjectMapper();
-                    String json = mapper.writeValueAsString(baseModel);
-                    try {
-                        httpServletResponse.setContentType("application/json;charset=UTF-8");
-                        PrintWriter out = httpServletResponse.getWriter();
-                        out.print(json);
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                    return false;
-                }
-
-                return true;
-            }
-        }).addPathPatterns("/api/**").excludePathPatterns("/api/**/user/**");
+//        registry.addInterceptor(new HandlerInterceptor() {
+//
+//            @Override
+//            public boolean preHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse,
+//                                     Object o) throws Exception {
+//                String token = httpServletRequest.getHeader("X-Token");
+//                boolean valid = false;
+//                if (token != null) {
+//                    User user = UserController.loginUsers.getIfPresent(token);
+//                    if (user != null) {
+//                        valid = true;
+//                    }
+//                }
+//                if (!valid) {
+//                    BaseModel baseModel = BaseModel.getInstance(null);
+//                    baseModel.setCode(50014);
+//                    baseModel.setMessage("Expired token");
+//                    ObjectMapper mapper = new ObjectMapper();
+//                    String json = mapper.writeValueAsString(baseModel);
+//                    try {
+//                        httpServletResponse.setContentType("application/json;charset=UTF-8");
+//                        PrintWriter out = httpServletResponse.getWriter();
+//                        out.print(json);
+//                    } catch (Exception e) {
+//                        e.printStackTrace();
+//                    }
+//                    return false;
+//                }
+//
+//                return true;
+//            }
+//        }).addPathPatterns("/api/**").excludePathPatterns("/api/**/user/**");
     }
 }

+ 45 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/controller/NodeServerController.java

@@ -0,0 +1,45 @@
+package com.alibaba.otter.canal.admin.controller;
+
+import java.util.List;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.*;
+
+import com.alibaba.otter.canal.admin.model.BaseModel;
+import com.alibaba.otter.canal.admin.model.NodeServer;
+import com.alibaba.otter.canal.admin.service.NodeServerService;
+
+@RestController
+@RequestMapping("/api/{env}")
+public class NodeServerController {
+
+    @Autowired
+    NodeServerService nodeServerService;
+
+    @GetMapping(value = "/nodeServers")
+    public BaseModel<List<NodeServer>> nodeServers(NodeServer nodeServer, @PathVariable String env) {
+        return BaseModel.getInstance(nodeServerService.findList(nodeServer));
+    }
+
+    @DeleteMapping(value = "/nodeServer/{id}")
+    public BaseModel<String> delete(@PathVariable Long id, @PathVariable String env) {
+        nodeServerService.delete(id);
+        return BaseModel.getInstance("success");
+    }
+
+    @PostMapping(value = "/nodeServer")
+    public BaseModel<String> save(@RequestBody NodeServer nodeServer, @PathVariable String env) {
+        nodeServerService.save(nodeServer);
+        return BaseModel.getInstance("success");
+    }
+
+    @GetMapping(value = "/nodeServer/{id}")
+    public BaseModel<NodeServer> detail(@PathVariable Long id, @PathVariable String env) {
+        return BaseModel.getInstance(nodeServerService.detail(id));
+    }
+
+    @GetMapping(value = "/nodeServer/status")
+    public BaseModel<Integer> status(@RequestParam String ip, @RequestParam Integer port, @PathVariable String env) {
+        return BaseModel.getInstance(nodeServerService.remoteNodeStatus(ip,port));
+    }
+}

+ 0 - 35
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/dao/CanalConfigDao.java

@@ -1,35 +0,0 @@
-package com.alibaba.otter.canal.admin.dao;
-
-import com.alibaba.otter.canal.admin.model.CanalConfig;
-import org.apache.commons.dbutils.QueryRunner;
-import org.apache.commons.dbutils.handlers.BeanHandler;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Repository;
-
-import java.sql.SQLException;
-import java.util.Date;
-
-@Repository
-public class CanalConfigDao {
-
-    @Autowired
-    private QueryRunner runner;
-
-    public CanalConfig findById(Long id) {
-        try {
-            String sql = "select id,name,content,modified_time as modifiedTime" + " from canal_config where id=?";
-            return runner.query(sql, new BeanHandler<>(CanalConfig.class), id);
-        } catch (SQLException e) {
-            throw new RuntimeException(e.getMessage(), e);
-        }
-    }
-
-    public int updateContent(CanalConfig canalConfig){
-        try {
-            String sql = "update canal_config set content=?, modified_time=? where id=?";
-            return runner.update(sql, canalConfig.getContent(), new Date(), canalConfig.getId());
-        } catch (SQLException e) {
-            throw new RuntimeException(e.getMessage(), e);
-        }
-    }
-}

+ 32 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/handler/CustomExceptionHandler.java

@@ -0,0 +1,32 @@
+package com.alibaba.otter.canal.admin.handler;
+
+import com.alibaba.otter.canal.admin.common.exception.ServiceException;
+import com.alibaba.otter.canal.admin.model.BaseModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.ControllerAdvice;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.ResponseStatus;
+
+@ControllerAdvice(annotations = ResponseBody.class)
+public class CustomExceptionHandler {
+
+    private static Logger logger = LoggerFactory.getLogger(CustomExceptionHandler.class);
+
+    @ResponseBody
+    @ResponseStatus(HttpStatus.OK)
+    @ExceptionHandler(value = Exception.class)
+    public BaseModel commonExceptionHandle(Exception e) {
+        if (e instanceof ServiceException) {
+            logger.error(e.getMessage());
+        } else {
+            logger.error(e.getMessage(), e);
+        }
+        BaseModel res = new BaseModel();
+        res.setCode(50000);
+        res.setMessage(e.getMessage());
+        return res;
+    }
+}

+ 22 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/jmx/CanalServerMXBean.java

@@ -0,0 +1,22 @@
+package com.alibaba.otter.canal.admin.jmx;
+
+public interface CanalServerMXBean {
+
+    int getStatus();
+
+    boolean start();
+
+    boolean stop();
+
+    boolean restart();
+
+    boolean exit();
+
+    boolean startInstance(String destination);
+
+    boolean stopInstance(String destination);
+
+    boolean reloadInstance(String destination);
+
+    String getRunningInstances();
+}

+ 115 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/jmx/JMXConnection.java

@@ -0,0 +1,115 @@
+package com.alibaba.otter.canal.admin.jmx;
+
+import com.alibaba.otter.canal.admin.common.exception.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.*;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.*;
+
+public class JMXConnection {
+
+    private Logger            logger = LoggerFactory.getLogger(JMXConnection.class);
+
+    private String            ip;
+    private Integer           port;
+    private JMXConnector      jmxc;
+    private CanalServerMXBean canalServerMXBean;
+
+    public JMXConnection(String ip, Integer port){
+        this.ip = ip;
+        this.port = port;
+    }
+
+    public void connect() {
+        try {
+            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + ip + ":" + port + "/jmxrmi");
+            // jmxc = JMXConnectorFactory.connect(url, null);
+            jmxc = connectWithTimeout(url, 3, TimeUnit.SECONDS);
+            MBeanServerConnection mBeanServerConnection = jmxc.getMBeanServerConnection();
+            ObjectName name = new ObjectName("CanalServerAgent:type=CanalServerStatus");
+            mBeanServerConnection.addNotificationListener(name, (notification, handback) -> {
+            }, null, null);
+            canalServerMXBean = MBeanServerInvocationHandler
+                .newProxyInstance(mBeanServerConnection, name, CanalServerMXBean.class, false);
+        } catch (Exception e) {
+            throw new ServiceException(e.getMessage(), e);
+        }
+    }
+
+    private static JMXConnector connectWithTimeout(final JMXServiceURL url, long timeout,
+                                                  TimeUnit unit) throws IOException {
+        final BlockingQueue<Object> mailbox = new ArrayBlockingQueue<>(1);
+        ExecutorService executor = Executors.newSingleThreadExecutor(daemonThreadFactory);
+        executor.submit(() -> {
+            try {
+                JMXConnector connector = JMXConnectorFactory.connect(url);
+                if (!mailbox.offer(connector)) connector.close();
+            } catch (Throwable t) {
+                mailbox.offer(t);
+            }
+        });
+        Object result;
+        try {
+            result = mailbox.poll(timeout, unit);
+            if (result == null) {
+                if (!mailbox.offer("")) result = mailbox.take();
+            }
+        } catch (InterruptedException e) {
+            throw initCause(new InterruptedIOException(e.getMessage()), e);
+        } finally {
+            executor.shutdown();
+        }
+        if (result == null) throw new SocketTimeoutException("Connect timed out: " + url);
+        if (result instanceof JMXConnector) return (JMXConnector) result;
+        try {
+            throw (Throwable) result;
+        } catch (IOException | RuntimeException | Error e) {
+            throw e;
+        } catch (Throwable e) {
+            // In principle this can't happen but we wrap it anyway
+            throw new IOException(e.toString(), e);
+        }
+    }
+
+    private static <T extends Throwable> T initCause(T wrapper, Throwable wrapped) {
+        wrapper.initCause(wrapped);
+        return wrapper;
+    }
+
+    private static class DaemonThreadFactory implements ThreadFactory {
+
+        public Thread newThread(Runnable r) {
+            Thread t = Executors.defaultThreadFactory().newThread(r);
+            t.setDaemon(true);
+            return t;
+        }
+    }
+
+    private static final ThreadFactory daemonThreadFactory = new DaemonThreadFactory();
+
+    public CanalServerMXBean getCanalServerMXBean() {
+        if (jmxc == null) {
+            connect();
+        }
+        return canalServerMXBean;
+    }
+
+    public void close() {
+        try {
+            if (jmxc != null) {
+                jmxc.close();
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            jmxc = null;
+        }
+    }
+}

+ 21 - 1
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/model/CanalConfig.java

@@ -1,9 +1,29 @@
 package com.alibaba.otter.canal.admin.model;
 
+import io.ebean.Finder;
+
 import java.util.Date;
 
-public class CanalConfig {
+import javax.persistence.Entity;
+import javax.persistence.Id;
+
+@Entity
+public class CanalConfig extends Model {
+
+    public static final CanalConfigFinder find = new CanalConfigFinder();
+
+    public static class CanalConfigFinder extends Finder<Long, CanalConfig> {
+
+        /**
+         * Construct using the default EbeanServer.
+         */
+        public CanalConfigFinder(){
+            super(CanalConfig.class);
+        }
+
+    }
 
+    @Id
     private Long   id;
     private String name;
     private String content;

+ 35 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/model/Model.java

@@ -0,0 +1,35 @@
+package com.alibaba.otter.canal.admin.model;
+
+import io.ebean.Ebean;
+import io.ebean.EbeanServer;
+import org.apache.commons.beanutils.PropertyUtils;
+
+import javax.persistence.MappedSuperclass;
+import javax.persistence.OptimisticLockException;
+
+@MappedSuperclass
+public abstract class Model extends io.ebean.Model {
+
+    public void update(String... propertiesNames) {
+        try {
+            EbeanServer ebeanServer = Ebean.getDefaultServer();
+            Object id = ebeanServer.getBeanId(this);
+            Object model = ebeanServer.createQuery(this.getClass()).where().idEq(id).findOne();
+            for (String propertyName : propertiesNames) {
+                if (propertyName.startsWith("nn:")) { // not null
+                    propertyName = propertyName.substring(3);
+                    Object val = PropertyUtils.getProperty(this, propertyName);
+                    if (val != null) {
+                        PropertyUtils.setProperty(model, propertyName, val);
+                    }
+                } else {
+                    Object val = PropertyUtils.getProperty(this, propertyName);
+                    PropertyUtils.setProperty(model, propertyName, val);
+                }
+            }
+            ebeanServer.update(model);
+        } catch (Exception e) {
+            throw new OptimisticLockException(e);
+        }
+    }
+}

+ 94 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/model/NodeServer.java

@@ -0,0 +1,94 @@
+package com.alibaba.otter.canal.admin.model;
+
+import java.util.Date;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+
+import io.ebean.Finder;
+
+@Entity
+public class NodeServer extends Model {
+
+    public static final NodeServerFinder find = new NodeServerFinder();
+
+    public static class NodeServerFinder extends Finder<Long, NodeServer> {
+
+        /**
+         * Construct using the default EbeanServer.
+         */
+        public NodeServerFinder(){
+            super(NodeServer.class);
+        }
+
+    }
+
+    @Id
+    private Long    id;
+    private String  name;
+    private String  ip;
+    private Integer port;
+    private Integer port2;
+    private Integer status = -1;
+    private Date    modifiedTime;
+
+    public void init() {
+
+    }
+
+    public Long getId() {
+        return id;
+    }
+
+    public void setId(Long id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getIp() {
+        return ip;
+    }
+
+    public void setIp(String ip) {
+        this.ip = ip;
+    }
+
+    public Integer getPort() {
+        return port;
+    }
+
+    public void setPort(Integer port) {
+        this.port = port;
+    }
+
+    public Integer getPort2() {
+        return port2;
+    }
+
+    public void setPort2(Integer port2) {
+        this.port2 = port2;
+    }
+
+    public Integer getStatus() {
+        return status;
+    }
+
+    public void setStatus(Integer status) {
+        this.status = status;
+    }
+
+    public Date getModifiedTime() {
+        return modifiedTime;
+    }
+
+    public void setModifiedTime(Date modifiedTime) {
+        this.modifiedTime = modifiedTime;
+    }
+}

+ 20 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/NodeServerService.java

@@ -0,0 +1,20 @@
+package com.alibaba.otter.canal.admin.service;
+
+import com.alibaba.otter.canal.admin.model.NodeServer;
+
+import java.util.List;
+
+public interface NodeServerService {
+
+    void save(NodeServer nodeServer);
+
+    NodeServer detail(Long id);
+
+    void update(NodeServer nodeServer);
+
+    void delete(Long id);
+
+    List<NodeServer> findList(NodeServer nodeServer);
+
+    int remoteNodeStatus(String ip, Integer port);
+}

+ 3 - 9
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/CanalConfigServiceImpl.java

@@ -1,28 +1,22 @@
 package com.alibaba.otter.canal.admin.service.impl;
 
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import com.alibaba.otter.canal.admin.dao.CanalConfigDao;
 import com.alibaba.otter.canal.admin.model.CanalConfig;
 import com.alibaba.otter.canal.admin.service.CanalConfigService;
-import org.springframework.transaction.annotation.Transactional;
 
 @Service
 public class CanalConfigServiceImpl implements CanalConfigService {
 
-    @Autowired
-    CanalConfigDao canalConfigDao;
-
     public CanalConfig getCanalConfig() {
-        return canalConfigDao.findById(1L);
+        return CanalConfig.find.byId(1L);
     }
 
     public CanalConfig getAdapterConfig() {
-        return canalConfigDao.findById(2L);
+        return CanalConfig.find.byId(2L);
     }
 
     public void updateContent(CanalConfig canalConfig) {
-        canalConfigDao.updateContent(canalConfig);
+        canalConfig.update("content");
     }
 }

+ 114 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/NodeServerServiceImpl.java

@@ -0,0 +1,114 @@
+package com.alibaba.otter.canal.admin.service.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+import com.alibaba.otter.canal.admin.common.exception.ServiceException;
+import com.alibaba.otter.canal.admin.jmx.CanalServerMXBean;
+import com.alibaba.otter.canal.admin.jmx.JMXConnection;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import com.alibaba.otter.canal.admin.model.NodeServer;
+import com.alibaba.otter.canal.admin.service.NodeServerService;
+
+import io.ebean.Query;
+
+@Service
+public class NodeServerServiceImpl implements NodeServerService {
+
+    private static final Logger logger = LoggerFactory.getLogger(NodeServerServiceImpl.class);
+
+    public void save(NodeServer nodeServer) {
+        int cnt = NodeServer.find.query()
+            .where()
+            .eq("ip", nodeServer.getIp())
+            .eq("port", nodeServer.getPort())
+            .findCount();
+        if (cnt > 0) {
+            throw new ServiceException("节点信息已存在");
+        }
+
+        nodeServer.save();
+
+        // 检测节点状态
+    }
+
+    public NodeServer detail(Long id) {
+        return NodeServer.find.byId(id);
+    }
+
+    public void update(NodeServer nodeServer) {
+        nodeServer.update("name", "ip", "port", "port2");
+
+        // 检测节点状态
+    }
+
+    public void delete(Long id) {
+        NodeServer nodeServer = NodeServer.find.byId(id);
+        if (nodeServer != null) {
+            nodeServer.delete();
+        }
+    }
+
+    public List<NodeServer> findList(NodeServer nodeServer) {
+        Query<NodeServer> query = NodeServer.find.query();
+        if (nodeServer != null) {
+            if (StringUtils.isNotEmpty(nodeServer.getName())) {
+                query.where().like("name", "%" + nodeServer.getName() + "%");
+            }
+            if (StringUtils.isNotEmpty(nodeServer.getIp())) {
+                query.where().eq("ip", nodeServer.getIp());
+            }
+        }
+        query.order().asc("id");
+        List<NodeServer> nodeServers = query.findList();
+
+        ExecutorService executorService = Executors.newFixedThreadPool(nodeServers.size());
+        List<Future<Boolean>> futures = new ArrayList<>(nodeServers.size());
+        // 取每个节点的状态
+        for (NodeServer ns : nodeServers) {
+            futures.add(executorService.submit(() -> {
+                int status = -1;
+                JMXConnection jmxConnection = new JMXConnection(ns.getIp(), ns.getPort());
+                try {
+                    CanalServerMXBean canalServerMXBean = jmxConnection.getCanalServerMXBean();
+                    status = canalServerMXBean.getStatus();
+                } catch (Exception e) {
+                    logger.warn(e.getMessage());
+                } finally {
+                    jmxConnection.close();
+                }
+                ns.setStatus(status);
+                return status != -1;
+            }));
+        }
+        futures.forEach(f -> {
+            try {
+                f.get();
+            } catch (InterruptedException | ExecutionException e) {
+                // ignore
+            }
+        });
+
+        executorService.shutdownNow();
+
+        return nodeServers;
+    }
+
+    public int remoteNodeStatus(String ip, Integer port) {
+        JMXConnection jmxConnection = new JMXConnection(ip, port);
+        try {
+            CanalServerMXBean canalServerMXBean = jmxConnection.getCanalServerMXBean();
+            return canalServerMXBean.getStatus();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            jmxConnection.close();
+        }
+        return -1;
+    }
+}

+ 11 - 0
canal-admin/canal-admin-server/src/test/java/com/alibaba/otter/canal/admin/BaseTest.java

@@ -0,0 +1,11 @@
+package com.alibaba.otter.canal.admin;
+
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = { CanalAdminApplication.class })
+
+public class BaseTest {
+}

+ 7 - 0
canal-admin/canal-admin-server/src/test/java/com/alibaba/otter/canal/admin/TestApplication.java

@@ -0,0 +1,7 @@
+package com.alibaba.otter.canal.admin;
+
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class TestApplication {
+}

+ 21 - 0
canal-admin/canal-admin-server/src/test/java/com/alibaba/otter/canal/admin/service/NodeServerServiceTest.java

@@ -0,0 +1,21 @@
+package com.alibaba.otter.canal.admin.service;
+
+import com.alibaba.otter.canal.admin.BaseTest;
+import com.alibaba.otter.canal.admin.model.NodeServer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.List;
+
+public class NodeServerServiceTest extends BaseTest {
+
+    @Autowired
+    NodeServerService nodeServerService;
+
+    @Test
+    public void findList() {
+        List<NodeServer> list = nodeServerService.findList(null);
+        Assert.assertNotNull(list);
+    }
+}

+ 12 - 0
canal-admin/canal-admin-server/src/test/resources/application.yml

@@ -0,0 +1,12 @@
+spring.datasource:
+    url: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8&useSSL=false
+    username: root
+    password: 121212
+    driver-class-name: com.mysql.jdbc.Driver
+    hikari:
+      maximum-pool-size: 10
+      minimum-idle: 1
+
+logging:
+  level:
+    com.alibaba.otter.canal.admin: DEBUG