mcy 6 роки тому
батько
коміт
23209f24ed

+ 46 - 45
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -4,62 +4,63 @@ import java.text.MessageFormat;
 
 /**
  * 启动常用变量
- * 
+ *
  * @author jianghang 2012-11-8 下午03:15:55
  * @version 1.0.0
  */
 public class CanalConstants {
 
-    public static final String MDC_DESTINATION                       = "destination";
-    public static final String ROOT                                  = "canal";
-    public static final String CANAL_ID                              = ROOT + "." + "id";
-    public static final String CANAL_IP                              = ROOT + "." + "ip";
-    public static final String CANAL_PORT                            = ROOT + "." + "port";
-    public static final String CANAL_METRICS_PULL_PORT               = ROOT + "." + "metrics.pull.port";
-    public static final String CANAL_ZKSERVERS                       = ROOT + "." + "zkServers";
-    public static final String CANAL_WITHOUT_NETTY                   = ROOT + "." + "withoutNetty";
+    public static final String MDC_DESTINATION                      = "destination";
+    public static final String ROOT                                 = "canal";
+    public static final String CANAL_ID                             = ROOT + "." + "id";
+    public static final String CANAL_IP                             = ROOT + "." + "ip";
+    public static final String CANAL_PORT                           = ROOT + "." + "port";
+    public static final String CANAL_METRICS_PULL_PORT              = ROOT + "." + "metrics.pull.port";
+    public static final String CANAL_ADMIN_JMX_PORT                 = ROOT + "." + "admin.jmx.port";
+    public static final String CANAL_ZKSERVERS                      = ROOT + "." + "zkServers";
+    public static final String CANAL_WITHOUT_NETTY                  = ROOT + "." + "withoutNetty";
 
-    public static final String CANAL_DESTINATIONS                    = ROOT + "." + "destinations";
-    public static final String CANAL_AUTO_SCAN                       = ROOT + "." + "auto.scan";
-    public static final String CANAL_AUTO_SCAN_INTERVAL              = ROOT + "." + "auto.scan.interval";
-    public static final String CANAL_CONF_DIR                        = ROOT + "." + "conf.dir";
-    public static final String CANAL_SERVER_MODE                     = ROOT + "." + "serverMode";
+    public static final String CANAL_DESTINATIONS                   = ROOT + "." + "destinations";
+    public static final String CANAL_AUTO_SCAN                      = ROOT + "." + "auto.scan";
+    public static final String CANAL_AUTO_SCAN_INTERVAL             = ROOT + "." + "auto.scan.interval";
+    public static final String CANAL_CONF_DIR                       = ROOT + "." + "conf.dir";
+    public static final String CANAL_SERVER_MODE                    = ROOT + "." + "serverMode";
 
-    public static final String CANAL_DESTINATION_SPLIT               = ",";
-    public static final String GLOBAL_NAME                           = "global";
+    public static final String CANAL_DESTINATION_SPLIT              = ",";
+    public static final String GLOBAL_NAME                          = "global";
 
-    public static final String INSTANCE_MODE_TEMPLATE                = ROOT + "." + "instance.{0}.mode";
-    public static final String INSTANCE_LAZY_TEMPLATE                = ROOT + "." + "instance.{0}.lazy";
-    public static final String INSTANCE_MANAGER_ADDRESS_TEMPLATE     = ROOT + "." + "instance.{0}.manager.address";
-    public static final String INSTANCE_SPRING_XML_TEMPLATE          = ROOT + "." + "instance.{0}.spring.xml";
+    public static final String INSTANCE_MODE_TEMPLATE               = ROOT + "." + "instance.{0}.mode";
+    public static final String INSTANCE_LAZY_TEMPLATE               = ROOT + "." + "instance.{0}.lazy";
+    public static final String INSTANCE_MANAGER_ADDRESS_TEMPLATE    = ROOT + "." + "instance.{0}.manager.address";
+    public static final String INSTANCE_SPRING_XML_TEMPLATE         = ROOT + "." + "instance.{0}.spring.xml";
 
-    public static final String CANAL_DESTINATION_PROPERTY            = ROOT + ".instance.destination";
+    public static final String CANAL_DESTINATION_PROPERTY           = ROOT + ".instance.destination";
 
-    public static final String CANAL_SOCKETCHANNEL                   = ROOT + "." + "socketChannel";
+    public static final String CANAL_SOCKETCHANNEL                  = ROOT + "." + "socketChannel";
 
-    public static final String CANAL_MQ_SERVERS                      = ROOT + "." + "mq.servers";
-    public static final String CANAL_MQ_RETRIES                      = ROOT + "." + "mq.retries";
-    public static final String CANAL_MQ_BATCHSIZE                    = ROOT + "." + "mq.batchSize";
-    public static final String CANAL_MQ_LINGERMS                     = ROOT + "." + "mq.lingerMs";
-    public static final String CANAL_MQ_MAXREQUESTSIZE               = ROOT + "." + "mq.maxRequestSize";
-    public static final String CANAL_MQ_BUFFERMEMORY                 = ROOT + "." + "mq.bufferMemory";
-    public static final String CANAL_MQ_CANALBATCHSIZE               = ROOT + "." + "mq.canalBatchSize";
-    public static final String CANAL_MQ_CANALGETTIMEOUT              = ROOT + "." + "mq.canalGetTimeout";
-    public static final String CANAL_MQ_FLATMESSAGE                  = ROOT + "." + "mq.flatMessage";
-    public static final String CANAL_MQ_COMPRESSION_TYPE             = ROOT + "." + "mq.compressionType";
-    public static final String CANAL_MQ_ACKS                         = ROOT + "." + "mq.acks";
-    public static final String CANAL_MQ_TRANSACTION                  = ROOT + "." + "mq.transaction";
-    public static final String CANAL_MQ_PRODUCERGROUP                = ROOT + "." + "mq.producerGroup";
-    public static final String CANAL_ALIYUN_ACCESSKEY                = ROOT + "." + "aliyun.accessKey";
-    public static final String CANAL_ALIYUN_SECRETKEY                = ROOT + "." + "aliyun.secretKey";
-    public static final String CANAL_MQ_PROPERTIES                   = ROOT + "." + "mq.properties";
-    public static final String CANAL_MQ_ENABLE_MESSAGE_TRACE         = ROOT + "." + "mq.enableMessageTrace";
-    public static final String CANAL_MQ_ACCESS_CHANNEL               = ROOT + "." + "mq.accessChannel";
-    public static final String CANAL_MQ_CUSTOMIZED_TRACE_TOPIC       = ROOT + "." + "mq.customizedTraceTopic";
-    public static final String CANAL_MQ_NAMESPACE                    = ROOT + "." + "mq.namespace";
-    public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE        = ROOT + "." + "mq.kafka.kerberos.enable";
-    public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH  = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
-    public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH  = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";
+    public static final String CANAL_MQ_SERVERS                     = ROOT + "." + "mq.servers";
+    public static final String CANAL_MQ_RETRIES                     = ROOT + "." + "mq.retries";
+    public static final String CANAL_MQ_BATCHSIZE                   = ROOT + "." + "mq.batchSize";
+    public static final String CANAL_MQ_LINGERMS                    = ROOT + "." + "mq.lingerMs";
+    public static final String CANAL_MQ_MAXREQUESTSIZE              = ROOT + "." + "mq.maxRequestSize";
+    public static final String CANAL_MQ_BUFFERMEMORY                = ROOT + "." + "mq.bufferMemory";
+    public static final String CANAL_MQ_CANALBATCHSIZE              = ROOT + "." + "mq.canalBatchSize";
+    public static final String CANAL_MQ_CANALGETTIMEOUT             = ROOT + "." + "mq.canalGetTimeout";
+    public static final String CANAL_MQ_FLATMESSAGE                 = ROOT + "." + "mq.flatMessage";
+    public static final String CANAL_MQ_COMPRESSION_TYPE            = ROOT + "." + "mq.compressionType";
+    public static final String CANAL_MQ_ACKS                        = ROOT + "." + "mq.acks";
+    public static final String CANAL_MQ_TRANSACTION                 = ROOT + "." + "mq.transaction";
+    public static final String CANAL_MQ_PRODUCERGROUP               = ROOT + "." + "mq.producerGroup";
+    public static final String CANAL_ALIYUN_ACCESSKEY               = ROOT + "." + "aliyun.accessKey";
+    public static final String CANAL_ALIYUN_SECRETKEY               = ROOT + "." + "aliyun.secretKey";
+    public static final String CANAL_MQ_PROPERTIES                  = ROOT + "." + "mq.properties";
+    public static final String CANAL_MQ_ENABLE_MESSAGE_TRACE        = ROOT + "." + "mq.enableMessageTrace";
+    public static final String CANAL_MQ_ACCESS_CHANNEL              = ROOT + "." + "mq.accessChannel";
+    public static final String CANAL_MQ_CUSTOMIZED_TRACE_TOPIC      = ROOT + "." + "mq.customizedTraceTopic";
+    public static final String CANAL_MQ_NAMESPACE                   = ROOT + "." + "mq.namespace";
+    public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE       = ROOT + "." + "mq.kafka.kerberos.enable";
+    public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
+    public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";
 
     public static String getInstanceModeKey(String destination) {
         return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);

+ 22 - 4
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -3,6 +3,9 @@ package com.alibaba.otter.canal.deployer;
 import java.io.FileInputStream;
 import java.util.Properties;
 
+import com.alibaba.otter.canal.deployer.mbean.CanalServerAgent;
+import com.alibaba.otter.canal.deployer.mbean.CanalServerMXBean;
+import com.alibaba.otter.canal.deployer.mbean.CanalServer;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,8 +56,8 @@ public class CanalLauncher {
                 }
             }
 
-            final CanalStater canalStater = new CanalStater();
-            canalStater.start(properties);
+            final CanalStater canalStater = new CanalStater(properties);
+            canalStater.start();
 
             if (remoteConfigLoader != null) {
                 remoteConfigLoader.startMonitor(new RemoteCanalConfigMonitor() {
@@ -63,8 +66,9 @@ public class CanalLauncher {
                     public void onChange(Properties properties) {
                         try {
                             // 远程配置canal.properties修改重新加载整个应用
-                            canalStater.destroy();
-                            canalStater.start(properties);
+                            canalStater.stop();
+                            canalStater.setProperties(properties);
+                            canalStater.start();
                         } catch (Throwable throwable) {
                             logger.error(throwable.getMessage(), throwable);
                         }
@@ -72,10 +76,24 @@ public class CanalLauncher {
                 });
             }
 
+            CanalServerAgent canalServerAgent = null;
+            String jmxPort = properties.getProperty(CanalConstants.CANAL_ADMIN_JMX_PORT);
+            if (StringUtils.isNotEmpty(jmxPort)) {
+                String ip = properties.getProperty(CanalConstants.CANAL_IP);
+                CanalServerMXBean canalServerMBean = new CanalServer(canalStater);
+                canalServerAgent = new CanalServerAgent(ip, Integer.parseInt(jmxPort), canalServerMBean);
+                Thread agentThread = new Thread(canalServerAgent::start);
+                agentThread.start();
+            }
+
             while (running) {
                 Thread.sleep(1000);
             }
 
+            if (canalServerAgent != null) {
+                canalServerAgent.stop();
+            }
+
             if (remoteConfigLoader != null) {
                 remoteConfigLoader.destroy();
             }

+ 35 - 10
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java

@@ -32,14 +32,31 @@ public class CanalStater {
     private CanalMQProducer     canalMQProducer = null;
     private Thread              shutdownThread  = null;
     private CanalMQStarter      canalMQStarter  = null;
+    private volatile Properties properties;
+    private volatile boolean    running         = false;
+
+    public CanalStater(Properties properties){
+        this.properties = properties;
+    }
+
+    public boolean isRunning() {
+        return running;
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
 
     /**
      * 启动方法
      *
-     * @param properties canal.properties 配置
      * @throws Throwable
      */
-    synchronized void start(Properties properties) throws Throwable {
+    public synchronized void start() throws Throwable {
         String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
         if (serverMode.equalsIgnoreCase("kafka")) {
             canalMQProducer = new CanalKafkaProducer();
@@ -64,8 +81,8 @@ public class CanalStater {
 
                         public boolean accept(File pathname) {
                             String filename = pathname.getName();
-                            return pathname.isDirectory() && !"spring".equalsIgnoreCase(filename) &&
-                                    !"metrics".equalsIgnoreCase(filename);
+                            return pathname.isDirectory() && !"spring".equalsIgnoreCase(filename)
+                                   && !"metrics".equalsIgnoreCase(filename);
                         }
                     });
                     if (instanceDirs != null && instanceDirs.length > 0) {
@@ -113,6 +130,8 @@ public class CanalStater {
             canalMQStarter.start(mqProperties);
             controller.setCanalMQStarter(canalMQStarter);
         }
+
+        running = true;
     }
 
     /**
@@ -120,7 +139,7 @@ public class CanalStater {
      *
      * @throws Throwable
      */
-    synchronized void destroy() throws Throwable {
+    public synchronized void stop() throws Throwable {
         if (controller != null) {
             controller.stop();
             controller = null;
@@ -134,6 +153,7 @@ public class CanalStater {
             canalMQStarter = null;
             canalMQProducer = null;
         }
+        running = false;
     }
 
     /**
@@ -206,7 +226,8 @@ public class CanalStater {
             mqProperties.setProducerGroup(producerGroup);
         }
 
-        String enableMessageTrace = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_ENABLE_MESSAGE_TRACE);
+        String enableMessageTrace = CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_ENABLE_MESSAGE_TRACE);
         if (!StringUtils.isEmpty(enableMessageTrace)) {
             mqProperties.setEnableMessageTrace(Boolean.valueOf(enableMessageTrace));
         }
@@ -216,7 +237,8 @@ public class CanalStater {
             mqProperties.setAccessChannel(accessChannel);
         }
 
-        String customizedTraceTopic = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CUSTOMIZED_TRACE_TOPIC);
+        String customizedTraceTopic = CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_CUSTOMIZED_TRACE_TOPIC);
         if (!StringUtils.isEmpty(customizedTraceTopic)) {
             mqProperties.setCustomizedTraceTopic(customizedTraceTopic);
         }
@@ -226,17 +248,20 @@ public class CanalStater {
             mqProperties.setNamespace(namespace);
         }
 
-        String kafkaKerberosEnable = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
+        String kafkaKerberosEnable = CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
         if (!StringUtils.isEmpty(kafkaKerberosEnable)) {
             mqProperties.setKerberosEnable(Boolean.valueOf(kafkaKerberosEnable));
         }
 
-        String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH);
+        String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH);
         if (!StringUtils.isEmpty(kafkaKerberosKrb5Filepath)) {
             mqProperties.setKerberosKrb5FilePath(kafkaKerberosKrb5Filepath);
         }
 
-        String kafkaKerberosJaasFilepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH);
+        String kafkaKerberosJaasFilepath = CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH);
         if (!StringUtils.isEmpty(kafkaKerberosJaasFilepath)) {
             mqProperties.setKerberosJaasFilePath(kafkaKerberosJaasFilepath);
         }

+ 67 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/mbean/CanalServer.java

@@ -0,0 +1,67 @@
+package com.alibaba.otter.canal.deployer.mbean;
+
+import com.alibaba.otter.canal.deployer.CanalLauncher;
+import com.alibaba.otter.canal.deployer.CanalStater;
+
+public class CanalServer implements CanalServerMXBean {
+
+    private volatile int status;
+
+    private CanalStater  canalStater;
+
+    public CanalServer(CanalStater canalStater){
+        this.canalStater = canalStater;
+        this.status = canalStater.isRunning() ? 1 : 0;
+    }
+
+    @Override
+    public int getStatus() {
+        return status;
+    }
+
+    public void setStatus(int status) {
+        this.status = status;
+    }
+
+    @Override
+    public synchronized boolean start() {
+        try {
+            if (!canalStater.isRunning()) {
+                canalStater.start();
+                status = 1;
+                return true;
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    @Override
+    public synchronized boolean stop() {
+        try {
+            if (canalStater.isRunning()) {
+                canalStater.stop();
+                status = 0;
+                return true;
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    @Override
+    public synchronized boolean restart() {
+        stop();
+        return start();
+    }
+
+    @Override
+    public synchronized boolean exit() {
+        stop();
+        CanalLauncher.running = false;
+        return true;
+    }
+
+}

+ 72 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/mbean/CanalServerAgent.java

@@ -0,0 +1,72 @@
+package com.alibaba.otter.canal.deployer.mbean;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.rmi.registry.LocateRegistry;
+
+import javax.management.MBeanServer;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+
+public class CanalServerAgent implements NotificationListener {
+
+    private MBeanServer                 mBeanServer = ManagementFactory.getPlatformMBeanServer();
+
+    private String                      ip          = "0.0.0.0";
+
+    private int                         port;
+
+    private CanalServerMXBean           canalServerMbean;
+
+    private volatile JMXConnectorServer cs;
+
+    public CanalServerAgent(String ip, int port, CanalServerMXBean canalServerMbean){
+        if (StringUtils.isNotEmpty(ip)) {
+            this.ip = ip;
+        }
+        this.port = port;
+        this.canalServerMbean = canalServerMbean;
+    }
+
+    @Override
+    public void handleNotification(Notification notification, Object handback) {
+
+    }
+
+    public synchronized void start() {
+        try {
+            if (cs == null) {
+                ObjectName name = new ObjectName("CanalServerAgent:type=CanalServerStatus");
+                mBeanServer.registerMBean(canalServerMbean, name);
+
+                LocateRegistry.createRegistry(port);
+
+                // 构造JMXServiceURL
+                JMXServiceURL jmxServiceURL = new JMXServiceURL(
+                    "service:jmx:rmi:///jndi/rmi://" + ip + ":" + port + "/jmxrmi");
+                // 创建JMXConnectorServer
+                cs = JMXConnectorServerFactory.newJMXConnectorServer(jmxServiceURL, null, mBeanServer);
+                // 启动
+                cs.start();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public synchronized void stop() {
+        if (cs != null) {
+            try {
+                cs.stop();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}

+ 14 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/mbean/CanalServerMXBean.java

@@ -0,0 +1,14 @@
+package com.alibaba.otter.canal.deployer.mbean;
+
+public interface CanalServerMXBean {
+
+    int getStatus();
+
+    boolean start();
+
+    boolean stop();
+
+    boolean restart();
+
+    boolean exit();
+}