Browse Source

Configurable metrics server port.

Chuanyi Li 6 years ago
parent
commit
e8aee3a276

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -15,6 +15,7 @@ public class CanalConstants {
     public static final String CANAL_ID                          = ROOT + "." + "id";
     public static final String CANAL_ID                          = ROOT + "." + "id";
     public static final String CANAL_IP                          = ROOT + "." + "ip";
     public static final String CANAL_IP                          = ROOT + "." + "ip";
     public static final String CANAL_PORT                        = ROOT + "." + "port";
     public static final String CANAL_PORT                        = ROOT + "." + "port";
+    public static final String CANAL_METRICS_PULL_PORT           = ROOT + "." + "metrics.pull.port";
     public static final String CANAL_ZKSERVERS                   = ROOT + "." + "zkServers";
     public static final String CANAL_ZKSERVERS                   = ROOT + "." + "zkServers";
     public static final String CANAL_WITHOUT_NETTY               = ROOT + "." + "withoutNetty";
     public static final String CANAL_WITHOUT_NETTY               = ROOT + "." + "withoutNetty";
 
 

+ 8 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -98,6 +98,14 @@ public class CanalController {
         port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
         port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
         embededCanalServer = CanalServerWithEmbedded.instance();
         embededCanalServer = CanalServerWithEmbedded.instance();
         embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
         embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
+        try {
+            int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT));
+            embededCanalServer.setMetricsPort(metricsPort);
+        } catch (NumberFormatException e) {
+            logger.info("No valid metrics server port found, use default 11112.");
+            embededCanalServer.setMetricsPort(11112);
+        }
+
         String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
         String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
         if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
         if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
             canalServer = CanalServerWithNetty.instance();
             canalServer = CanalServerWithNetty.instance();

+ 1 - 1
instance/spring/src/test/resources/canal.properties

@@ -51,4 +51,4 @@ canal.instance.global.mode = spring
 canal.instance.global.lazy = false
 canal.instance.global.lazy = false
 #canal.instance.global.manager.address = 127.0.0.1:1099
 #canal.instance.global.manager.address = 127.0.0.1:1099
 canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
 canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
-#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
+#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

+ 9 - 3
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusService.java

@@ -22,6 +22,7 @@ public class PrometheusService implements CanalMetricsService {
     private static final Logger          logger          = LoggerFactory.getLogger(PrometheusService.class);
     private static final Logger          logger          = LoggerFactory.getLogger(PrometheusService.class);
     private final CanalInstanceExports   instanceExports;
     private final CanalInstanceExports   instanceExports;
     private volatile boolean             running         = false;
     private volatile boolean             running         = false;
+    private int                          port;
     private HTTPServer                   server;
     private HTTPServer                   server;
     private final ClientInstanceProfiler clientProfiler;
     private final ClientInstanceProfiler clientProfiler;
 
 
@@ -41,9 +42,9 @@ public class PrometheusService implements CanalMetricsService {
     @Override
     @Override
     public void initialize() {
     public void initialize() {
         try {
         try {
-            //TODO 1.Configurable port
-            //TODO 2.Https
-            server = new HTTPServer(11112);
+            logger.info("Start prometheus HTTPServer on port {}.", port);
+            //TODO 2.Https?
+            server = new HTTPServer(port);
         } catch (IOException e) {
         } catch (IOException e) {
             logger.warn("Unable to start prometheus HTTPServer.", e);
             logger.warn("Unable to start prometheus HTTPServer.", e);
             return;
             return;
@@ -112,4 +113,9 @@ public class PrometheusService implements CanalMetricsService {
         logger.info("Unregister metrics for destination {}.", instance.getDestination());
         logger.info("Unregister metrics for destination {}.", instance.getDestination());
     }
     }
 
 
+    @Override
+    public void setServerPort(int port) {
+        this.port = port;
+    }
+
 }
 }

+ 6 - 0
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -44,6 +44,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
     private Map<String, CanalInstance> canalInstances;
     private Map<String, CanalInstance> canalInstances;
     // private Map<ClientIdentity, Position> lastRollbackPostions;
     // private Map<ClientIdentity, Position> lastRollbackPostions;
     private CanalInstanceGenerator     canalInstanceGenerator;
     private CanalInstanceGenerator     canalInstanceGenerator;
+    private int                        metricsPort;
     private CanalMetricsService        metrics          = NopCanalMetricsService.NOP;
     private CanalMetricsService        metrics          = NopCanalMetricsService.NOP;
 
 
     private static class SingletonHolder {
     private static class SingletonHolder {
@@ -64,6 +65,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             super.start();
             super.start();
             // 如果存在provider,则启动metrics service
             // 如果存在provider,则启动metrics service
             loadCanalMetrics();
             loadCanalMetrics();
+            metrics.setServerPort(metricsPort);
             metrics.initialize();
             metrics.initialize();
             canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {
             canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {
 
 
@@ -529,4 +531,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
         this.canalInstanceGenerator = canalInstanceGenerator;
         this.canalInstanceGenerator = canalInstanceGenerator;
     }
     }
 
 
+    public void setMetricsPort(int metricsPort) {
+        this.metricsPort = metricsPort;
+    }
+
 }
 }

+ 5 - 0
server/src/main/java/com/alibaba/otter/canal/spi/CanalMetricsService.java

@@ -39,4 +39,9 @@ public interface CanalMetricsService {
      */
      */
     void unregister(CanalInstance instance);
     void unregister(CanalInstance instance);
 
 
+    /**
+     * @param port server port for pull
+     */
+    void setServerPort(int port);
+
 }
 }

+ 5 - 0
server/src/main/java/com/alibaba/otter/canal/spi/NopCanalMetricsService.java

@@ -35,4 +35,9 @@ public class NopCanalMetricsService implements CanalMetricsService {
     public void unregister(CanalInstance instance) {
     public void unregister(CanalInstance instance) {
 
 
     }
     }
+
+    @Override
+    public void setServerPort(int port) {
+
+    }
 }
 }