Browse Source

canal server metrics refactor.

Chuanyi Li 6 years ago
parent
commit
8b186d1330
25 changed files with 535 additions and 268 deletions
  1. 31 0
      common/src/main/java/com/alibaba/otter/canal/common/utils/SerializedLongAdder.java
  2. 3 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  3. 26 18
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java
  4. 28 10
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  5. 13 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  6. 29 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java
  7. 35 14
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalInstanceExports.java
  8. 0 4
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalServerExports.java
  9. 7 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusService.java
  10. 45 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/EntrySinkCollector.java
  11. 0 79
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/InboundThroughputAspect.java
  12. 1 1
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/InstanceMetaCollector.java
  13. 2 7
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/MemoryStoreCollector.java
  14. 74 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/MysqlParserCollector.java
  15. 0 80
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/OutboundThroughputAspect.java
  16. 1 1
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.java
  17. 134 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusClientInstanceProfiler.java
  18. 15 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusClientInstanceProfilerFactory.java
  19. 0 11
      prometheus/src/main/resources/META-INF/aop.xml
  20. 33 17
      server/src/main/java/com/alibaba/otter/canal/server/netty/CanalServerWithNettyProfiler.java
  21. 2 1
      server/src/main/java/com/alibaba/otter/canal/server/netty/ClientInstanceProfiler.java
  22. 1 1
      server/src/main/java/com/alibaba/otter/canal/server/netty/ClientInstanceProfilerFactory.java
  23. 1 0
      server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java
  24. 42 20
      server/src/main/java/com/alibaba/otter/canal/server/netty/listener/ChannelFutureAggregator.java
  25. 12 1
      sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

+ 31 - 0
common/src/main/java/com/alibaba/otter/canal/common/utils/SerializedLongAdder.java

@@ -0,0 +1,31 @@
+package com.alibaba.otter.canal.common.utils;
+
+/**
+ * 用于dump记录counter
+ * 这种场景下,
+ * <strong>写操作>>读操作</strong>
+ * dump线程为串行,读取counter时,并不严格要求最新值可见,使用普通long即可。
+ *
+ * @author Chuanyi Li
+ */
+public class SerializedLongAdder {
+
+    private long value = 0;
+
+    public SerializedLongAdder(long initial) {
+        this.value = initial;
+    }
+
+    public void add(long x) {
+        value += x;
+    }
+
+    public void increment() {
+        value ++;
+    }
+
+    public long get() {
+        return value;
+    }
+
+}

+ 3 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -8,6 +8,7 @@ import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.alibaba.otter.canal.common.utils.SerializedLongAdder;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.math.RandomUtils;
@@ -95,6 +96,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
     protected int                                    parallelBufferSize         = 256;                                     // 必须为2的幂
     protected MultiStageCoprocessor                  multiStageCoprocessor;
 
+
     protected abstract BinlogParser buildParser();
 
     protected abstract ErosaConnection buildErosaConnection();
@@ -615,4 +617,5 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         this.parallelBufferSize = parallelBufferSize;
     }
 
+
 }

+ 26 - 18
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.parse.inbound.mysql;
 
 import java.nio.charset.Charset;
 
+import com.alibaba.otter.canal.common.utils.SerializedLongAdder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -19,21 +20,22 @@ import com.alibaba.otter.canal.protocol.position.EntryPosition;
 
 public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 
-    protected final Logger      logger                  = LoggerFactory.getLogger(this.getClass());
-    protected static final long BINLOG_START_OFFEST     = 4L;
+    protected final Logger            logger                    = LoggerFactory.getLogger(this.getClass());
+    protected static final long       BINLOG_START_OFFEST       = 4L;
 
-    protected boolean           enableTsdb              = false;
-    protected String            tsdbSpringXml;
-    protected TableMetaTSDB     tableMetaTSDB;
+    protected boolean                 enableTsdb                = false;
+    protected String                  tsdbSpringXml;
+    protected TableMetaTSDB           tableMetaTSDB;
     // 编码信息
-    protected byte              connectionCharsetNumber = (byte) 33;
-    protected Charset           connectionCharset       = Charset.forName("UTF-8");
-    protected boolean           filterQueryDcl          = false;
-    protected boolean           filterQueryDml          = false;
-    protected boolean           filterQueryDdl          = false;
-    protected boolean           filterRows              = false;
-    protected boolean           filterTableError        = false;
-    protected boolean           useDruidDdlFilter       = true;
+    protected byte                    connectionCharsetNumber   = (byte) 33;
+    protected Charset                 connectionCharset         = Charset.forName("UTF-8");
+    protected boolean                 filterQueryDcl            = false;
+    protected boolean                 filterQueryDml            = false;
+    protected boolean                 filterQueryDdl            = false;
+    protected boolean                 filterRows                = false;
+    protected boolean                 filterTableError          = false;
+    protected boolean                 useDruidDdlFilter         = true;
+    private final SerializedLongAdder eventsPublishBlockingTime = new SerializedLongAdder(0L);
 
     protected BinlogParser buildParser() {
         LogEventConvert convert = new LogEventConvert();
@@ -119,11 +121,13 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
     }
 
     protected MultiStageCoprocessor buildMultiStageCoprocessor() {
-        return new MysqlMultiStageCoprocessor(parallelBufferSize,
-            parallelThreadSize,
-            (LogEventConvert) binlogParser,
-            transactionBuffer,
-            destination);
+        MysqlMultiStageCoprocessor mysqlMultiStageCoprocessor = new MysqlMultiStageCoprocessor(parallelBufferSize,
+                parallelThreadSize,
+                (LogEventConvert) binlogParser,
+                transactionBuffer,
+                destination);
+        mysqlMultiStageCoprocessor.setEventsPublishBlockingTime(eventsPublishBlockingTime);
+        return mysqlMultiStageCoprocessor;
     }
 
     // ============================ setter / getter =========================
@@ -188,4 +192,8 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         }
     }
 
+    public SerializedLongAdder getEventsPublishBlockingTime() {
+        return this.eventsPublishBlockingTime;
+    }
+
 }

+ 28 - 10
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -8,6 +8,7 @@ import java.nio.charset.Charset;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.alibaba.otter.canal.common.utils.SerializedLongAdder;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,18 +38,19 @@ import com.taobao.tddl.dbsync.binlog.LogEvent;
 
 public class MysqlConnection implements ErosaConnection {
 
-    private static final Logger logger      = LoggerFactory.getLogger(MysqlConnection.class);
+    private static final Logger       logger                    = LoggerFactory.getLogger(MysqlConnection.class);
 
-    private MysqlConnector      connector;
-    private long                slaveId;
-    private Charset             charset     = Charset.forName("UTF-8");
-    private BinlogFormat        binlogFormat;
-    private BinlogImage         binlogImage;
+    private MysqlConnector            connector;
+    private long                      slaveId;
+    private Charset                   charset                   = Charset.forName("UTF-8");
+    private BinlogFormat              binlogFormat;
+    private BinlogImage               binlogImage;
 
     // tsdb releated
-    private AuthenticationInfo  authInfo;
-    protected int               connTimeout = 5 * 1000;                                      // 5秒
-    protected int               soTimeout   = 60 * 60 * 1000;                                // 1小时
+    private AuthenticationInfo        authInfo;
+    protected     int                 connTimeout               = 5 * 1000;                                      // 5秒
+    protected     int                 soTimeout                 = 60 * 60 * 1000;                                // 1小时
+    private       SerializedLongAdder receivedBinlogBytes;
 
     public MysqlConnection(){
     }
@@ -124,6 +126,7 @@ public class MysqlConnection implements ErosaConnection {
         decoder.handle(LogEvent.XID_EVENT);
         LogContext context = new LogContext();
         while (fetcher.fetch()) {
+            accumulateReceivedBytes(fetcher.limit());
             LogEvent event = null;
             event = decoder.decode(fetcher, context);
 
@@ -146,6 +149,7 @@ public class MysqlConnection implements ErosaConnection {
         LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
         LogContext context = new LogContext();
         while (fetcher.fetch()) {
+            accumulateReceivedBytes(fetcher.limit());
             LogEvent event = null;
             event = decoder.decode(fetcher, context);
 
@@ -174,6 +178,7 @@ public class MysqlConnection implements ErosaConnection {
             LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             LogContext context = new LogContext();
             while (fetcher.fetch()) {
+                accumulateReceivedBytes(fetcher.limit());
                 LogEvent event = null;
                 event = decoder.decode(fetcher, context);
 
@@ -204,6 +209,7 @@ public class MysqlConnection implements ErosaConnection {
         try {
             fetcher.start(connector.getChannel());
             while (fetcher.fetch()) {
+                accumulateReceivedBytes(fetcher.limit());
                 LogBuffer buffer = fetcher.duplicate();
                 fetcher.consume(fetcher.limit());
                 if (!coprocessor.publish(buffer)) {
@@ -230,6 +236,7 @@ public class MysqlConnection implements ErosaConnection {
         try {
             fetcher.start(connector.getChannel());
             while (fetcher.fetch()) {
+                accumulateReceivedBytes(fetcher.limit());
                 LogBuffer buffer = fetcher.duplicate();
                 fetcher.consume(fetcher.limit());
                 if (!coprocessor.publish(buffer)) {
@@ -334,7 +341,6 @@ public class MysqlConnection implements ErosaConnection {
      * <li>net_read_timeout</li>
      * </ol>
      * 
-     * @param channel
      * @throws IOException
      */
     private void updateSettings() throws IOException {
@@ -453,6 +459,14 @@ public class MysqlConnection implements ErosaConnection {
         }
     }
 
+    private void accumulateReceivedBytes(long x) {
+        if (receivedBinlogBytes != null) {
+            receivedBinlogBytes.add(x);
+        }
+    }
+
+
+
     public static enum BinlogFormat {
 
         STATEMENT("STATEMENT"), ROW("ROW"), MIXED("MIXED");
@@ -592,4 +606,8 @@ public class MysqlConnection implements ErosaConnection {
         this.authInfo = authInfo;
     }
 
+    public void setReceivedBinlogBytes(SerializedLongAdder receivedBinlogBytes) {
+        this.receivedBinlogBytes = receivedBinlogBytes;
+    }
+
 }

+ 13 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -10,6 +10,7 @@ import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.alibaba.otter.canal.common.utils.SerializedLongAdder;
 import org.apache.commons.lang.StringUtils;
 import org.springframework.util.CollectionUtils;
 
@@ -69,8 +70,11 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private BinlogImage[]      supportBinlogImages;                          // 支持的binlogImage,如果设置会执行强校验
 
     // update by yishun.chen,特殊异常处理参数
-    private int                dumpErrorCount                    = 0;        // binlogDump失败异常计数
-    private int                dumpErrorCountThreshold           = 2;        // binlogDump失败异常计数阀值
+    private       int                 dumpErrorCount            = 0;        // binlogDump失败异常计数
+    private       int                 dumpErrorCountThreshold   = 2;        // binlogDump失败异常计数阀值
+
+    // instance received binlog bytes
+    private final SerializedLongAdder receivedBinlogBytes       = new SerializedLongAdder(0L);
 
     protected ErosaConnection buildErosaConnection() {
         return buildMysqlConnection(this.runningInfo);
@@ -314,6 +318,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         connection.getConnector().setSendBufferSize(sendBufferSize);
         connection.getConnector().setSoTimeout(defaultConnectionTimeoutInSeconds * 1000);
         connection.setCharset(connectionCharset);
+        connection.setReceivedBinlogBytes(receivedBinlogBytes);
         // 随机生成slaveId
         if (this.slaveId <= 0) {
             this.slaveId = generateUniqueServerId();
@@ -908,4 +913,10 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         this.dumpErrorCountThreshold = dumpErrorCountThreshold;
     }
 
+
+
+    public SerializedLongAdder getReceivedBinlogBytes() {
+        return this.receivedBinlogBytes;
+    }
+
 }

+ 29 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -4,6 +4,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.locks.LockSupport;
 
+import com.alibaba.otter.canal.common.utils.SerializedLongAdder;
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
@@ -52,6 +53,7 @@ import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
  */
 public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implements MultiStageCoprocessor {
 
+    private static final int             maxFullTimes               = 10;
     private LogEventConvert              logEventConvert;
     private EventTransactionBuffer       transactionBuffer;
     private ErosaConnection              connection;
@@ -63,6 +65,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
     private ExecutorService              stageExecutor;
     private String                       destination;
     private volatile CanalParseException exception;
+    private SerializedLongAdder          eventsPublishBlockingTime;
 
     public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
                                       EventTransactionBuffer transactionBuffer, String destination){
@@ -161,6 +164,8 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
             throw exception;
         }
         boolean interupted = false;
+        long blockingStart = 0L;
+        int fullTimes = 0;
         do {
             try {
                 long next = disruptorMsgBuffer.tryNext();
@@ -170,16 +175,34 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
                     event.setBinlogFileName(binlogFileName);
                 }
                 disruptorMsgBuffer.publish(next);
+                if (fullTimes > 0) {
+                    eventsPublishBlockingTime.add(System.nanoTime() - blockingStart);
+                }
                 break;
             } catch (InsufficientCapacityException e) {
+                if (fullTimes == 0) {
+                    blockingStart = System.nanoTime();
+                }
                 // park
-                LockSupport.parkNanos(1L);
+                //LockSupport.parkNanos(1L);
+                applyWait(++fullTimes);
                 interupted = Thread.interrupted();
             }
         } while (!interupted && isStart());
         return isStart();
     }
 
+    // 处理无数据的情况,避免空循环挂死
+    private void applyWait(int fullTimes) {
+        int newFullTimes = fullTimes > maxFullTimes ? maxFullTimes : fullTimes;
+        if (fullTimes <= 3) { // 3次以内
+            Thread.yield();
+        } else { // 超过3次,最多只sleep 1ms
+            LockSupport.parkNanos(100 * 1000L * newFullTimes);
+        }
+
+    }
+
     @Override
     public void reset() {
         if (isStart()) {
@@ -189,6 +212,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         start();
     }
 
+
     private class SimpleParserStage implements EventHandler<MessageEvent>, LifecycleAware {
 
         private LogDecoder decoder;
@@ -427,4 +451,8 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         this.connection = connection;
     }
 
+    public void setEventsPublishBlockingTime(SerializedLongAdder eventsPublishBlockingTime) {
+        this.eventsPublishBlockingTime = eventsPublishBlockingTime;
+    }
+
 }

+ 35 - 14
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalInstanceExports.java

@@ -1,9 +1,7 @@
 package com.alibaba.otter.canal.prometheus;
 
 import com.alibaba.otter.canal.instance.core.CanalInstance;
-import com.alibaba.otter.canal.prometheus.impl.InstanceMetaCollector;
-import com.alibaba.otter.canal.prometheus.impl.MemoryStoreCollector;
-import com.alibaba.otter.canal.prometheus.impl.PrometheusCanalEventDownStreamHandler;
+import com.alibaba.otter.canal.prometheus.impl.*;
 import com.alibaba.otter.canal.sink.CanalEventSink;
 import com.alibaba.otter.canal.sink.entry.EntryEventSink;
 import com.alibaba.otter.canal.store.CanalStoreException;
@@ -15,30 +13,31 @@ import org.slf4j.LoggerFactory;
 import java.util.Collections;
 import java.util.List;
 
+import static com.alibaba.otter.canal.server.netty.CanalServerWithNettyProfiler.profiler;
+
 /**
  * @author Chuanyi Li
  */
 public class CanalInstanceExports {
 
-    private static final Logger      logger         = LoggerFactory.getLogger(CanalInstanceExports.class);
-
-    public static final String[]     labels         = {"destination"};
-
-    public static final List<String> labelList      = Collections.singletonList(labels[0]);
-
+    private static final Logger      logger          = LoggerFactory.getLogger(CanalInstanceExports.class);
+    public static final String       DESTINATION     = "destination";
+    public static final String[]     DEST_LABELS     = {DESTINATION};
+    public static final List<String> DEST_LABEL_LIST = Collections.singletonList(DESTINATION);
     private final String             destination;
-
     private Collector                storeCollector;
-
     private Collector                delayCollector;
-
     private Collector                metaCollector;
+    private Collector                sinkCollector;
+    private Collector                parserCollector;
 
     private CanalInstanceExports(CanalInstance instance) {
         this.destination = instance.getDestination();
-        initDelayGauge(instance);
+        initEventsMetrics(instance);
         initStoreCollector(instance);
         initMetaCollector(instance);
+        initSinkCollector(instance);
+        initParserCollector(instance);
     }
 
 
@@ -48,6 +47,7 @@ public class CanalInstanceExports {
     }
 
     void register() {
+        profiler().start(destination);
         if (delayCollector != null) {
             delayCollector.register();
         }
@@ -57,9 +57,16 @@ public class CanalInstanceExports {
         if (metaCollector != null) {
             metaCollector.register();
         }
+        if (sinkCollector != null) {
+            sinkCollector.register();
+        }
+        if (parserCollector != null) {
+            parserCollector.register();
+        }
     }
 
     void unregister() {
+        profiler().stop(destination);
         if (delayCollector != null) {
             CollectorRegistry.defaultRegistry.unregister(delayCollector);
         }
@@ -69,9 +76,15 @@ public class CanalInstanceExports {
         if (metaCollector != null) {
             CollectorRegistry.defaultRegistry.unregister(metaCollector);
         }
+        if (sinkCollector != null) {
+            CollectorRegistry.defaultRegistry.unregister(sinkCollector);
+        }
+        if (parserCollector != null) {
+            CollectorRegistry.defaultRegistry.unregister(parserCollector);
+        }
     }
 
-    private void initDelayGauge(CanalInstance instance) {
+    private void initEventsMetrics(CanalInstance instance) {
         CanalEventSink sink = instance.getEventSink();
         if (sink instanceof EntryEventSink) {
             EntryEventSink entryEventSink = (EntryEventSink) sink;
@@ -95,4 +108,12 @@ public class CanalInstanceExports {
     private void initMetaCollector(CanalInstance instance) {
         metaCollector = new InstanceMetaCollector(instance);
     }
+
+    private void initSinkCollector(CanalInstance instance) {
+        sinkCollector = new EntrySinkCollector(instance.getEventSink(), instance.getDestination());
+    }
+
+    private void initParserCollector(CanalInstance instance) {
+        parserCollector = new MysqlParserCollector(instance.getEventParser(), instance.getDestination());
+    }
 }

+ 0 - 4
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalServerExports.java

@@ -1,7 +1,5 @@
 package com.alibaba.otter.canal.prometheus;
 
-import com.alibaba.otter.canal.prometheus.impl.InboundThroughputAspect;
-import com.alibaba.otter.canal.prometheus.impl.OutboundThroughputAspect;
 
 /**
  * @author Chuanyi Li
@@ -12,8 +10,6 @@ public class CanalServerExports {
 
     public static synchronized void initialize() {
         if (!initialized) {
-            InboundThroughputAspect.getCollector().register();
-            OutboundThroughputAspect.getCollector().register();
             initialized = true;
         }
     }

+ 7 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusService.java

@@ -1,6 +1,8 @@
 package com.alibaba.otter.canal.prometheus;
 
 import com.alibaba.otter.canal.instance.core.CanalInstance;
+import com.alibaba.otter.canal.prometheus.impl.PrometheusClientInstanceProfilerFactory;
+import com.alibaba.otter.canal.server.netty.CanalServerWithNettyProfiler;
 import com.alibaba.otter.canal.spi.CanalMetricsService;
 import io.prometheus.client.exporter.HTTPServer;
 import io.prometheus.client.hotspot.DefaultExports;
@@ -11,6 +13,9 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static com.alibaba.otter.canal.server.netty.CanalServerWithNettyProfiler.DISABLED;
+import static com.alibaba.otter.canal.server.netty.CanalServerWithNettyProfiler.profiler;
+
 /**
  * @author Chuanyi Li
  */
@@ -50,6 +55,7 @@ public class PrometheusService implements CanalMetricsService {
             DefaultExports.initialize();
             // Canal server level exports
             CanalServerExports.initialize();
+            profiler().setInstanceProfilerFactory(new PrometheusClientInstanceProfilerFactory());
         } catch (Throwable t) {
             logger.warn("Unable to initialize server exports.", t);
         }
@@ -66,6 +72,7 @@ public class PrometheusService implements CanalMetricsService {
         for (CanalInstanceExports ie : exports.values()) {
             ie.unregister();
         }
+        profiler().setInstanceProfilerFactory(DISABLED);
         if (server != null) {
             server.stop();
         }

+ 45 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/EntrySinkCollector.java

@@ -0,0 +1,45 @@
+package com.alibaba.otter.canal.prometheus.impl;
+
+import com.alibaba.otter.canal.sink.CanalEventSink;
+import com.alibaba.otter.canal.sink.entry.EntryEventSink;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CounterMetricFamily;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABEL_LIST;
+
+/**
+ * @author Chuanyi Li
+ */
+public class EntrySinkCollector extends Collector {
+
+    private static final String       SINK_BLOCKING_TIME      = "canal_instance_sink_blocking_time";
+    private final String              sinkBlockTimeHelp;
+    private final List<String>        destLabelValues;
+    private final AtomicLong          eventsSinkBlockingTime;
+
+
+    public EntrySinkCollector(CanalEventSink sink, String destination) {
+        if (!(sink instanceof EntryEventSink)) {
+            throw new IllegalArgumentException("CanalEventSink must be EntryEventSink");
+        }
+        this.destLabelValues = Collections.singletonList(destination);
+        EntryEventSink entrySink = (EntryEventSink) sink;
+        this.sinkBlockTimeHelp = "Sink blocking time of instance " + destination;
+        eventsSinkBlockingTime = entrySink.getEventsSinkBlockingTime();
+    }
+
+    @Override public List<MetricFamilySamples> collect() {
+        List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
+        CounterMetricFamily blockingCounter = new CounterMetricFamily(SINK_BLOCKING_TIME,
+                sinkBlockTimeHelp, DEST_LABEL_LIST);
+        blockingCounter.addMetric(destLabelValues, eventsSinkBlockingTime.doubleValue());
+        mfs.add(blockingCounter);
+        return mfs;
+    }
+
+}

+ 0 - 79
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/InboundThroughputAspect.java

@@ -1,79 +0,0 @@
-package com.alibaba.otter.canal.prometheus.impl;
-
-import io.prometheus.client.Collector;
-import io.prometheus.client.CounterMetricFamily;
-import org.aspectj.lang.annotation.After;
-import org.aspectj.lang.annotation.Aspect;
-import org.aspectj.lang.annotation.Pointcut;
-import org.jctools.maps.ConcurrentAutoTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author Chuanyi Li
- */
-@Aspect
-public class InboundThroughputAspect {
-
-    private static final Logger              logger    = LoggerFactory.getLogger(InboundThroughputAspect.class);
-
-    /**
-     *  Support highly scalable counters
-     *  @see ConcurrentAutoTable
-     */
-    private static final ConcurrentAutoTable total     = new ConcurrentAutoTable();
-
-    private static final Collector           collector = new InboundThroughputCollector();
-
-    public static Collector getCollector() {
-        return collector;
-    }
-
-    @Pointcut("call(byte[] com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel.read(..))")
-    public void read() {}
-
-    @Pointcut("call(void com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel.read(..)) ")
-    public void readBytes() {}
-
-    //nested read, just eliminate them.
-    @Pointcut("withincode(* com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel.read(..))")
-    public void nestedCall() {}
-
-    @After("read() && !nestedCall()  && args(len, ..)")
-    public void recordRead(int len) {
-        accumulateBytes(len);
-    }
-
-    @After("readBytes() && !nestedCall() && args(.., len, timeout)")
-    public void recordReadBytes(int len, int timeout) {
-        accumulateBytes(len);
-    }
-
-    private void accumulateBytes(int count) {
-        try {
-            total.add(count);
-        } catch (Throwable t) {
-            //Catch every Throwable, rather than break the business logic.
-            logger.warn("Error while accumulate inbound bytes.", t);
-        }
-    }
-
-    public static class InboundThroughputCollector extends Collector {
-
-        private InboundThroughputCollector() {}
-
-        @Override
-        public List<MetricFamilySamples> collect() {
-            List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
-            CounterMetricFamily bytes = new CounterMetricFamily("canal_net_inbound_bytes",
-                    "Total socket inbound bytes of canal server.",
-                    total.get());
-            mfs.add(bytes);
-            return mfs;
-        }
-    }
-
-}

+ 1 - 1
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/InstanceMetaCollector.java

@@ -55,7 +55,7 @@ public class InstanceMetaCollector extends Collector {
             List<ClientIdentity> subs = metaManager.listAllSubscribeInfo(destination);
             GaugeMetricFamily subscriptions = new GaugeMetricFamily(
                     "canal_instance_subscription",
-                    subsHelp, CanalInstanceExports.labelList);
+                    subsHelp, CanalInstanceExports.DEST_LABEL_LIST);
             subscriptions.addMetric(Arrays.asList(destination), subs.size());
             mfs.add(subscriptions);
         }

+ 2 - 7
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/MemoryStoreCollector.java

@@ -20,15 +20,10 @@ import java.util.concurrent.atomic.AtomicLong;
 public class MemoryStoreCollector extends Collector {
 
     private static final Class<MemoryEventStoreWithBuffer> clazz  = MemoryEventStoreWithBuffer.class;
-
     private final String                                   destination;
-
     private final AtomicLong                               putSequence;
-
     private final AtomicLong                               ackSequence;
-
     private final String                                   putHelp;
-
     private final String                                   ackHelp;
 
     public MemoryStoreCollector(CanalEventStore store, String destination) {
@@ -47,11 +42,11 @@ public class MemoryStoreCollector extends Collector {
     public List<MetricFamilySamples> collect() {
         List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
         CounterMetricFamily put = new CounterMetricFamily("canal_instance_store_produce_seq",
-                putHelp, Arrays.asList(CanalInstanceExports.labels));
+                putHelp, Arrays.asList(CanalInstanceExports.DEST_LABELS));
         put.addMetric(Collections.singletonList(destination), putSequence.doubleValue());
         mfs.add(put);
         CounterMetricFamily ack = new CounterMetricFamily("canal_instance_store_consume_seq",
-                ackHelp, Arrays.asList(CanalInstanceExports.labels));
+                ackHelp, Arrays.asList(CanalInstanceExports.DEST_LABELS));
         ack.addMetric(Collections.singletonList(destination), ackSequence.doubleValue());
         mfs.add(ack);
         return mfs;

+ 74 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/MysqlParserCollector.java

@@ -0,0 +1,74 @@
+package com.alibaba.otter.canal.prometheus.impl;
+
+import com.alibaba.otter.canal.common.utils.SerializedLongAdder;
+import com.alibaba.otter.canal.parse.CanalEventParser;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CounterMetricFamily;
+import io.prometheus.client.GaugeMetricFamily;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DESTINATION;
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABEL_LIST;
+
+/**
+ * @author Chuanyi Li
+ */
+public class MysqlParserCollector extends Collector {
+
+    private static final String       PUBLISH_BLOCKING_TIME = "canal_instance_publish_blocking_time";
+    private static final String       RECEIVED_BINLOG_BYTES = "canal_instance_received_binlog_bytes";
+    private static final String       PARSER_MODE           = "canal_instance_parser_mode";
+    private static final String       MODE_LABEL            = "parallel";
+    private final List<String>        destLabelValues;
+    private final List<String>        modeLabelValues;
+    private final SerializedLongAdder receivedBinlogBytes;
+    private final SerializedLongAdder eventsPublishBlockingTime;
+    private final String              publishBlockingTimeHelp;
+    private final String              receivedBinlogBytesHelp;
+    private final String              modeHelp;
+    private final List<String>        modeLabels;
+    private final Boolean             isParallel;
+
+    public MysqlParserCollector(CanalEventParser parser, String destination) {
+        if (!(parser instanceof MysqlEventParser)) {
+            throw new IllegalArgumentException("CanalEventParser must be MysqlEventParser");
+        }
+        this.destLabelValues = Collections.singletonList(destination);
+        this.eventsPublishBlockingTime = ((MysqlEventParser)parser).getEventsPublishBlockingTime();
+        this.publishBlockingTimeHelp = "Blocking time of publishing of instance " + destination;
+        this.receivedBinlogBytes = ((MysqlEventParser)parser).getReceivedBinlogBytes();
+        this.receivedBinlogBytesHelp = "Received binlog bytes of instance" + destination;
+        this.isParallel = ((MysqlEventParser)parser).isParallel();
+        this.modeHelp = "Parser mode of instance" + destination;
+        this.modeLabels = Arrays.asList(DESTINATION, MODE_LABEL);
+        this.modeLabelValues = Arrays.asList(destination, isParallel.toString());
+    }
+
+    @Override public List<MetricFamilySamples> collect() {
+        List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
+        // canal_instance_received_binlog_bytes
+        CounterMetricFamily bytesCounter = new CounterMetricFamily(RECEIVED_BINLOG_BYTES,
+                receivedBinlogBytesHelp, DEST_LABEL_LIST);
+        bytesCounter.addMetric(destLabelValues, receivedBinlogBytes.get());
+        mfs.add(bytesCounter);
+        // canal_instance_parser_mode
+        GaugeMetricFamily modeGauge = new GaugeMetricFamily(PARSER_MODE,
+                modeHelp,
+                modeLabels);
+        modeGauge.addMetric(modeLabelValues, 1);
+        mfs.add(modeGauge);
+        // canal_instance_publish_blocking_time
+        if (isParallel) {
+            CounterMetricFamily blockingCounter = new CounterMetricFamily(PUBLISH_BLOCKING_TIME,
+                    publishBlockingTimeHelp, DEST_LABEL_LIST);
+            blockingCounter.addMetric(destLabelValues, eventsPublishBlockingTime.get());
+            mfs.add(blockingCounter);
+        }
+        return mfs;
+    }
+}

+ 0 - 80
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/OutboundThroughputAspect.java

@@ -1,80 +0,0 @@
-package com.alibaba.otter.canal.prometheus.impl;
-
-import io.prometheus.client.Collector;
-import io.prometheus.client.CounterMetricFamily;
-import org.aspectj.lang.annotation.After;
-import org.aspectj.lang.annotation.Aspect;
-import org.aspectj.lang.annotation.Pointcut;
-import org.jboss.netty.channel.Channel;
-import org.jctools.maps.ConcurrentAutoTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import static com.alibaba.otter.canal.server.netty.NettyUtils.HEADER_LENGTH;
-
-/**
- * @author Chuanyi Li
- */
-@Aspect
-public class OutboundThroughputAspect {
-    private static final Logger              logger    = LoggerFactory.getLogger(OutboundThroughputAspect.class);
-
-    /**
-     *  Support highly scalable counters
-     *  @see ConcurrentAutoTable
-     */
-    private static final ConcurrentAutoTable total     = new ConcurrentAutoTable();
-
-    private static final Collector           collector = new OutboundThroughputCollector();
-
-    public static Collector getCollector() {
-        return collector;
-    }
-
-    @Pointcut("call(* com.alibaba.otter.canal.server.netty.NettyUtils.write(..))")
-    public void write() {}
-
-    //nested read, just eliminate them.
-    @Pointcut("withincode(* com.alibaba.otter.canal.server.netty.NettyUtils.write(..))")
-    public void nestedCall() {}
-
-    @After("write() && !nestedCall() && args(ch, bytes, ..)")
-    public void recordWriteBytes(Channel ch, byte[] bytes) {
-        if (bytes != null) {
-            accumulateBytes(HEADER_LENGTH + bytes.length);
-        }
-    }
-
-    @After("write() && !nestedCall() && args(ch, buf, ..)")
-    public void recordWriteBuffer(Channel ch, ByteBuffer buf) {
-        if (buf != null) {
-            total.add(HEADER_LENGTH + buf.limit());
-        }
-    }
-    private void accumulateBytes(int count) {
-        try {
-            total.add(count);
-        } catch (Throwable t) {
-            //Catch every Throwable, rather than break the business logic.
-            logger.warn("Error while accumulate inbound bytes.", t);
-        }
-    }
-
-    public static class OutboundThroughputCollector extends Collector {
-
-        private OutboundThroughputCollector() {}
-
-        @Override public List<MetricFamilySamples> collect() {
-            List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
-            CounterMetricFamily bytes = new CounterMetricFamily("canal_net_outbound_bytes",
-                    "Total socket outbound bytes of canal server.",
-                    total.get());
-            mfs.add(bytes);
-            return mfs;
-        }
-    }
-}

+ 1 - 1
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.java

@@ -36,7 +36,7 @@ public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDow
                 GaugeMetricFamily delay = new GaugeMetricFamily(
                         DELAY_NAME,
                         delayHelpName,
-                        CanalInstanceExports.labelList);
+                        CanalInstanceExports.DEST_LABEL_LIST);
                 double d = 0.0;
                 if (latestExecuteTime > 0) {
                     d = now - latestExecuteTime;

+ 134 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusClientInstanceProfiler.java

@@ -0,0 +1,134 @@
+package com.alibaba.otter.canal.prometheus.impl;
+
+import com.alibaba.otter.canal.protocol.CanalPacket.PacketType;
+import com.alibaba.otter.canal.server.netty.ClientInstanceProfiler;
+import com.alibaba.otter.canal.server.netty.listener.ChannelFutureAggregator.ClientRequestResult;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DESTINATION;
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS;
+
+/**
+ * @author Chuanyi Li
+ */
+public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler {
+
+    private static final String PACKET_TYPE    = "canal_instance_client_packets";
+    private static final String OUTBOUND_BYTES = "canal_instance_client_bytes";
+    private static final String EMPTY_BATCHES  = "canal_instance_client_empty_batches";
+    private static final String ERRORS         = "canal_instance_client_request_error";
+    private static final String LATENCY        = "canal_instance_client_request_latency";
+    private volatile boolean    running        = false;
+    private final String       destination;
+    private final String[]     destLabelValues;
+    private final Counter      outboundCounter;
+    private final Counter      packetsCounter;
+    private final Counter      emptyBatchesCounter;
+    private final Counter      errorsCounter;
+    private final Histogram    responseLatency;
+
+    public PrometheusClientInstanceProfiler(String destination) {
+        this.destination = destination;
+        this.destLabelValues = new String[]{destination};
+        this.outboundCounter = Counter.build()
+                .labelNames(DEST_LABELS)
+                .name(OUTBOUND_BYTES)
+                .help("Send bytes to client of instance " + destination)
+                .create();
+        this.packetsCounter = Counter.build()
+                .labelNames(new String[]{DESTINATION, "packetType"})
+                .name(PACKET_TYPE)
+                .help("Send packets to client of instance " + destination)
+                .create();
+        this.emptyBatchesCounter = Counter.build()
+                .labelNames(DEST_LABELS)
+                .name(EMPTY_BATCHES)
+                .help("Send empty batches to client of instance " + destination)
+                .create();
+        this.errorsCounter = Counter.build()
+                .labelNames(new String[]{DESTINATION, "errorCode"})
+                .name(ERRORS)
+                .help("Client request errors of instance " + destination)
+                .create();
+        this.responseLatency = Histogram.build()
+                .labelNames(DEST_LABELS)
+                .name(LATENCY)
+                .help("Client request latency of instance " + destination)
+                // buckets in milliseconds
+                .buckets(1.0, 2.5, 5.0, 10.0, 25.0, 50.0, 100.0)
+                .create();
+    }
+
+    @Override
+    public String getDestination() {
+        return this.destination;
+    }
+
+    @Override
+    public void profiling(ClientRequestResult result) {
+        PacketType type = result.getType();
+        outboundCounter.labels(destLabelValues).inc(result.getAmount());
+        packetsCounter.labels(destination, type.name()).inc();
+        short errorCode = result.getErrorCode();
+        if (errorCode > 0) {
+            errorsCounter.labels(destination, Short.toString(errorCode)).inc();
+        }
+        long latency = result.getLatency();
+        responseLatency.labels(destLabelValues).observe(latency / 1000000);
+        switch (type) {
+            case GET:
+                boolean empty = result.getEmpty();
+                if (empty) {
+                    emptyBatchesCounter.labels(destLabelValues).inc();
+                }
+                break;
+            // reserve for others
+            default:
+                break;
+        }
+    }
+
+    @Override public void start() {
+        if (outboundCounter != null) {
+            outboundCounter.register();
+        }
+        if (packetsCounter != null) {
+            packetsCounter.register();
+        }
+        if (emptyBatchesCounter != null) {
+            emptyBatchesCounter.register();
+        }
+        if (errorsCounter != null) {
+            errorsCounter.register();
+        }
+        if (responseLatency != null) {
+            responseLatency.register();
+        }
+        running = true;
+    }
+
+    @Override public void stop() {
+        running = false;
+        if (outboundCounter != null) {
+            CollectorRegistry.defaultRegistry.unregister(outboundCounter);
+        }
+        if (packetsCounter != null) {
+            CollectorRegistry.defaultRegistry.unregister(packetsCounter);
+        }
+        if (emptyBatchesCounter != null) {
+            CollectorRegistry.defaultRegistry.unregister(emptyBatchesCounter);
+        }
+        if (errorsCounter != null) {
+            CollectorRegistry.defaultRegistry.unregister(errorsCounter);
+        }
+        if (responseLatency != null) {
+            CollectorRegistry.defaultRegistry.unregister(responseLatency);
+        }
+    }
+
+    @Override public boolean isStart() {
+        return running;
+    }
+}

+ 15 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusClientInstanceProfilerFactory.java

@@ -0,0 +1,15 @@
+package com.alibaba.otter.canal.prometheus.impl;
+
+import com.alibaba.otter.canal.server.netty.ClientInstanceProfiler;
+import com.alibaba.otter.canal.server.netty.ClientInstanceProfilerFactory;
+
+/**
+ * @author Chuanyi Li
+ */
+public class PrometheusClientInstanceProfilerFactory implements ClientInstanceProfilerFactory {
+
+    @Override
+    public ClientInstanceProfiler create(String destination) {
+        return new PrometheusClientInstanceProfiler(destination);
+    }
+}

+ 0 - 11
prometheus/src/main/resources/META-INF/aop.xml

@@ -1,11 +0,0 @@
-<aspectj>
-
-    <aspects>
-        <aspect name="com.alibaba.otter.canal.prometheus.impl.InboundThroughputAspect"/>
-        <aspect name="com.alibaba.otter.canal.prometheus.impl.OutboundThroughputAspect"/>
-    </aspects>
-    <weaver options="-verbose -showWeaveInfo">
-        <include within="com.alibaba.otter.canal..*"/>
-    </weaver>
-
-</aspectj>

+ 33 - 17
server/src/main/java/com/alibaba/otter/canal/server/netty/CanalServerWithNettyProfiler.java

@@ -10,7 +10,7 @@ import java.util.concurrent.ConcurrentMap;
  */
 public class CanalServerWithNettyProfiler {
 
-    private static final ClientInstanceProfilerFactory          DISABLED = new DefaultClientInstanceProfilerFactory();
+    public static final ClientInstanceProfilerFactory           DISABLED = new DefaultClientInstanceProfilerFactory();
     private volatile ClientInstanceProfilerFactory              factory;
     private final ConcurrentMap<String, ClientInstanceProfiler> cliPfs;
     private final CanalServerWithEmbedded                       server;
@@ -29,34 +29,47 @@ public class CanalServerWithNettyProfiler {
         return SingletonHolder.SINGLETON;
     }
 
-    public void profiling(String dest, ClientRequestResult result) {
+    public void profiling(String destination, ClientRequestResult result) {
         if (isDisabled()) {
             return;
         }
-        ClientInstanceProfiler profiler = tryGet(dest);
+        ClientInstanceProfiler profiler = cliPfs.get(destination);
         if (profiler != null) {
             profiler.profiling(result);
         }
     }
 
+    public void start(String destination) {
+        if (isDisabled()) {
+            return;
+        }
+        if (server.isStart(destination)) {
+            throw new IllegalStateException("Instance profiler should not be start while running.");
+        }
+        ClientInstanceProfiler profiler = factory.create(destination);
+        profiler.start();
+        cliPfs.put(destination, profiler);
+    }
+
     /**
      * Remove instance profiler for specified instance.
      * Only accepted while instance is not running.
-     * @param dest canal instance destination
+     * @param destination canal instance destination
      */
-    public void remove(String dest) throws IllegalStateException {
+    public void stop(String destination) {
         if (isDisabled()) {
             return;
         }
-        synchronized (cliPfs) {
-            if (server.isStart(dest)) {
-                throw new IllegalStateException("Instance profiler should not be removed while running.");
-            }
-            cliPfs.remove(dest);
+        if (server.isStart(destination)) {
+            throw new IllegalStateException("Instance profiler should not be stop while running.");
+        }
+        ClientInstanceProfiler profiler = cliPfs.remove(destination);
+        if (profiler != null && profiler.isStart()) {
+            profiler.stop();
         }
     }
 
-    public void setFactory(ClientInstanceProfilerFactory factory) {
+    public void setInstanceProfilerFactory(ClientInstanceProfilerFactory factory) {
         this.factory = factory;
     }
 
@@ -64,15 +77,18 @@ public class CanalServerWithNettyProfiler {
         return factory == DISABLED || factory == null;
     }
 
-    private ClientInstanceProfiler tryGet(String dest) {
+    private ClientInstanceProfiler tryGet(String destination) {
         //try fast get
-        ClientInstanceProfiler profiler = cliPfs.get(dest);
+        ClientInstanceProfiler profiler = cliPfs.get(destination);
         if (profiler == null) {
             synchronized (cliPfs) {
-                if (server.isStart(dest)) {
+                if (server.isStart(destination)) {
                     // avoid overwriting
-                    cliPfs.putIfAbsent(dest, factory.create());
-                    profiler = cliPfs.get(dest);
+                    cliPfs.putIfAbsent(destination, factory.create(destination));
+                    profiler = cliPfs.get(destination);
+                    if (!profiler.isStart()) {
+                        profiler.start();
+                    }
                 }
             }
         }
@@ -81,7 +97,7 @@ public class CanalServerWithNettyProfiler {
 
     private static class DefaultClientInstanceProfilerFactory implements ClientInstanceProfilerFactory {
         @Override
-        public ClientInstanceProfiler create() {
+        public ClientInstanceProfiler create(String destination) {
             throw new UnsupportedOperationException();
         }
     }

+ 2 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/ClientInstanceProfiler.java

@@ -1,11 +1,12 @@
 package com.alibaba.otter.canal.server.netty;
 
+import com.alibaba.otter.canal.common.CanalLifeCycle;
 import com.alibaba.otter.canal.server.netty.listener.ChannelFutureAggregator;
 
 /**
  * @author Chuanyi Li
  */
-public interface ClientInstanceProfiler {
+public interface ClientInstanceProfiler extends CanalLifeCycle {
 
     String getDestination();
 

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/ClientInstanceProfilerFactory.java

@@ -5,6 +5,6 @@ package com.alibaba.otter.canal.server.netty;
  */
 public interface ClientInstanceProfilerFactory {
 
-    ClientInstanceProfiler create();
+    ClientInstanceProfiler create(String destination);
 
 }

+ 1 - 0
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.server.netty.handler;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.alibaba.otter.canal.server.netty.listener.ChannelFutureAggregator;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.jboss.netty.buffer.ChannelBuffer;

+ 42 - 20
server/src/main/java/com/alibaba/otter/canal/server/netty/listener/ChannelFutureAggregator.java

@@ -15,18 +15,27 @@ public class ChannelFutureAggregator implements ChannelFutureListener {
 
     private ClientRequestResult result;
 
-    public ChannelFutureAggregator(String dest, GeneratedMessage request, CanalPacket.PacketType type, int amount, long latency) {
-        this(dest, request, type, amount, latency, 0);
+    public ChannelFutureAggregator(String destination, GeneratedMessage request, CanalPacket.PacketType type, int amount, long latency, boolean empty) {
+        this(destination, request, type, amount, latency, empty, (short) 0);
     }
 
-    private ChannelFutureAggregator(String dest, GeneratedMessage request, CanalPacket.PacketType type, int amount, long latency, int errorCode) {
+    public ChannelFutureAggregator(String destination, GeneratedMessage request, CanalPacket.PacketType type, int amount, long latency) {
+        this(destination, request, type, amount, latency, false, (short) 0);
+    }
+
+    public ChannelFutureAggregator(String destination, GeneratedMessage request, CanalPacket.PacketType type, int amount, long latency, short errorCode) {
+        this(destination, request, type, amount, latency, false, errorCode);
+    }
+
+    private ChannelFutureAggregator(String destination, GeneratedMessage request, CanalPacket.PacketType type, int amount, long latency, boolean empty, short errorCode) {
         this.result = new ClientRequestResult.Builder()
-                .dest(dest)
+                .destination(destination)
                 .type(type)
                 .request(request)
                 .amount(amount)
                 .latency(latency)
                 .errorCode(errorCode)
+                .empty(empty)
                 .build();
     }
 
@@ -36,7 +45,7 @@ public class ChannelFutureAggregator implements ChannelFutureListener {
         if (future.getCause() != null) {
             result.channelError = future.getCause();
         }
-        profiler().profiling(result.dest, result);
+        profiler().profiling(result.destination, result);
     }
 
     /**
@@ -44,39 +53,42 @@ public class ChannelFutureAggregator implements ChannelFutureListener {
      */
     public static class ClientRequestResult {
 
-        private String                 dest;
+        private String                 destination;
         private CanalPacket.PacketType type;
         private GeneratedMessage       request;
-        private int       amount;
-        private long      latency;
-        private int       errorCode;
-        private Throwable channelError;
+        private int                    amount;
+        private long                   latency;
+        private short                  errorCode;
+        private boolean                empty;
+        private Throwable              channelError;
 
         private ClientRequestResult() {}
 
         private ClientRequestResult(Builder builder) {
-            this.dest = Preconditions.checkNotNull(builder.dest);
+            this.destination = Preconditions.checkNotNull(builder.destination);
             this.type = Preconditions.checkNotNull(builder.type);
             this.request = Preconditions.checkNotNull(builder.request);
             this.amount = builder.amount;
             this.latency = builder.latency;
             this.errorCode = builder.errorCode;
+            this.empty = builder.empty;
             this.channelError = builder.channelError;
         }
 
         // auto-generated
         public static class Builder {
 
-            private String                 dest;
+            private String                 destination;
             private CanalPacket.PacketType type;
             private GeneratedMessage       request;
             private int                    amount;
             private long                   latency;
-            private int                    errorCode;
+            private short                  errorCode;
+            private boolean                empty;
             private Throwable              channelError;
 
-            Builder dest(String dest) {
-                this.dest = dest;
+            Builder destination(String destination) {
+                this.destination = destination;
                 return this;
             }
 
@@ -100,23 +112,29 @@ public class ChannelFutureAggregator implements ChannelFutureListener {
                 return this;
             }
 
-            Builder errorCode(int errorCode) {
+            Builder errorCode(short errorCode) {
                 this.errorCode = errorCode;
                 return this;
             }
 
+            Builder empty(boolean empty) {
+                this.empty = empty;
+                return this;
+            }
+
             public Builder channelError(Throwable channelError) {
                 this.channelError = channelError;
                 return this;
             }
 
             public Builder fromPrototype(ClientRequestResult prototype) {
-                dest = prototype.dest;
+                destination = prototype.destination;
                 type = prototype.type;
                 request = prototype.request;
                 amount = prototype.amount;
                 latency = prototype.latency;
                 errorCode = prototype.errorCode;
+                empty = prototype.empty;
                 channelError = prototype.channelError;
                 return this;
             }
@@ -126,8 +144,8 @@ public class ChannelFutureAggregator implements ChannelFutureListener {
             }
         }
         // getters
-        public String getDest() {
-            return dest;
+        public String getDestination() {
+            return destination;
         }
 
         public CanalPacket.PacketType getType() {
@@ -146,10 +164,14 @@ public class ChannelFutureAggregator implements ChannelFutureListener {
             return latency;
         }
 
-        public int getErrorCode() {
+        public short getErrorCode() {
             return errorCode;
         }
 
+        public boolean getEmpty() {
+            return empty;
+        }
+
         public Throwable getChannelError() {
             return channelError;
         }

+ 12 - 1
sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

@@ -39,6 +39,7 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
     protected long                 emptyTransctionThresold       = 8192;                                         // 超过1024个事务头,输出一个
     protected volatile long        lastEmptyTransactionTimestamp = 0L;
     protected AtomicLong           lastEmptyTransactionCount     = new AtomicLong(0L);
+    private AtomicLong             eventsSinkBlockingTime        = new AtomicLong(0L);
 
     public EntryEventSink(){
         addHandler(new HeartBeatEntryEventHandler());
@@ -147,15 +148,21 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
         for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
             events = handler.before(events);
         }
-
+        long blockingStart = 0L;
         int fullTimes = 0;
         do {
             if (eventStore.tryPut(events)) {
+                if (fullTimes > 0) {
+                    eventsSinkBlockingTime.addAndGet(System.nanoTime() - blockingStart);
+                }
                 for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
                     events = handler.after(events);
                 }
                 return true;
             } else {
+                if (fullTimes == 0) {
+                    blockingStart = System.nanoTime();
+                }
                 applyWait(++fullTimes);
             }
 
@@ -202,4 +209,8 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
         this.emptyTransctionThresold = emptyTransctionThresold;
     }
 
+    public AtomicLong getEventsSinkBlockingTime() {
+        return eventsSinkBlockingTime;
+    }
+
 }