1
0
Эх сурвалжийг харах

Canal monitoring(prometheus module) snapshot version.

Chuanyi Li 6 жил өмнө
parent
commit
c0a40499b0
21 өөрчлөгдсөн 876 нэмэгдсэн , 17 устгасан
  1. 8 0
      deployer/pom.xml
  2. 15 0
      deployer/src/main/bin/metrics_env.sh
  3. 6 1
      deployer/src/main/bin/startup.sh
  4. 2 1
      example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java
  5. 2 1
      pom.xml
  6. 67 0
      prometheus/pom.xml
  7. 98 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalInstanceExports.java
  8. 21 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalServerExports.java
  9. 15 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusProvider.java
  10. 110 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusService.java
  11. 79 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/InboundThroughputAspect.java
  12. 64 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/InstanceMetaCollector.java
  13. 75 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/MemoryStoreCollector.java
  14. 80 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/OutboundThroughputAspect.java
  15. 73 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.java
  16. 11 0
      prometheus/src/main/resources/META-INF/aop.xml
  17. 1 0
      prometheus/src/main/resources/META-INF/services/com.alibaba.otter.canal.spi.CanalMetricsProvider
  18. 45 14
      server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java
  19. 24 0
      server/src/main/java/com/alibaba/otter/canal/spi/CanalMetricsProvider.java
  20. 42 0
      server/src/main/java/com/alibaba/otter/canal/spi/CanalMetricsService.java
  21. 38 0
      server/src/main/java/com/alibaba/otter/canal/spi/NopCanalMetricsService.java

+ 8 - 0
deployer/pom.xml

@@ -16,6 +16,14 @@
 			<artifactId>canal.server</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+
+		<!-- 这里指定runtime的metrics provider-->
+		<!--<dependency>-->
+			<!--<groupId>com.alibaba.otter</groupId>-->
+			<!--<artifactId>canal.prometheus</artifactId>-->
+			<!--<version>${project.version}</version>-->
+			<!--<scope>runtime</scope>-->
+		<!--</dependency>-->
 	</dependencies>
 	
 	<build>

+ 15 - 0
deployer/src/main/bin/metrics_env.sh

@@ -0,0 +1,15 @@
+#!/bin/bash
+# Additional line arg for current prometheus solution
+case "`uname`" in
+Linux)
+    bin_abs_path=$(readlink -f $(dirname $0))
+	;;
+*)
+	bin_abs_path=`cd $(dirname $0); pwd`
+	;;
+esac
+base=${bin_abs_path}/..
+if [ $(ls $base/lib/aspectjweaver*.jar | wc -l) -eq 1 ]; then
+    WEAVER=$(ls $base/lib/aspectjweaver*.jar)
+    METRICS_OPTS=" -javaagent:"${WEAVER}" "
+fi

+ 6 - 1
deployer/src/main/bin/startup.sh

@@ -94,7 +94,12 @@ then
 	echo LOG CONFIGURATION : $logback_configurationFile
 	echo canal conf : $canal_conf 
 	echo CLASSPATH :$CLASSPATH
-	$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.deployer.CanalLauncher 1>>$base/logs/canal/canal.log 2>&1 &
+#   metrics support options
+#	if [ -x $base/bin/metrics_env.sh ]; then
+#	    . $base/bin/metrics_env.sh
+#	    echo METRICS_OPTS $METRICS_OPTS
+#	fi
+	$JAVA $JAVA_OPTS $METRICS_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.deployer.CanalLauncher 1>>$base/logs/canal/canal.log 2>&1 &
 	echo $! > $base/bin/canal.pid 
 	
 	echo "cd to $current_path for continue"

+ 2 - 1
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.example;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.SystemUtils;
@@ -115,7 +116,7 @@ public class AbstractCanalClientTest {
                 connector.connect();
                 connector.subscribe();
                 while (running) {
-                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
+                    Message message = connector.getWithoutAck(batchSize, 1000L, TimeUnit.MILLISECONDS); // 获取指定数量的数据
                     long batchId = message.getId();
                     int size = message.getEntries().size();
                     if (batchId == -1 || size == 0) {

+ 2 - 1
pom.xml

@@ -129,6 +129,7 @@
         <module>example</module>
         <module>kafka</module>
         <module>kafka-client</module>
+        <module>prometheus</module>
     </modules>
 
     <dependencyManagement>
@@ -256,7 +257,7 @@
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_520</version>
+                <version>2.0.0_preview_186</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>

+ 67 - 0
prometheus/pom.xml

@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>canal</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.0.26-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>canal.prometheus</artifactId>
+    <version>1.0.26-SNAPSHOT</version>
+    <name>canal prometheus module for otter ${project.version}</name>
+    <dependencies>
+        <!-- load time weaver-->
+        <dependency>
+            <groupId>org.aspectj</groupId>
+            <artifactId>aspectjrt</artifactId>
+            <version>1.8.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.aspectj</groupId>
+            <artifactId>aspectjweaver</artifactId>
+            <version>1.8.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.jctools</groupId>
+            <artifactId>jctools-core</artifactId>
+            <version>2.1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient</artifactId>
+            <version>0.4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient_hotspot</artifactId>
+            <version>0.4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient_httpserver</artifactId>
+            <version>0.4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient_pushgateway</artifactId>
+            <version>0.4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.instance.core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.server</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+</project>

+ 98 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalInstanceExports.java

@@ -0,0 +1,98 @@
+package com.alibaba.otter.canal.prometheus;
+
+import com.alibaba.otter.canal.instance.core.CanalInstance;
+import com.alibaba.otter.canal.prometheus.impl.InstanceMetaCollector;
+import com.alibaba.otter.canal.prometheus.impl.MemoryStoreCollector;
+import com.alibaba.otter.canal.prometheus.impl.PrometheusCanalEventDownStreamHandler;
+import com.alibaba.otter.canal.sink.CanalEventSink;
+import com.alibaba.otter.canal.sink.entry.EntryEventSink;
+import com.alibaba.otter.canal.store.CanalStoreException;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * @author Chuanyi Li
+ */
+public class CanalInstanceExports {
+
+    private static final Logger      logger         = LoggerFactory.getLogger(CanalInstanceExports.class);
+
+    public static final String[]     labels         = {"destination"};
+
+    public static final List<String> labelList      = Collections.singletonList(labels[0]);
+
+    private final String             destination;
+
+    private Collector                storeCollector;
+
+    private Collector                delayCollector;
+
+    private Collector                metaCollector;
+
+    private CanalInstanceExports(CanalInstance instance) {
+        this.destination = instance.getDestination();
+        initDelayGauge(instance);
+        initStoreCollector(instance);
+        initMetaCollector(instance);
+    }
+
+
+
+    static CanalInstanceExports forInstance(CanalInstance instance) {
+        return new CanalInstanceExports(instance);
+    }
+
+    void register() {
+        if (delayCollector != null) {
+            delayCollector.register();
+        }
+        if (storeCollector != null) {
+            storeCollector.register();
+        }
+        if (metaCollector != null) {
+            metaCollector.register();
+        }
+    }
+
+    void unregister() {
+        if (delayCollector != null) {
+            CollectorRegistry.defaultRegistry.unregister(delayCollector);
+        }
+        if (storeCollector != null) {
+            CollectorRegistry.defaultRegistry.unregister(storeCollector);
+        }
+        if (metaCollector != null) {
+            CollectorRegistry.defaultRegistry.unregister(metaCollector);
+        }
+    }
+
+    private void initDelayGauge(CanalInstance instance) {
+        CanalEventSink sink = instance.getEventSink();
+        if (sink instanceof EntryEventSink) {
+            EntryEventSink entryEventSink = (EntryEventSink) sink;
+            // TODO ensure not to add handler again
+            PrometheusCanalEventDownStreamHandler handler = new PrometheusCanalEventDownStreamHandler(destination);
+            entryEventSink.addHandler(handler);
+            delayCollector = handler.getCollector();
+        } else {
+            logger.warn("This impl register metrics for only EntryEventSink, skip.");
+        }
+    }
+
+    private void initStoreCollector(CanalInstance instance) {
+        try {
+            storeCollector = new MemoryStoreCollector(instance.getEventStore(), destination);
+        } catch (CanalStoreException cse) {
+            logger.warn("Failed to register metrics for destination {}.", destination, cse);
+        }
+    }
+
+    private void initMetaCollector(CanalInstance instance) {
+        metaCollector = new InstanceMetaCollector(instance);
+    }
+}

+ 21 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalServerExports.java

@@ -0,0 +1,21 @@
+package com.alibaba.otter.canal.prometheus;
+
+import com.alibaba.otter.canal.prometheus.impl.InboundThroughputAspect;
+import com.alibaba.otter.canal.prometheus.impl.OutboundThroughputAspect;
+
+/**
+ * @author Chuanyi Li
+ */
+public class CanalServerExports {
+
+    private static boolean initialized = false;
+
+    public static synchronized void initialize() {
+        if (!initialized) {
+            InboundThroughputAspect.getCollector().register();
+            OutboundThroughputAspect.getCollector().register();
+            initialized = true;
+        }
+    }
+
+}

+ 15 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusProvider.java

@@ -0,0 +1,15 @@
+package com.alibaba.otter.canal.prometheus;
+
+import com.alibaba.otter.canal.spi.CanalMetricsProvider;
+import com.alibaba.otter.canal.spi.CanalMetricsService;
+
+/**
+ * @author Chuanyi Li
+ */
+public class PrometheusProvider implements CanalMetricsProvider {
+
+    @Override
+    public CanalMetricsService getService() {
+        return PrometheusService.getInstance();
+    }
+}

+ 110 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusService.java

@@ -0,0 +1,110 @@
+package com.alibaba.otter.canal.prometheus;
+
+import com.alibaba.otter.canal.instance.core.CanalInstance;
+import com.alibaba.otter.canal.spi.CanalMetricsService;
+import io.prometheus.client.exporter.HTTPServer;
+import io.prometheus.client.hotspot.DefaultExports;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author Chuanyi Li
+ */
+public class PrometheusService implements CanalMetricsService {
+
+    private static final Logger                           logger  = LoggerFactory.getLogger(PrometheusService.class);
+
+    private final Map<String, CanalInstanceExports>       exports = new ConcurrentHashMap<String, CanalInstanceExports>();
+
+    private volatile boolean                              running = false;
+
+    private HTTPServer                                    server;
+
+    private PrometheusService() {
+    }
+
+    private static class SingletonHolder {
+        private static final PrometheusService SINGLETON = new PrometheusService();
+    }
+
+    public static PrometheusService getInstance() {
+        return SingletonHolder.SINGLETON;
+    }
+
+    @Override
+    public void initialize() {
+        try {
+            //TODO 1.Configurable port
+            //TODO 2.Https
+            server = new HTTPServer(11112);
+        } catch (IOException e) {
+            logger.warn("Unable to start prometheus HTTPServer.", e);
+            return;
+        }
+        try {
+            // JVM exports
+            DefaultExports.initialize();
+            // Canal server level exports
+            CanalServerExports.initialize();
+        } catch (Throwable t) {
+            logger.warn("Unable to initialize server exports.", t);
+        }
+
+        running = true;
+    }
+
+    @Override
+    public void terminate() {
+        running = false;
+        // Normally, service should be terminated at canal shutdown.
+        // No need to unregister instance exports explicitly.
+        // But for the sake of safety, unregister them.
+        for (CanalInstanceExports ie : exports.values()) {
+            ie.unregister();
+        }
+        if (server != null) {
+            server.stop();
+        }
+    }
+
+    @Override
+    public boolean isRunning() {
+        return running;
+    }
+
+    @Override
+    public void register(CanalInstance instance) {
+        if (instance.isStart()) {
+            logger.warn("Cannot register metrics for destination {} that is running.", instance.getDestination());
+            return;
+        }
+        try {
+            CanalInstanceExports export = CanalInstanceExports.forInstance(instance);
+            export.register();
+            exports.put(instance.getDestination(), export);
+        } catch (Throwable t) {
+            logger.warn("Unable to register instance exports for {}.", instance.getDestination(), t);
+        }
+        logger.info("Register metrics for destination {}.", instance.getDestination());
+    }
+
+    @Override
+    public void unregister(CanalInstance instance) {
+        if (instance.isStart()) {
+            logger.warn("Try unregister metrics after destination {} is stopped.", instance.getDestination());
+        }
+        try {
+            CanalInstanceExports export = exports.remove(instance.getDestination());
+            if (export != null) {
+                export.unregister();
+            }
+        } catch (Throwable t) {
+            logger.warn("Unable to unregister instance exports for {}.", instance.getDestination(), t);
+        }
+        logger.info("Unregister metrics for destination {}.", instance.getDestination());
+    }
+}

+ 79 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/InboundThroughputAspect.java

@@ -0,0 +1,79 @@
+package com.alibaba.otter.canal.prometheus.impl;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CounterMetricFamily;
+import org.aspectj.lang.annotation.After;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+import org.jctools.maps.ConcurrentAutoTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Chuanyi Li
+ */
+@Aspect
+public class InboundThroughputAspect {
+
+    private static final Logger              logger    = LoggerFactory.getLogger(InboundThroughputAspect.class);
+
+    /**
+     *  Support highly scalable counters
+     *  @see ConcurrentAutoTable
+     */
+    private static final ConcurrentAutoTable total     = new ConcurrentAutoTable();
+
+    private static final Collector           collector = new InboundThroughputCollector();
+
+    public static Collector getCollector() {
+        return collector;
+    }
+
+    @Pointcut("call(byte[] com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel.read(..))")
+    public void read() {}
+
+    @Pointcut("call(void com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel.read(..)) ")
+    public void readBytes() {}
+
+    //nested read, just eliminate them.
+    @Pointcut("withincode(* com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel.read(..))")
+    public void nestedCall() {}
+
+    @After("read() && !nestedCall()  && args(len, ..)")
+    public void recordRead(int len) {
+        accumulateBytes(len);
+    }
+
+    @After("readBytes() && !nestedCall() && args(.., len, timeout)")
+    public void recordReadBytes(int len, int timeout) {
+        accumulateBytes(len);
+    }
+
+    private void accumulateBytes(int count) {
+        try {
+            total.add(count);
+        } catch (Throwable t) {
+            //Catch every Throwable, rather than break the business logic.
+            logger.warn("Error while accumulate inbound bytes.", t);
+        }
+    }
+
+    public static class InboundThroughputCollector extends Collector {
+
+        private InboundThroughputCollector() {}
+
+        @Override
+        public List<MetricFamilySamples> collect() {
+            List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
+            CounterMetricFamily bytes = new CounterMetricFamily("canal_net_inbound_bytes",
+                    "Total socket inbound bytes of canal server.",
+                    total.get());
+            mfs.add(bytes);
+            return mfs;
+        }
+    }
+
+}

+ 64 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/InstanceMetaCollector.java

@@ -0,0 +1,64 @@
+package com.alibaba.otter.canal.prometheus.impl;
+
+import com.alibaba.otter.canal.instance.core.CanalInstance;
+import com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring;
+import com.alibaba.otter.canal.meta.CanalMetaManager;
+import com.alibaba.otter.canal.prometheus.CanalInstanceExports;
+import com.alibaba.otter.canal.protocol.ClientIdentity;
+import io.prometheus.client.Collector;
+import io.prometheus.client.GaugeMetricFamily;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * @author Chuanyi Li
+ */
+public class InstanceMetaCollector extends Collector {
+
+    private static final List<String> InfoLabel    = Arrays.asList("destination", "mode");
+
+    private CanalMetaManager          metaManager;
+
+    private final String              destination;
+
+    private final String              mode;
+
+    private final String              subsHelp;
+
+    public InstanceMetaCollector(CanalInstance instance) {
+        if (instance == null) {
+            throw new IllegalArgumentException("CanalInstance must not be null.");
+        }
+        if (instance instanceof CanalInstanceWithSpring) {
+            mode = "spring";
+        } else {
+            mode = "manager";
+        }
+        this.metaManager = instance.getMetaManager();
+        this.destination = instance.getDestination();
+        this.subsHelp = "Subscriptions of canal instance " + destination;
+    }
+
+    @Override
+    public List<MetricFamilySamples> collect() {
+        List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
+        GaugeMetricFamily instanceInfo = new GaugeMetricFamily(
+                "canal_instance",
+                "Canal instance",
+                InfoLabel);
+        instanceInfo.addMetric(Arrays.asList(destination, mode), 1);
+        mfs.add(instanceInfo);
+        if (metaManager.isStart()) {
+            // client id = hardcode 1001, 目前没有意义
+            List<ClientIdentity> subs = metaManager.listAllSubscribeInfo(destination);
+            GaugeMetricFamily subscriptions = new GaugeMetricFamily(
+                    "canal_instance_subscription",
+                    subsHelp, CanalInstanceExports.labelList);
+            subscriptions.addMetric(Arrays.asList(destination), subs.size());
+            mfs.add(subscriptions);
+        }
+        return mfs;
+    }
+}

+ 75 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/MemoryStoreCollector.java

@@ -0,0 +1,75 @@
+package com.alibaba.otter.canal.prometheus.impl;
+
+import com.alibaba.otter.canal.prometheus.CanalInstanceExports;
+import com.alibaba.otter.canal.store.CanalEventStore;
+import com.alibaba.otter.canal.store.CanalStoreException;
+import com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CounterMetricFamily;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * @author Chuanyi Li
+ */
+public class MemoryStoreCollector extends Collector {
+
+    private static final Class<MemoryEventStoreWithBuffer> clazz  = MemoryEventStoreWithBuffer.class;
+
+    private final String                                   destination;
+
+    private final AtomicLong                               putSequence;
+
+    private final AtomicLong                               ackSequence;
+
+    private final String                                   putHelp;
+
+    private final String                                   ackHelp;
+
+    public MemoryStoreCollector(CanalEventStore store, String destination) {
+        this.destination = destination;
+        if (!(store instanceof MemoryEventStoreWithBuffer)) {
+            throw new IllegalArgumentException("EventStore must be MemoryEventStoreWithBuffer");
+        }
+        MemoryEventStoreWithBuffer ms = (MemoryEventStoreWithBuffer) store;
+        putSequence = getDeclaredValue(ms, "putSequence");
+        ackSequence = getDeclaredValue(ms, "ackSequence");
+        putHelp = "Produced sequence of canal instance " + destination;
+        ackHelp = "Consumed sequence of canal instance " + destination;
+    }
+
+    @Override
+    public List<MetricFamilySamples> collect() {
+        List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
+        CounterMetricFamily put = new CounterMetricFamily("canal_instance_store_produce_seq",
+                putHelp, Arrays.asList(CanalInstanceExports.labels));
+        put.addMetric(Collections.singletonList(destination), putSequence.doubleValue());
+        mfs.add(put);
+        CounterMetricFamily ack = new CounterMetricFamily("canal_instance_store_consume_seq",
+                ackHelp, Arrays.asList(CanalInstanceExports.labels));
+        ack.addMetric(Collections.singletonList(destination), ackSequence.doubleValue());
+        mfs.add(ack);
+        return mfs;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> T getDeclaredValue(MemoryEventStoreWithBuffer store, String name) {
+        T value;
+        try {
+            Field putField = clazz.getDeclaredField(name);
+            putField.setAccessible(true);
+            value = (T) putField.get(store);
+        } catch (NoSuchFieldException e) {
+            throw new CanalStoreException(e);
+        } catch (IllegalAccessException e) {
+            throw new CanalStoreException(e);
+        }
+        return value;
+    }
+
+}

+ 80 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/OutboundThroughputAspect.java

@@ -0,0 +1,80 @@
+package com.alibaba.otter.canal.prometheus.impl;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CounterMetricFamily;
+import org.aspectj.lang.annotation.After;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+import org.jboss.netty.channel.Channel;
+import org.jctools.maps.ConcurrentAutoTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.alibaba.otter.canal.server.netty.NettyUtils.HEADER_LENGTH;
+
+/**
+ * @author Chuanyi Li
+ */
+@Aspect
+public class OutboundThroughputAspect {
+    private static final Logger              logger    = LoggerFactory.getLogger(OutboundThroughputAspect.class);
+
+    /**
+     *  Support highly scalable counters
+     *  @see ConcurrentAutoTable
+     */
+    private static final ConcurrentAutoTable total     = new ConcurrentAutoTable();
+
+    private static final Collector           collector = new OutboundThroughputCollector();
+
+    public static Collector getCollector() {
+        return collector;
+    }
+
+    @Pointcut("call(* com.alibaba.otter.canal.server.netty.NettyUtils.write(..))")
+    public void write() {}
+
+    //nested read, just eliminate them.
+    @Pointcut("withincode(* com.alibaba.otter.canal.server.netty.NettyUtils.write(..))")
+    public void nestedCall() {}
+
+    @After("write() && !nestedCall() && args(ch, bytes, ..)")
+    public void recordWriteBytes(Channel ch, byte[] bytes) {
+        if (bytes != null) {
+            accumulateBytes(HEADER_LENGTH + bytes.length);
+        }
+    }
+
+    @After("write() && !nestedCall() && args(ch, buf, ..)")
+    public void recordWriteBuffer(Channel ch, ByteBuffer buf) {
+        if (buf != null) {
+            total.add(HEADER_LENGTH + buf.limit());
+        }
+    }
+    private void accumulateBytes(int count) {
+        try {
+            total.add(count);
+        } catch (Throwable t) {
+            //Catch every Throwable, rather than break the business logic.
+            logger.warn("Error while accumulate inbound bytes.", t);
+        }
+    }
+
+    public static class OutboundThroughputCollector extends Collector {
+
+        private OutboundThroughputCollector() {}
+
+        @Override public List<MetricFamilySamples> collect() {
+            List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
+            CounterMetricFamily bytes = new CounterMetricFamily("canal_net_outbound_bytes",
+                    "Total socket outbound bytes of canal server.",
+                    total.get());
+            mfs.add(bytes);
+            return mfs;
+        }
+    }
+}

+ 73 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.java

@@ -0,0 +1,73 @@
+package com.alibaba.otter.canal.prometheus.impl;
+
+import com.alibaba.otter.canal.prometheus.CanalInstanceExports;
+import com.alibaba.otter.canal.sink.AbstractCanalEventDownStreamHandler;
+import com.alibaba.otter.canal.store.model.Event;
+import io.prometheus.client.Collector;
+import io.prometheus.client.GaugeMetricFamily;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * @author Chuanyi Li
+ */
+public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDownStreamHandler<List<Event>> {
+
+    private final Collector     collector;
+
+    private long                latestExecuteTime = 0L;
+
+    private static final String DELAY_NAME        = "canal_instance_traffic_delay";
+
+    private final String        delayHelpName;
+
+    private final List<String>  labelValues;
+
+    public PrometheusCanalEventDownStreamHandler(final String destination) {
+        this.delayHelpName = "Traffic delay of canal instance " + destination + " in seconds.";
+        this.labelValues = Collections.singletonList(destination);
+        collector = new Collector() {
+            @Override
+            public List<MetricFamilySamples> collect() {
+                List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
+                long now = System.currentTimeMillis();
+                GaugeMetricFamily delay = new GaugeMetricFamily(
+                        DELAY_NAME,
+                        delayHelpName,
+                        CanalInstanceExports.labelList);
+                double d = 0.0;
+                if (latestExecuteTime > 0) {
+                    d = now - latestExecuteTime;
+                }
+                d = d > 0.0 ? (d / 1000) : 0.0;
+                delay.addMetric(labelValues, d);
+                mfs.add(delay);
+                return mfs;
+            }
+        };
+    }
+
+    @Override
+    public List<Event> before(List<Event> events) {
+        // TODO utilize MySQL master heartbeat packet to refresh delay if always no more events coming
+        // see: https://dev.mysql.com/worklog/task/?id=342
+        // heartbeats are sent by the master only if there is no
+        // more unsent events in the actual binlog file for a period longer that
+        // master_heartbeat_period.
+        if (events != null && !events.isEmpty()) {
+            Event last = events.get(events.size() - 1);
+            long ts = last.getExecuteTime();
+            if (ts > latestExecuteTime) {
+                latestExecuteTime = ts;
+            }
+        }
+        return events;
+    }
+
+    public Collector getCollector() {
+        return this.collector;
+    }
+
+}

+ 11 - 0
prometheus/src/main/resources/META-INF/aop.xml

@@ -0,0 +1,11 @@
+<aspectj>
+
+    <aspects>
+        <aspect name="com.alibaba.otter.canal.prometheus.impl.InboundThroughputAspect"/>
+        <aspect name="com.alibaba.otter.canal.prometheus.impl.OutboundThroughputAspect"/>
+    </aspects>
+    <weaver options="-verbose -showWeaveInfo">
+        <include within="com.alibaba.otter.canal..*"/>
+    </weaver>
+
+</aspectj>

+ 1 - 0
prometheus/src/main/resources/META-INF/services/com.alibaba.otter.canal.spi.CanalMetricsProvider

@@ -0,0 +1 @@
+com.alibaba.otter.canal.prometheus.PrometheusProvider

+ 45 - 14
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -1,11 +1,11 @@
 package com.alibaba.otter.canal.server.embedded;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
+import com.alibaba.otter.canal.spi.CanalMetricsProvider;
+import com.alibaba.otter.canal.spi.CanalMetricsService;
+import com.alibaba.otter.canal.spi.NopCanalMetricsService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -33,17 +33,18 @@ import com.google.protobuf.ByteString;
 
 /**
  * 嵌入式版本实现
- * 
+ *
  * @author jianghang 2012-7-12 下午01:34:00
  * @author zebin.xuzb
  * @version 1.0.0
  */
 public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {
 
-    private static final Logger        logger = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
+    private static final Logger        logger           = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
     private Map<String, CanalInstance> canalInstances;
     // private Map<ClientIdentity, Position> lastRollbackPostions;
     private CanalInstanceGenerator     canalInstanceGenerator;
+    private CanalMetricsService        metrics          = NopCanalMetricsService.NOP;
 
     private static class SingletonHolder {
 
@@ -61,7 +62,9 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
     public void start() {
         if (!isStart()) {
             super.start();
-
+            // 如果存在provider,则启动metrics service
+            loadCanalMetrics();
+            metrics.initialize();
             canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {
 
                 public CanalInstance apply(String destination) {
@@ -92,6 +95,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                 logger.error(String.format("stop CanalInstance[%s] has an error", entry.getKey()), e);
             }
         }
+        metrics.terminate();
     }
 
     public void start(final String destination) {
@@ -99,6 +103,9 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
         if (!canalInstance.isStart()) {
             try {
                 MDC.put("destination", destination);
+                if (metrics.isRunning()) {
+                    metrics.register(canalInstance);
+                }
                 canalInstance.start();
                 logger.info("start CanalInstances[{}] successfully", destination);
             } finally {
@@ -114,6 +121,9 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                 try {
                     MDC.put("destination", destination);
                     canalInstance.stop();
+                    if (metrics.isRunning()) {
+                        metrics.unregister(canalInstance);
+                    }
                     logger.info("stop CanalInstances[{}] successfully", destination);
                 } finally {
                     MDC.remove("destination");
@@ -176,7 +186,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
 
     /**
      * 获取数据
-     * 
+     *
      * <pre>
      * 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
      * </pre>
@@ -188,14 +198,14 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
 
     /**
      * 获取数据,可以指定超时时间.
-     * 
+     *
      * <pre>
      * 几种case:
      * a. 如果timeout为null,则采用tryGet方式,即时获取
      * b. 如果timeout不为null
      *    1. timeout为0,则采用get阻塞方式,获取数据,不设置超时,直到有足够的batchSize数据才返回
      *    2. timeout不为0,则采用get+timeout方式,获取数据,超时还没有batchSize足够的数据,有多少返回多少
-     * 
+     *
      * 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
      * </pre>
      */
@@ -251,7 +261,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
     /**
      * 不指定 position 获取事件。canal 会记住此 client 最新的 position。 <br/>
      * 如果是第一次 fetch,则会从 canal 中保存的最老一条数据开始输出。
-     * 
+     *
      * <pre>
      * 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
      * </pre>
@@ -264,14 +274,14 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
     /**
      * 不指定 position 获取事件。canal 会记住此 client 最新的 position。 <br/>
      * 如果是第一次 fetch,则会从 canal 中保存的最老一条数据开始输出。
-     * 
+     *
      * <pre>
      * 几种case:
      * a. 如果timeout为null,则采用tryGet方式,即时获取
      * b. 如果timeout不为null
      *    1. timeout为0,则采用get阻塞方式,获取数据,不设置超时,直到有足够的batchSize数据才返回
      *    2. timeout不为0,则采用get+timeout方式,获取数据,超时还没有batchSize足够的数据,有多少返回多少
-     *    
+     *
      * 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
      * </pre>
      */
@@ -342,7 +352,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
 
     /**
      * 进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
-     * 
+     *
      * <pre>
      * 注意:进行反馈时必须按照batchId的顺序进行ack(需有客户端保证)
      * </pre>
@@ -492,6 +502,27 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
         }
     }
 
+    private void loadCanalMetrics() {
+        ServiceLoader<CanalMetricsProvider> providers = ServiceLoader.load(CanalMetricsProvider.class);
+        List<CanalMetricsProvider> list = new ArrayList<CanalMetricsProvider>();
+        for (CanalMetricsProvider provider : providers) {
+            list.add(provider);
+        }
+        if (!list.isEmpty()) {
+            // 发现provider, 进行初始化
+            if (list.size() > 1) {
+                logger.warn("Found more than one CanalMetricsProvider, use the first one.");
+                //报告冲突
+                for (CanalMetricsProvider p : list) {
+                    logger.warn("Found CanalMetricsProvider: {}.", p.getClass().getName());
+                }
+            }
+            //默认使用第一个
+            CanalMetricsProvider provider = list.get(0);
+            this.metrics = provider.getService();
+        }
+    }
+
     // ========= setter ==========
 
     public void setCanalInstanceGenerator(CanalInstanceGenerator canalInstanceGenerator) {

+ 24 - 0
server/src/main/java/com/alibaba/otter/canal/spi/CanalMetricsProvider.java

@@ -0,0 +1,24 @@
+package com.alibaba.otter.canal.spi;
+
+/**
+ * Use java service provider mechanism to provide {@link CanalMetricsService}.
+ * <pre>
+ * Example:
+ * {@code
+ *     ServiceLoader<CanalMetricsProvider> providers = ServiceLoader.load(CanalMetricsProvider.class);
+ *     List<CanalMetricsProvider> list = new ArrayList<CanalMetricsProvider>();
+ *     for (CanalMetricsProvider provider : providers) {
+ *         list.add(provider);
+ *     }
+ * }
+ * </pre>
+ * @author Chuanyi Li
+ */
+public interface CanalMetricsProvider {
+
+    /**
+     * @return Impl of {@link CanalMetricsService}
+     */
+    CanalMetricsService getService();
+
+}

+ 42 - 0
server/src/main/java/com/alibaba/otter/canal/spi/CanalMetricsService.java

@@ -0,0 +1,42 @@
+package com.alibaba.otter.canal.spi;
+
+import com.alibaba.otter.canal.instance.core.CanalInstance;
+
+/**
+ * Canal server/instance metrics for export.
+ * <strong>
+ *     Designed to be created by service provider.
+ * </strong>
+ * @see CanalMetricsProvider
+ * @author Chuanyi Li
+ */
+public interface CanalMetricsService {
+
+    /**
+     * Initialization on canal server startup.
+     */
+    void initialize();
+
+    /**
+     * Clean-up at canal server stop phase.
+     */
+    void terminate();
+
+    /**
+     * @return {@code true} if the metrics service is running, otherwise {@code false}.
+     */
+    boolean isRunning();
+
+    /**
+     * Register instance level metrics for specified instance.
+     * @param instance {@link CanalInstance}
+     */
+    void register(CanalInstance instance);
+
+    /**
+     * Unregister instance level metrics for specified instance.
+     * @param instance {@link CanalInstance}
+     */
+    void unregister(CanalInstance instance);
+
+}

+ 38 - 0
server/src/main/java/com/alibaba/otter/canal/spi/NopCanalMetricsService.java

@@ -0,0 +1,38 @@
+package com.alibaba.otter.canal.spi;
+
+import com.alibaba.otter.canal.instance.core.CanalInstance;
+
+/**
+ * @author Chuanyi Li
+ */
+public class NopCanalMetricsService implements CanalMetricsService {
+
+    public static final NopCanalMetricsService NOP = new NopCanalMetricsService();
+
+    private NopCanalMetricsService() {}
+
+    @Override
+    public void initialize() {
+
+    }
+
+    @Override
+    public void terminate() {
+
+    }
+
+    @Override
+    public boolean isRunning() {
+        return false;
+    }
+
+    @Override
+    public void register(CanalInstance instance) {
+
+    }
+
+    @Override
+    public void unregister(CanalInstance instance) {
+
+    }
+}