1
0
Эх сурвалжийг харах

Canal metrics snapshot version.

Chuanyi Li 6 жил өмнө
parent
commit
d24e10fb3e

+ 6 - 6
deployer/pom.xml

@@ -18,12 +18,12 @@
 		</dependency>
 
 		<!-- 这里指定runtime的metrics provider-->
-		<!--<dependency>-->
-			<!--<groupId>com.alibaba.otter</groupId>-->
-			<!--<artifactId>canal.prometheus</artifactId>-->
-			<!--<version>${project.version}</version>-->
-			<!--<scope>runtime</scope>-->
-		<!--</dependency>-->
+		<dependency>
+			<groupId>com.alibaba.otter</groupId>
+			<artifactId>canal.prometheus</artifactId>
+			<version>${project.version}</version>
+			<scope>runtime</scope>
+		</dependency>
 	</dependencies>
 	
 	<build>

+ 0 - 15
deployer/src/main/bin/metrics_env.sh

@@ -1,15 +0,0 @@
-#!/bin/bash
-# Additional line arg for current prometheus solution
-case "`uname`" in
-Linux)
-    bin_abs_path=$(readlink -f $(dirname $0))
-	;;
-*)
-	bin_abs_path=`cd $(dirname $0); pwd`
-	;;
-esac
-base=${bin_abs_path}/..
-if [ $(ls $base/lib/aspectjweaver*.jar | wc -l) -eq 1 ]; then
-    WEAVER=$(ls $base/lib/aspectjweaver*.jar)
-    METRICS_OPTS=" -javaagent:"${WEAVER}" "
-fi

+ 1 - 6
deployer/src/main/bin/startup.sh

@@ -94,12 +94,7 @@ then
 	echo LOG CONFIGURATION : $logback_configurationFile
 	echo canal conf : $canal_conf 
 	echo CLASSPATH :$CLASSPATH
-#   metrics support options
-#	if [ -x $base/bin/metrics_env.sh ]; then
-#	    . $base/bin/metrics_env.sh
-#	    echo METRICS_OPTS $METRICS_OPTS
-#	fi
-	$JAVA $JAVA_OPTS $METRICS_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.deployer.CanalLauncher 1>>$base/logs/canal/canal.log 2>&1 &
+	$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.deployer.CanalLauncher 1>>$base/logs/canal/canal.log 2>&1 &
 	echo $! > $base/bin/canal.pid 
 	
 	echo "cd to $current_path for continue"

+ 2 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -50,7 +50,8 @@ public class MysqlConnection implements ErosaConnection {
     private AuthenticationInfo        authInfo;
     protected     int                 connTimeout               = 5 * 1000;                                      // 5秒
     protected     int                 soTimeout                 = 60 * 60 * 1000;                                // 1小时
-    private AtomicLong receivedBinlogBytes;
+    // dump binlog bytes, 暂不包括meta与TSDB
+    private AtomicLong                receivedBinlogBytes;
 
     public MysqlConnection(){
     }

+ 5 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -187,6 +187,11 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
                 //LockSupport.parkNanos(1L);
                 applyWait(++fullTimes);
                 interupted = Thread.interrupted();
+                if (fullTimes % 1000 == 0) {
+                    long nextStart = System.nanoTime();
+                    eventsPublishBlockingTime.addAndGet(nextStart - blockingStart);
+                    blockingStart = nextStart;
+                }
             }
         } while (!interupted && isStart());
         return isStart();

+ 1 - 1
pom.xml

@@ -247,7 +247,7 @@
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_186</version>
+                <version>2.0.0_preview_520</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>

+ 1 - 1
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalInstanceExports.java

@@ -77,7 +77,7 @@ public class CanalInstanceExports {
 
     private InstanceRegistry requiredInstanceRegistry(Collector collector) {
         if (!(collector instanceof InstanceRegistry)) {
-            throw new IllegalArgumentException("Canal collector need to implement InstanceRegistry.");
+            throw new IllegalArgumentException("Canal prometheus collector need to implement InstanceRegistry.");
         }
         return (InstanceRegistry) collector;
     }

+ 6 - 5
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusService.java

@@ -19,14 +19,15 @@ import static com.alibaba.otter.canal.server.netty.CanalServerWithNettyProfiler.
  */
 public class PrometheusService implements CanalMetricsService {
 
-    private static final Logger                           logger          = LoggerFactory.getLogger(PrometheusService.class);
-    private final CanalInstanceExports                    instanceExports;
-    private volatile boolean                              running         = false;
-    private HTTPServer                                    server;
-    private ClientInstanceProfiler                        clientProfiler  = new PrometheusClientInstanceProfiler();
+    private static final Logger          logger          = LoggerFactory.getLogger(PrometheusService.class);
+    private final CanalInstanceExports   instanceExports;
+    private volatile boolean             running         = false;
+    private HTTPServer                   server;
+    private final ClientInstanceProfiler clientProfiler;
 
     private PrometheusService() {
         this.instanceExports = CanalInstanceExports.instance();
+        this.clientProfiler = PrometheusClientInstanceProfiler.instance();
     }
 
     private static class SingletonHolder {

+ 6 - 2
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/EntryCollector.java

@@ -29,7 +29,7 @@ public class EntryCollector extends Collector implements InstanceRegistry {
     private static final Logger                             logger           = LoggerFactory.getLogger(SinkCollector.class);
     private static final String                             DELAY            = "canal_instance_traffic_delay";
     private static final String                             TRANSACTION      = "canal_instance_transactions";
-    private static final String                             DELAY_HELP       = "Traffic delay of canal instance";
+    private static final String                             DELAY_HELP       = "Traffic delay of canal instance in milliseconds";
     private static final String                             TRANSACTION_HELP = "Transactions counter of canal instance";
     private final ConcurrentMap<String, EntryMetricsHolder> instances        = new ConcurrentHashMap<String, EntryMetricsHolder>();
 
@@ -51,7 +51,11 @@ public class EntryCollector extends Collector implements InstanceRegistry {
         CounterMetricFamily transactions = new CounterMetricFamily(TRANSACTION,
                 TRANSACTION_HELP, DEST_LABELS_LIST);
         for (EntryMetricsHolder emh : instances.values()) {
-            delay.addMetric(emh.destLabelValues, emh.latestExecTime.doubleValue());
+            long now = System.currentTimeMillis();
+            long latest = emh.latestExecTime.get();
+            if (now > latest) {
+                delay.addMetric(emh.destLabelValues, (now - latest));
+            }
             transactions.addMetric(emh.destLabelValues, emh.transactionCounter.doubleValue());
         }
         mfs.add(delay);

+ 17 - 12
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/MetaCollector.java

@@ -11,22 +11,23 @@ import io.prometheus.client.GaugeMetricFamily;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS_LIST;
+
 /**
  * @author Chuanyi Li
  */
 public class MetaCollector extends Collector implements InstanceRegistry {
 
-    private static final List<String>                      InfoLabel         = Arrays.asList("destination", "mode");
+    private static final List<String>                      INFO_LABELS_LIST  = Arrays.asList("destination", "mode");
     private static final Logger                            logger            = LoggerFactory.getLogger(MetaCollector.class);
-    private static final String                            SUBSCRIPTION      = "canal_instance";
-    private static final String                            SUBSCRIPTION_HELP = "Canal instance";
+    private static final String                            INSTANCE          = "canal_instance";
+    private static final String                            INSTANCE_HELP     = "Canal instance";
+    private static final String                            SUBSCRIPTION      = "canal_instance_subscriptions";
+    private static final String                            SUBSCRIPTION_HELP = "Canal instance subscriptions";
     private final ConcurrentMap<String, MetaMetricsHolder> instances         = new ConcurrentHashMap<String, MetaMetricsHolder>();
 
     private MetaCollector() {}
@@ -42,18 +43,20 @@ public class MetaCollector extends Collector implements InstanceRegistry {
     @Override
     public List<MetricFamilySamples> collect() {
         List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
-        GaugeMetricFamily instanceInfo = new GaugeMetricFamily(
-                SUBSCRIPTION,
-                SUBSCRIPTION_HELP,
-                InfoLabel);
+        GaugeMetricFamily instanceInfo = new GaugeMetricFamily(INSTANCE,
+                INSTANCE_HELP, INFO_LABELS_LIST);
+        GaugeMetricFamily subsInfo = new GaugeMetricFamily(SUBSCRIPTION,
+                SUBSCRIPTION_HELP, DEST_LABELS_LIST);
         for (Map.Entry<String, MetaMetricsHolder> nme : instances.entrySet()) {
             final String destination = nme.getKey();
             final MetaMetricsHolder nmh = nme.getValue();
+            instanceInfo.addMetric(nmh.infoLabelValues, 1);
             List<ClientIdentity> subs = nmh.metaManager.listAllSubscribeInfo(destination);
             int count = subs == null ? 0 : subs.size();
-            instanceInfo.addMetric(nmh.infoLabelValues, count);
+            subsInfo.addMetric(nmh.destLabelValues, count);
         }
         mfs.add(instanceInfo);
+        mfs.add(subsInfo);
         return mfs;
     }
 
@@ -63,6 +66,7 @@ public class MetaCollector extends Collector implements InstanceRegistry {
         MetaMetricsHolder holder = new MetaMetricsHolder();
         String mode = (instance instanceof CanalInstanceWithSpring) ? "spring" : "manager";
         holder.infoLabelValues = Arrays.asList(destination, mode);
+        holder.destLabelValues = Collections.singletonList(destination);
         holder.metaManager = instance.getMetaManager();
         Preconditions.checkNotNull(holder.metaManager);
         MetaMetricsHolder old = instances.put(destination, holder);
@@ -79,6 +83,7 @@ public class MetaCollector extends Collector implements InstanceRegistry {
 
     private class MetaMetricsHolder {
         private List<String>     infoLabelValues;
+        private List<String>     destLabelValues;
         private CanalMetaManager metaManager;
     }
 

+ 4 - 2
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/ParserCollector.java

@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST;
 import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS_LIST;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 /**
  * @author Chuanyi Li
@@ -28,11 +29,12 @@ import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABEL
 public class ParserCollector extends Collector implements InstanceRegistry {
 
     private static final Logger                              logger                = LoggerFactory.getLogger(ParserCollector.class);
+    private static final long                                NANO_PER_MILLI        = 1000 * 1000L;
     private static final String                              PUBLISH_BLOCKING      = "canal_instance_publish_blocking_time";
     private static final String                              RECEIVED_BINLOG       = "canal_instance_received_binlog_bytes";
     private static final String                              PARSER_MODE           = "canal_instance_parser_mode";
     private static final String                              MODE_LABEL            = "parallel";
-    private static final String                              PUBLISH_BLOCKING_HELP = "Publish blocking time of dump thread";
+    private static final String                              PUBLISH_BLOCKING_HELP = "Publish blocking time of dump thread in milliseconds";
     private static final String                              RECEIVED_BINLOG_HELP  = "Received binlog bytes";
     private static final String                              MODE_HELP             = "Parser mode(parallel/serial) of instance";
     private final List<String>                               modeLabels            = Arrays.asList(DEST, MODE_LABEL);
@@ -60,7 +62,7 @@ public class ParserCollector extends Collector implements InstanceRegistry {
                 PUBLISH_BLOCKING_HELP, DEST_LABELS_LIST);
         for (ParserMetricsHolder emh : instances.values()) {
             if (emh.isParallel) {
-                blockingCounter.addMetric(emh.destLabelValues, emh.eventsPublishBlockingTime.doubleValue());
+                blockingCounter.addMetric(emh.destLabelValues, (emh.eventsPublishBlockingTime.doubleValue() / NANO_PER_MILLI));
                 hasParallel = true;
             }
             modeGauge.addMetric(emh.modeLabelValues, 1);

+ 16 - 5
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.prometheus.impl;
 
+import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
 import com.alibaba.otter.canal.sink.AbstractCanalEventDownStreamHandler;
 import com.alibaba.otter.canal.store.model.Event;
@@ -24,18 +25,28 @@ public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDow
             for (Event e : events) {
                 EntryType type = e.getEntryType();
                 if (type == null) continue;
-                long exec = e.getExecuteTime();
-                if (exec > 0) localExecTime = exec;
                 switch (type) {
-                    case ROWDATA:
+                    case TRANSACTIONBEGIN: {
+                        long exec = e.getExecuteTime();
+                        if (exec > 0) localExecTime = exec;
+                        break;
+                    }
+                    case ROWDATA: {
+                        long exec = e.getExecuteTime();
+                        if (exec > 0) localExecTime = exec;
                         // TODO 当前proto无法直接获得RowChange的变更行数(需要parse),可考虑放到header里面
                         break;
-                    case TRANSACTIONEND:
+                    }
+                    case TRANSACTIONEND: {
+                        long exec = e.getExecuteTime();
+                        if (exec > 0) localExecTime = exec;
                         transactionCounter.incrementAndGet();
                         break;
+                    }
                     case HEARTBEAT:
+                        // 发现canal自己的heartbeat是带有execTime的
                         // TODO 确认一下不是canal自己产生的
-                        // EventType eventType = e.getEventType();
+                        CanalEntry.EventType eventType = e.getEventType();
                         // TODO utilize MySQL master heartbeat packet to refresh delay if always no more events coming
                         // see: https://dev.mysql.com/worklog/task/?id=342
                         // heartbeats are sent by the master only if there is no

+ 12 - 3
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusClientInstanceProfiler.java

@@ -15,6 +15,7 @@ import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABEL
  */
 public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler {
 
+    private static final long   NANO_PER_MILLI = 1000 * 1000L;
     private static final String PACKET_TYPE    = "canal_instance_client_packets";
     private static final String OUTBOUND_BYTES = "canal_instance_client_bytes";
     private static final String EMPTY_BATCHES  = "canal_instance_client_empty_batches";
@@ -27,7 +28,15 @@ public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler
     private final Histogram     responseLatency;
     private volatile boolean    running        = false;
 
-    public PrometheusClientInstanceProfiler() {
+    private static class SingletonHolder {
+        private static final PrometheusClientInstanceProfiler SINGLETON = new PrometheusClientInstanceProfiler();
+    }
+
+    public static PrometheusClientInstanceProfiler instance() {
+        return SingletonHolder.SINGLETON;
+    }
+
+    private PrometheusClientInstanceProfiler() {
         this.outboundCounter = Counter.build()
                 .labelNames(DEST_LABELS)
                 .name(OUTBOUND_BYTES)
@@ -53,7 +62,7 @@ public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler
                 .name(LATENCY)
                 .help("Client request latency.")
                 // buckets in milliseconds
-                .buckets(1.0, 2.5, 5.0, 10.0, 25.0, 50.0, 100.0)
+                .buckets(2.5, 10.0, 25.0, 100.0)
                 .create();
     }
 
@@ -68,7 +77,7 @@ public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler
             errorsCounter.labels(destination, Short.toString(errorCode)).inc();
         }
         long latency = result.getLatency();
-        responseLatency.labels(destination).observe(latency / 1000000);
+        responseLatency.labels(destination).observe(((double) latency) / NANO_PER_MILLI);
         switch (type) {
             case GET:
                 boolean empty = result.getEmpty();

+ 4 - 2
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/SinkCollector.java

@@ -18,6 +18,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS_LIST;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 /**
  * @author Chuanyi Li
@@ -25,8 +26,9 @@ import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABEL
 public class SinkCollector extends Collector implements InstanceRegistry {
 
     private static final Logger                            logger               = LoggerFactory.getLogger(SinkCollector.class);
+    private static final long                              NANO_PER_MILLI       = 1000 * 1000L;
     private static final String                            SINK_BLOCKING_TIME   = "canal_instance_sink_blocking_time";
-    private static final String                            SINK_BLOCK_TIME_HELP = "Total sink blocking time";
+    private static final String                            SINK_BLOCK_TIME_HELP = "Total sink blocking time in milliseconds";
     private final ConcurrentMap<String, SinkMetricsHolder> instances            = new ConcurrentHashMap<String, SinkMetricsHolder>();
 
     private SinkCollector() {}
@@ -45,7 +47,7 @@ public class SinkCollector extends Collector implements InstanceRegistry {
         CounterMetricFamily blockingCounter = new CounterMetricFamily(SINK_BLOCKING_TIME,
                 SINK_BLOCK_TIME_HELP, DEST_LABELS_LIST);
         for (SinkMetricsHolder smh : instances.values()) {
-            blockingCounter.addMetric(smh.destLabelValues, smh.eventsSinkBlockingTime.doubleValue());
+            blockingCounter.addMetric(smh.destLabelValues, (smh.eventsSinkBlockingTime.doubleValue() / NANO_PER_MILLI));
         }
         mfs.add(blockingCounter);
         return mfs;

+ 5 - 0
sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

@@ -164,6 +164,11 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
                     blockingStart = System.nanoTime();
                 }
                 applyWait(++fullTimes);
+                if (fullTimes % 1000 == 0) {
+                    long nextStart = System.nanoTime();
+                    eventsSinkBlockingTime.addAndGet(nextStart - blockingStart);
+                    blockingStart = nextStart;
+                }
             }
 
             for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {