Quellcode durchsuchen

fixed merge code

七锋 vor 6 Jahren
Ursprung
Commit
0131c3ad13

+ 8 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -98,6 +98,14 @@ public class CanalController {
         port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
         embededCanalServer = CanalServerWithEmbedded.instance();
         embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
+        try {
+            int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT));
+            embededCanalServer.setMetricsPort(metricsPort);
+        } catch (NumberFormatException e) {
+            logger.info("No valid metrics server port found, use default 11112.");
+            embededCanalServer.setMetricsPort(11112);
+        }
+        
         String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
         if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
             canalServer = CanalServerWithNetty.instance();

+ 1 - 0
deployer/src/main/resources/canal.properties

@@ -4,6 +4,7 @@
 canal.id= 1
 canal.ip=
 canal.port=11111
+canal.metrics.pull.port=11112
 canal.zkServers=
 # flush data to zk
 canal.zookeeper.flush.period = 1000

+ 9 - 3
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusService.java

@@ -22,6 +22,7 @@ public class PrometheusService implements CanalMetricsService {
     private static final Logger          logger          = LoggerFactory.getLogger(PrometheusService.class);
     private final CanalInstanceExports   instanceExports;
     private volatile boolean             running         = false;
+    private int                          port;
     private HTTPServer                   server;
     private final ClientInstanceProfiler clientProfiler;
 
@@ -41,9 +42,9 @@ public class PrometheusService implements CanalMetricsService {
     @Override
     public void initialize() {
         try {
-            //TODO 1.Configurable port
-            //TODO 2.Https
-            server = new HTTPServer(11112);
+            logger.info("Start prometheus HTTPServer on port {}.", port);
+            //TODO 2.Https?
+            server = new HTTPServer(port);
         } catch (IOException e) {
             logger.warn("Unable to start prometheus HTTPServer.", e);
             return;
@@ -112,4 +113,9 @@ public class PrometheusService implements CanalMetricsService {
         logger.info("Unregister metrics for destination {}.", instance.getDestination());
     }
 
+    @Override
+    public void setServerPort(int port) {
+        this.port = port;
+    }
+
 }

+ 10 - 10
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/ParserCollector.java

@@ -1,15 +1,10 @@
 package com.alibaba.otter.canal.prometheus.impl;
 
-import com.alibaba.otter.canal.instance.core.CanalInstance;
-import com.alibaba.otter.canal.parse.CanalEventParser;
-import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
-import com.alibaba.otter.canal.prometheus.InstanceRegistry;
-import com.google.common.base.Preconditions;
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST;
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS_LIST;
 import io.prometheus.client.Collector;
 import io.prometheus.client.CounterMetricFamily;
 import io.prometheus.client.GaugeMetricFamily;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -19,9 +14,14 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST;
-import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS_LIST;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.instance.core.CanalInstance;
+import com.alibaba.otter.canal.parse.CanalEventParser;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
+import com.alibaba.otter.canal.prometheus.InstanceRegistry;
+import com.google.common.base.Preconditions;
 
 /**
  * @author Chuanyi Li

+ 4 - 2
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/StoreCollector.java

@@ -40,7 +40,7 @@ public class StoreCollector extends Collector implements InstanceRegistry {
     private static final String                             PRODUCE_MEM_HELP = "Produced mem bytes of canal instance";
     private static final String                             CONSUME_MEM_HELP = "Consumed mem bytes of canal instance";
     private final ConcurrentMap<String, StoreMetricsHolder> instances        = new ConcurrentHashMap<String, StoreMetricsHolder>();
-    private final List<String>                              storeLabelsList  = Arrays.asList(DEST, "batchMode");
+    private final List<String>                              storeLabelsList  = Arrays.asList(DEST, "batchMode", "size");
 
     private StoreCollector() {}
 
@@ -100,7 +100,8 @@ public class StoreCollector extends Collector implements InstanceRegistry {
         holder.putSeq = memStore.getPutSequence();
         holder.ackSeq = memStore.getAckSequence();
         holder.destLabelValues = Collections.singletonList(destination);
-        holder.storeLabelValues = Arrays.asList(destination, memStore.getBatchMode().name());
+        holder.size = memStore.getBufferSize();
+        holder.storeLabelValues = Arrays.asList(destination, holder.batchMode.name(), Integer.toString(holder.size));
         Preconditions.checkNotNull(holder.batchMode);
         Preconditions.checkNotNull(holder.putSeq);
         Preconditions.checkNotNull(holder.ackSeq);
@@ -128,6 +129,7 @@ public class StoreCollector extends Collector implements InstanceRegistry {
         private BatchMode    batchMode;
         private AtomicLong   putMemSize;
         private AtomicLong   ackMemSize;
+        private int          size;
         private List<String> destLabelValues;
         private List<String> storeLabelValues;
     }

+ 6 - 0
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -44,6 +44,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
     private Map<String, CanalInstance> canalInstances;
     // private Map<ClientIdentity, Position> lastRollbackPostions;
     private CanalInstanceGenerator     canalInstanceGenerator;
+    private int                        metricsPort;
     private CanalMetricsService        metrics          = NopCanalMetricsService.NOP;
 
     private static class SingletonHolder {
@@ -64,6 +65,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             super.start();
             // 如果存在provider,则启动metrics service
             loadCanalMetrics();
+            metrics.setServerPort(metricsPort);
             metrics.initialize();
             canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {
 
@@ -529,4 +531,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
         this.canalInstanceGenerator = canalInstanceGenerator;
     }
 
+    public void setMetricsPort(int metricsPort) {
+        this.metricsPort = metricsPort;
+    }
+
 }

+ 5 - 0
server/src/main/java/com/alibaba/otter/canal/spi/CanalMetricsService.java

@@ -39,4 +39,9 @@ public interface CanalMetricsService {
      */
     void unregister(CanalInstance instance);
 
+    /**
+     * @param port server port for pull
+     */
+    void setServerPort(int port);
+
 }

+ 5 - 0
server/src/main/java/com/alibaba/otter/canal/spi/NopCanalMetricsService.java

@@ -35,4 +35,9 @@ public class NopCanalMetricsService implements CanalMetricsService {
     public void unregister(CanalInstance instance) {
 
     }
+
+    @Override
+    public void setServerPort(int port) {
+
+    }
 }