Kaynağa Gözat

Refactor metrics code.

Chuanyi Li L 6 yıl önce
ebeveyn
işleme
fec9803aae
17 değiştirilmiş dosya ile 476 ekleme ve 333 silme
  1. 0 31
      common/src/main/java/com/alibaba/otter/canal/common/utils/SerializedLongAdder.java
  2. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  3. 16 16
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java
  4. 4 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  5. 3 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  6. 4 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java
  7. 52 82
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalInstanceExports.java
  8. 0 17
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalServerExports.java
  9. 16 17
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusService.java
  10. 66 12
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/EntryCollector.java
  11. 18 9
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/MetaCollector.java
  12. 0 74
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/MysqlParserCollector.java
  13. 114 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/ParserCollector.java
  14. 56 11
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.java
  15. 18 9
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/SinkCollector.java
  16. 89 42
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/StoreCollector.java
  17. 19 0
      store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

+ 0 - 31
common/src/main/java/com/alibaba/otter/canal/common/utils/SerializedLongAdder.java

@@ -1,31 +0,0 @@
-package com.alibaba.otter.canal.common.utils;
-
-/**
- * 用于dump记录counter
- * 这种场景下,
- * <strong>写操作>>读操作</strong>
- * dump线程为串行,读取counter时,并不严格要求最新值可见,使用普通long即可。
- *
- * @author Chuanyi Li
- */
-public class SerializedLongAdder {
-
-    private long value = 0;
-
-    public SerializedLongAdder(long initial) {
-        this.value = initial;
-    }
-
-    public void add(long x) {
-        value += x;
-    }
-
-    public void increment() {
-        value ++;
-    }
-
-    public long get() {
-        return value;
-    }
-
-}

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
-import com.alibaba.otter.canal.common.utils.SerializedLongAdder;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.math.RandomUtils;

+ 16 - 16
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -1,8 +1,8 @@
 package com.alibaba.otter.canal.parse.inbound.mysql;
 
 import java.nio.charset.Charset;
+import java.util.concurrent.atomic.AtomicLong;
 
-import com.alibaba.otter.canal.common.utils.SerializedLongAdder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -22,24 +22,24 @@ import com.alibaba.otter.canal.protocol.position.EntryPosition;
 
 public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 
-    protected final Logger         logger                  = LoggerFactory.getLogger(this.getClass());
-    protected static final long    BINLOG_START_OFFEST     = 4L;
+    protected final Logger         logger                    = LoggerFactory.getLogger(this.getClass());
+    protected static final long    BINLOG_START_OFFEST       = 4L;
 
-    protected TableMetaTSDBFactory tableMetaTSDBFactory    = new DefaultTableMetaTSDBFactory();
-    protected boolean              enableTsdb              = false;
+    protected TableMetaTSDBFactory tableMetaTSDBFactory      = new DefaultTableMetaTSDBFactory();
+    protected boolean              enableTsdb                = false;
     protected String               tsdbSpringXml;
     protected TableMetaTSDB        tableMetaTSDB;
 
     // 编码信息
-    protected byte                 connectionCharsetNumber = (byte) 33;
-    protected Charset              connectionCharset       = Charset.forName("UTF-8");
-    protected boolean              filterQueryDcl          = false;
-    protected boolean              filterQueryDml          = false;
-    protected boolean              filterQueryDdl          = false;
-    protected boolean              filterRows              = false;
-    protected boolean              filterTableError        = false;
-    protected boolean              useDruidDdlFilter       = true;
-    private final SerializedLongAdder eventsPublishBlockingTime = new SerializedLongAdder(0L);
+    protected byte                 connectionCharsetNumber   = (byte) 33;
+    protected Charset              connectionCharset         = Charset.forName("UTF-8");
+    protected boolean              filterQueryDcl            = false;
+    protected boolean              filterQueryDml            = false;
+    protected boolean              filterQueryDdl            = false;
+    protected boolean              filterRows                = false;
+    protected boolean              filterTableError          = false;
+    protected boolean              useDruidDdlFilter         = true;
+    private final AtomicLong       eventsPublishBlockingTime = new AtomicLong(0L);
 
     protected BinlogParser buildParser() {
         LogEventConvert convert = new LogEventConvert();
@@ -207,8 +207,8 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
     public void setTableMetaTSDBFactory(TableMetaTSDBFactory tableMetaTSDBFactory) {
         this.tableMetaTSDBFactory = tableMetaTSDBFactory;
     }
-    
-    public SerializedLongAdder getEventsPublishBlockingTime() {
+
+    public AtomicLong getEventsPublishBlockingTime() {
         return this.eventsPublishBlockingTime;
     }
 

+ 4 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -7,8 +7,8 @@ import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
-import com.alibaba.otter.canal.common.utils.SerializedLongAdder;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.slf4j.Logger;
@@ -52,7 +52,7 @@ public class MysqlConnection implements ErosaConnection {
     private AuthenticationInfo        authInfo;
     protected     int                 connTimeout               = 5 * 1000;                                      // 5秒
     protected     int                 soTimeout                 = 60 * 60 * 1000;                                // 1小时
-    private       SerializedLongAdder receivedBinlogBytes;
+    private AtomicLong receivedBinlogBytes;
 
     public MysqlConnection(){
     }
@@ -473,7 +473,7 @@ public class MysqlConnection implements ErosaConnection {
 
     private void accumulateReceivedBytes(long x) {
         if (receivedBinlogBytes != null) {
-            receivedBinlogBytes.add(x);
+            receivedBinlogBytes.addAndGet(x);
         }
     }
 
@@ -618,7 +618,7 @@ public class MysqlConnection implements ErosaConnection {
         this.authInfo = authInfo;
     }
 
-    public void setReceivedBinlogBytes(SerializedLongAdder receivedBinlogBytes) {
+    public void setReceivedBinlogBytes(AtomicLong receivedBinlogBytes) {
         this.receivedBinlogBytes = receivedBinlogBytes;
     }
 

+ 3 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -10,7 +10,6 @@ import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.alibaba.otter.canal.common.utils.SerializedLongAdder;
 import org.apache.commons.lang.StringUtils;
 import org.springframework.util.CollectionUtils;
 
@@ -73,7 +72,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private       int                 dumpErrorCountThreshold   = 2;        // binlogDump失败异常计数阀值
 
     // instance received binlog bytes
-    private final SerializedLongAdder receivedBinlogBytes       = new SerializedLongAdder(0L);
+    private final AtomicLong          receivedBinlogBytes       = new AtomicLong(0L);
 
     protected ErosaConnection buildErosaConnection() {
         return buildMysqlConnection(this.runningInfo);
@@ -516,7 +515,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private Long findTransactionBeginPosition(ErosaConnection mysqlConnection, final EntryPosition entryPosition)
                                                                                                                  throws IOException {
         // 针对开始的第一条为非Begin记录,需要从该binlog扫描
-        final AtomicLong preTransactionStartPosition = new AtomicLong(0L);
+        final java.util.concurrent.atomic.AtomicLong preTransactionStartPosition = new java.util.concurrent.atomic.AtomicLong(0L);
         mysqlConnection.reconnect();
         mysqlConnection.seek(entryPosition.getJournalName(), 4L, new SinkFunction<LogEvent>() {
 
@@ -917,7 +916,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
 
 
 
-    public SerializedLongAdder getReceivedBinlogBytes() {
+    public AtomicLong getReceivedBinlogBytes() {
         return this.receivedBinlogBytes;
     }
 

+ 4 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -2,9 +2,9 @@ package com.alibaba.otter.canal.parse.inbound.mysql;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.LockSupport;
 
-import com.alibaba.otter.canal.common.utils.SerializedLongAdder;
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
@@ -65,7 +65,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
     private ExecutorService              stageExecutor;
     private String                       destination;
     private volatile CanalParseException exception;
-    private SerializedLongAdder          eventsPublishBlockingTime;
+    private AtomicLong                   eventsPublishBlockingTime;
 
     public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
                                       EventTransactionBuffer transactionBuffer, String destination){
@@ -176,7 +176,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
                 }
                 disruptorMsgBuffer.publish(next);
                 if (fullTimes > 0) {
-                    eventsPublishBlockingTime.add(System.nanoTime() - blockingStart);
+                    eventsPublishBlockingTime.addAndGet(System.nanoTime() - blockingStart);
                 }
                 break;
             } catch (InsufficientCapacityException e) {
@@ -457,7 +457,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         this.connection = connection;
     }
 
-    public void setEventsPublishBlockingTime(SerializedLongAdder eventsPublishBlockingTime) {
+    public void setEventsPublishBlockingTime(AtomicLong eventsPublishBlockingTime) {
         this.eventsPublishBlockingTime = eventsPublishBlockingTime;
     }
 

+ 52 - 82
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalInstanceExports.java

@@ -2,9 +2,6 @@ package com.alibaba.otter.canal.prometheus;
 
 import com.alibaba.otter.canal.instance.core.CanalInstance;
 import com.alibaba.otter.canal.prometheus.impl.*;
-import com.alibaba.otter.canal.sink.CanalEventSink;
-import com.alibaba.otter.canal.sink.entry.EntryEventSink;
-import com.alibaba.otter.canal.store.CanalStoreException;
 import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
 import org.slf4j.Logger;
@@ -18,98 +15,71 @@ import java.util.List;
  */
 public class CanalInstanceExports {
 
-    private static final Logger      logger          = LoggerFactory.getLogger(CanalInstanceExports.class);
-    public static final String       DEST            = "destination";
-    public static final String[]     DEST_LABELS     = {DEST};
-    public static final List<String> DEST_LABEL_LIST = Collections.singletonList(DEST);
-//    private final String             destination;
-    private Collector                storeCollector;
-    private Collector                delayCollector;
-    private Collector                metaCollector;
-    private Collector                sinkCollector;
-    private Collector                parserCollector;
+    private static final Logger      logger           = LoggerFactory.getLogger(CanalInstanceExports.class);
+    public static final String       DEST             = "destination";
+    public static final String[]     DEST_LABELS      = {DEST};
+    public static final List<String> DEST_LABELS_LIST = Collections.singletonList(DEST);
+    private final Collector          storeCollector;
+    private final Collector          entryCollector;
+    private final Collector          metaCollector;
+    private final Collector          sinkCollector;
+    private final Collector          parserCollector;
 
-    public CanalInstanceExports() {
-//        this.destination = instance.getDestination();
-//        initEventsMetrics(instance);
-//        initStoreCollector(instance);
-//        initMetaCollector(instance);
-//        initSinkCollector(instance);
-//        initParserCollector(instance);
+    private CanalInstanceExports() {
+        this.storeCollector = StoreCollector.instance();
+        this.entryCollector = EntryCollector.instance();
+        this.metaCollector = MetaCollector.instance();
+        this.sinkCollector = SinkCollector.instance();
+        this.parserCollector = ParserCollector.instance();
     }
 
-
-
-//    static CanalInstanceExports forInstance(CanalInstance instance) {
-//        return new CanalInstanceExports(instance);
-//    }
-
-    void register(CanalInstance instance) {
-        if (delayCollector != null) {
-            delayCollector.register();
-        }
-        if (storeCollector != null) {
-            storeCollector.register();
-        }
-        if (metaCollector != null) {
-            metaCollector.register();
-        }
-        if (sinkCollector != null) {
-            sinkCollector.register();
-        }
-        if (parserCollector != null) {
-            parserCollector.register();
-        }
+    private static class SingletonHolder {
+        private static final CanalInstanceExports SINGLETON = new CanalInstanceExports();
     }
 
-    void unregister(CanalInstance instance) {
-        if (delayCollector != null) {
-            CollectorRegistry.defaultRegistry.unregister(delayCollector);
-        }
-        if (storeCollector != null) {
-            CollectorRegistry.defaultRegistry.unregister(storeCollector);
-        }
-        if (metaCollector != null) {
-            CollectorRegistry.defaultRegistry.unregister(metaCollector);
-        }
-        if (sinkCollector != null) {
-            CollectorRegistry.defaultRegistry.unregister(sinkCollector);
-        }
-        if (parserCollector != null) {
-            CollectorRegistry.defaultRegistry.unregister(parserCollector);
-        }
+    public static CanalInstanceExports instance() {
+        return SingletonHolder.SINGLETON;
     }
 
-    private void initEventsMetrics(CanalInstance instance) {
-        CanalEventSink sink = instance.getEventSink();
-        if (sink instanceof EntryEventSink) {
-            EntryEventSink entryEventSink = (EntryEventSink) sink;
-            // TODO ensure not to add handler again
-//            PrometheusCanalEventDownStreamHandler handler = new PrometheusCanalEventDownStreamHandler(destination);
-//            entryEventSink.addHandler(handler);
-//            delayCollector = handler.getCollector();
-        } else {
-            logger.warn("This impl register metrics for only EntryEventSink, skip.");
-        }
+    public void initialize() {
+        storeCollector.register();
+        entryCollector.register();
+        metaCollector.register();
+        sinkCollector.register();
+        parserCollector.register();
     }
 
-//    private void initStoreCollector(CanalInstance instance) {
-//        try {
-//            storeCollector = new StoreCollector(instance.getEventStore(), destination);
-//        } catch (CanalStoreException cse) {
-//            logger.warn("Failed to register metrics for destination {}.", destination, cse);
-//        }
-//    }
+    public void terminate() {
+        CollectorRegistry.defaultRegistry.unregister(storeCollector);
+        CollectorRegistry.defaultRegistry.unregister(entryCollector);
+        CollectorRegistry.defaultRegistry.unregister(metaCollector);
+        CollectorRegistry.defaultRegistry.unregister(sinkCollector);
+        CollectorRegistry.defaultRegistry.unregister(parserCollector);
+    }
 
-    private void initMetaCollector(CanalInstance instance) {
-        metaCollector = new MetaCollector(instance);
+    void register(CanalInstance instance) {
+        requiredInstanceRegistry(storeCollector).register(instance);
+        requiredInstanceRegistry(entryCollector).register(instance);
+        requiredInstanceRegistry(metaCollector).register(instance);
+        requiredInstanceRegistry(sinkCollector).register(instance);
+        requiredInstanceRegistry(parserCollector).register(instance);
+        logger.info("Successfully register metrics for instance {}.", instance.getDestination());
     }
 
-    private void initSinkCollector(CanalInstance instance) {
-//        sinkCollector = new EntryCollector(instance.getEventSink(), instance.getDestination());
+    void unregister(CanalInstance instance) {
+        requiredInstanceRegistry(storeCollector).unregister(instance);
+        requiredInstanceRegistry(entryCollector).unregister(instance);
+        requiredInstanceRegistry(metaCollector).unregister(instance);
+        requiredInstanceRegistry(sinkCollector).unregister(instance);
+        requiredInstanceRegistry(parserCollector).unregister(instance);
+        logger.info("Successfully unregister metrics for instance {}.", instance.getDestination());
     }
 
-    private void initParserCollector(CanalInstance instance) {
-        parserCollector = new MysqlParserCollector(instance.getEventParser(), instance.getDestination());
+    private InstanceRegistry requiredInstanceRegistry(Collector collector) {
+        if (!(collector instanceof InstanceRegistry)) {
+            throw new IllegalArgumentException("Canal collector need to implement InstanceRegistry.");
+        }
+        return (InstanceRegistry) collector;
     }
+
 }

+ 0 - 17
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalServerExports.java

@@ -1,17 +0,0 @@
-package com.alibaba.otter.canal.prometheus;
-
-
-/**
- * @author Chuanyi Li
- */
-public class CanalServerExports {
-
-    private static boolean initialized = false;
-
-    public static synchronized void initialize() {
-        if (!initialized) {
-            initialized = true;
-        }
-    }
-
-}

+ 16 - 17
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusService.java

@@ -20,12 +20,13 @@ import static com.alibaba.otter.canal.server.netty.CanalServerWithNettyProfiler.
 public class PrometheusService implements CanalMetricsService {
 
     private static final Logger                           logger          = LoggerFactory.getLogger(PrometheusService.class);
-    private final CanalInstanceExports                    instanceExports = new CanalInstanceExports();
+    private final CanalInstanceExports                    instanceExports;
     private volatile boolean                              running         = false;
     private HTTPServer                                    server;
     private ClientInstanceProfiler                        clientProfiler  = new PrometheusClientInstanceProfiler();
 
     private PrometheusService() {
+        this.instanceExports = CanalInstanceExports.instance();
     }
 
     private static class SingletonHolder {
@@ -49,8 +50,7 @@ public class PrometheusService implements CanalMetricsService {
         try {
             // JVM exports
             DefaultExports.initialize();
-            // Canal server level exports
-            CanalServerExports.initialize();
+            instanceExports.initialize();
             if (!clientProfiler.isStart()) {
                 clientProfiler.start();
             }
@@ -65,13 +65,17 @@ public class PrometheusService implements CanalMetricsService {
     @Override
     public void terminate() {
         running = false;
-        instanceExports.unregister();
-        if (clientProfiler.isStart()) {
-            clientProfiler.stop();
-        }
-        profiler().setInstanceProfiler(NOP);
-        if (server != null) {
-            server.stop();
+        try {
+            instanceExports.terminate();
+            if (clientProfiler.isStart()) {
+                clientProfiler.stop();
+            }
+            profiler().setInstanceProfiler(NOP);
+            if (server != null) {
+                server.stop();
+            }
+        } catch (Throwable t) {
+            logger.warn("Something happened while terminating.", t);
         }
     }
 
@@ -87,9 +91,7 @@ public class PrometheusService implements CanalMetricsService {
             return;
         }
         try {
-            CanalInstanceExports export = CanalInstanceExports.forInstance(instance);
-            export.register();
-            exports.put(instance.getDestination(), export);
+            instanceExports.register(instance);
         } catch (Throwable t) {
             logger.warn("Unable to register instance exports for {}.", instance.getDestination(), t);
         }
@@ -102,10 +104,7 @@ public class PrometheusService implements CanalMetricsService {
             logger.warn("Try unregister metrics after destination {} is stopped.", instance.getDestination());
         }
         try {
-            CanalInstanceExports export = exports.remove(instance.getDestination());
-            if (export != null) {
-                export.unregister();
-            }
+            instanceExports.unregister(instance);
         } catch (Throwable t) {
             logger.warn("Unable to unregister instance exports for {}.", instance.getDestination(), t);
         }

+ 66 - 12
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/EntryCollector.java

@@ -2,10 +2,12 @@ package com.alibaba.otter.canal.prometheus.impl;
 
 import com.alibaba.otter.canal.instance.core.CanalInstance;
 import com.alibaba.otter.canal.prometheus.InstanceRegistry;
+import com.alibaba.otter.canal.sink.CanalEventDownStreamHandler;
 import com.alibaba.otter.canal.sink.CanalEventSink;
 import com.alibaba.otter.canal.sink.entry.EntryEventSink;
 import com.google.common.base.Preconditions;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CounterMetricFamily;
 import io.prometheus.client.GaugeMetricFamily;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -17,27 +19,43 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABEL_LIST;
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS_LIST;
 
 /**
  * @author Chuanyi Li
  */
 public class EntryCollector extends Collector implements InstanceRegistry {
 
-    private static final Logger                             logger              = LoggerFactory.getLogger(SinkCollector.class);
-    private static final String                             DELAY_NAME          = "canal_instance_traffic_delay";
-    private static final String                             delayHelpName       = "Traffic delay of canal instance";
-    private final ConcurrentMap<String, EntryMetricsHolder> instances           = new ConcurrentHashMap<String, EntryMetricsHolder>();
+    private static final Logger                             logger           = LoggerFactory.getLogger(SinkCollector.class);
+    private static final String                             DELAY            = "canal_instance_traffic_delay";
+    private static final String                             TRANSACTION      = "canal_instance_transactions";
+    private static final String                             DELAY_HELP       = "Traffic delay of canal instance";
+    private static final String                             TRANSACTION_HELP = "Transactions counter of canal instance";
+    private final ConcurrentMap<String, EntryMetricsHolder> instances        = new ConcurrentHashMap<String, EntryMetricsHolder>();
+
+    private EntryCollector() {}
+
+    private static class SingletonHolder {
+        private static final EntryCollector SINGLETON = new EntryCollector();
+    }
+
+    public static EntryCollector instance() {
+        return SingletonHolder.SINGLETON;
+    }
 
     @Override
     public List<MetricFamilySamples> collect() {
         List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
-        GaugeMetricFamily delay = new GaugeMetricFamily(DELAY_NAME,
-                delayHelpName, DEST_LABEL_LIST);
+        GaugeMetricFamily delay = new GaugeMetricFamily(DELAY,
+                DELAY_HELP, DEST_LABELS_LIST);
+        CounterMetricFamily transactions = new CounterMetricFamily(TRANSACTION,
+                TRANSACTION_HELP, DEST_LABELS_LIST);
         for (EntryMetricsHolder emh : instances.values()) {
             delay.addMetric(emh.destLabelValues, emh.latestExecTime.doubleValue());
+            transactions.addMetric(emh.destLabelValues, emh.transactionCounter.doubleValue());
         }
         mfs.add(delay);
+        mfs.add(transactions);
         return mfs;
     }
 
@@ -51,25 +69,61 @@ public class EntryCollector extends Collector implements InstanceRegistry {
             throw new IllegalArgumentException("CanalEventSink must be EntryEventSink");
         }
         EntryEventSink entrySink = (EntryEventSink) sink;
-        PrometheusCanalEventDownStreamHandler handler = new PrometheusCanalEventDownStreamHandler();
+        PrometheusCanalEventDownStreamHandler handler = assembleHandler(entrySink);
         holder.latestExecTime = handler.getLatestExecuteTime();
-        entrySink.addHandler(handler, 0);
-        Preconditions.checkNotNull(holder.destLabelValues);
+        holder.transactionCounter = handler.getTransactionCounter();
         Preconditions.checkNotNull(holder.latestExecTime);
-        EntryMetricsHolder old = instances.putIfAbsent(destination, holder);
+        Preconditions.checkNotNull(holder.transactionCounter);
+        EntryMetricsHolder old = instances.put(destination, holder);
         if (old != null) {
-            logger.warn("Ignore repeated EntryCollector register for instance {}.", destination);
+            logger.warn("Remove stale EntryCollector for instance {}.", destination);
         }
     }
 
     @Override
     public void unregister(CanalInstance instance) {
         final String destination = instance.getDestination();
+        CanalEventSink sink = instance.getEventSink();
+        if (!(sink instanceof EntryEventSink)) {
+            throw new IllegalArgumentException("CanalEventSink must be EntryEventSink");
+        }
+        unloadHandler((EntryEventSink) sink);
         instances.remove(destination);
     }
 
+    private PrometheusCanalEventDownStreamHandler assembleHandler(EntryEventSink entrySink) {
+        PrometheusCanalEventDownStreamHandler ph = new PrometheusCanalEventDownStreamHandler();
+        List<CanalEventDownStreamHandler> handlers = entrySink.getHandlers();
+        for (CanalEventDownStreamHandler handler : handlers) {
+            if (handler instanceof PrometheusCanalEventDownStreamHandler) {
+                throw new IllegalStateException("PrometheusCanalEventDownStreamHandler already exists in handlers.");
+            }
+        }
+        entrySink.addHandler(ph, 0);
+        return ph;
+    }
+
+    private void unloadHandler(EntryEventSink entrySink) {
+        List<CanalEventDownStreamHandler> handlers = entrySink.getHandlers();
+        int i = 0;
+        for (; i < handlers.size(); i++) {
+            if (handlers.get(i) instanceof PrometheusCanalEventDownStreamHandler) {
+                break;
+            }
+        }
+        entrySink.removeHandler(i);
+        // Ensure no PrometheusCanalEventDownStreamHandler
+        handlers = entrySink.getHandlers();
+        for (CanalEventDownStreamHandler handler : handlers) {
+            if (handler instanceof PrometheusCanalEventDownStreamHandler) {
+                throw new IllegalStateException("Multiple prometheusCanalEventDownStreamHandler exists in handlers.");
+            }
+        }
+    }
+
     private class EntryMetricsHolder {
         private AtomicLong   latestExecTime;
+        private AtomicLong   transactionCounter;
         private List<String> destLabelValues;
     }
 

+ 18 - 9
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/MetaCollector.java

@@ -23,18 +23,28 @@ import java.util.concurrent.ConcurrentMap;
  */
 public class MetaCollector extends Collector implements InstanceRegistry {
 
-    private static final List<String>                      InfoLabel        = Arrays.asList("destination", "mode");
-    private static final Logger                            logger           = LoggerFactory.getLogger(MetaCollector.class);
-    private static final String                            SUBSCRIPTION     = "canal_instance";
-    private static final String                            subscriptionHelp = "Canal instance";
-    private final ConcurrentMap<String, MetaMetricsHolder> instances        = new ConcurrentHashMap<String, MetaMetricsHolder>();
+    private static final List<String>                      InfoLabel         = Arrays.asList("destination", "mode");
+    private static final Logger                            logger            = LoggerFactory.getLogger(MetaCollector.class);
+    private static final String                            SUBSCRIPTION      = "canal_instance";
+    private static final String                            SUBSCRIPTION_HELP = "Canal instance";
+    private final ConcurrentMap<String, MetaMetricsHolder> instances         = new ConcurrentHashMap<String, MetaMetricsHolder>();
+
+    private MetaCollector() {}
+
+    private static class SingletonHolder {
+        private static final MetaCollector SINGLETON = new MetaCollector();
+    }
+
+    public static MetaCollector instance() {
+        return SingletonHolder.SINGLETON;
+    }
 
     @Override
     public List<MetricFamilySamples> collect() {
         List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
         GaugeMetricFamily instanceInfo = new GaugeMetricFamily(
                 SUBSCRIPTION,
-                subscriptionHelp,
+                SUBSCRIPTION_HELP,
                 InfoLabel);
         for (Map.Entry<String, MetaMetricsHolder> nme : instances.entrySet()) {
             final String destination = nme.getKey();
@@ -54,11 +64,10 @@ public class MetaCollector extends Collector implements InstanceRegistry {
         String mode = (instance instanceof CanalInstanceWithSpring) ? "spring" : "manager";
         holder.infoLabelValues = Arrays.asList(destination, mode);
         holder.metaManager = instance.getMetaManager();
-        Preconditions.checkNotNull(holder.infoLabelValues);
         Preconditions.checkNotNull(holder.metaManager);
-        MetaMetricsHolder old = instances.putIfAbsent(destination, holder);
+        MetaMetricsHolder old = instances.put(destination, holder);
         if (old != null) {
-            logger.warn("Ignore repeated MetaCollector register for instance {}.", destination);
+            logger.warn("Remove stale MetaCollector for instance {}.", destination);
         }
     }
 

+ 0 - 74
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/MysqlParserCollector.java

@@ -1,74 +0,0 @@
-package com.alibaba.otter.canal.prometheus.impl;
-
-import com.alibaba.otter.canal.common.utils.SerializedLongAdder;
-import com.alibaba.otter.canal.parse.CanalEventParser;
-import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
-import io.prometheus.client.Collector;
-import io.prometheus.client.CounterMetricFamily;
-import io.prometheus.client.GaugeMetricFamily;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST;
-import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABEL_LIST;
-
-/**
- * @author Chuanyi Li
- */
-public class MysqlParserCollector extends Collector {
-
-    private static final String       PUBLISH_BLOCKING_TIME = "canal_instance_publish_blocking_time";
-    private static final String       RECEIVED_BINLOG_BYTES = "canal_instance_received_binlog_bytes";
-    private static final String       PARSER_MODE           = "canal_instance_parser_mode";
-    private static final String       MODE_LABEL            = "parallel";
-    private final List<String>        destLabelValues;
-    private final List<String>        modeLabelValues;
-    private final SerializedLongAdder receivedBinlogBytes;
-    private final SerializedLongAdder eventsPublishBlockingTime;
-    private final String              publishBlockingTimeHelp;
-    private final String              receivedBinlogBytesHelp;
-    private final String              modeHelp;
-    private final List<String>        modeLabels;
-    private final Boolean             isParallel;
-
-    public MysqlParserCollector(CanalEventParser parser, String destination) {
-        if (!(parser instanceof MysqlEventParser)) {
-            throw new IllegalArgumentException("CanalEventParser must be MysqlEventParser");
-        }
-        this.destLabelValues = Collections.singletonList(destination);
-        this.eventsPublishBlockingTime = ((MysqlEventParser)parser).getEventsPublishBlockingTime();
-        this.publishBlockingTimeHelp = "Blocking time of publishing of instance " + destination;
-        this.receivedBinlogBytes = ((MysqlEventParser)parser).getReceivedBinlogBytes();
-        this.receivedBinlogBytesHelp = "Received binlog bytes of instance" + destination;
-        this.isParallel = ((MysqlEventParser)parser).isParallel();
-        this.modeHelp = "Parser mode of instance" + destination;
-        this.modeLabels = Arrays.asList(DEST, MODE_LABEL);
-        this.modeLabelValues = Arrays.asList(destination, isParallel.toString());
-    }
-
-    @Override public List<MetricFamilySamples> collect() {
-        List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
-        // canal_instance_received_binlog_bytes
-        CounterMetricFamily bytesCounter = new CounterMetricFamily(RECEIVED_BINLOG_BYTES,
-                receivedBinlogBytesHelp, DEST_LABEL_LIST);
-        bytesCounter.addMetric(destLabelValues, receivedBinlogBytes.get());
-        mfs.add(bytesCounter);
-        // canal_instance_parser_mode
-        GaugeMetricFamily modeGauge = new GaugeMetricFamily(PARSER_MODE,
-                modeHelp,
-                modeLabels);
-        modeGauge.addMetric(modeLabelValues, 1);
-        mfs.add(modeGauge);
-        // canal_instance_publish_blocking_time
-        if (isParallel) {
-            CounterMetricFamily blockingCounter = new CounterMetricFamily(PUBLISH_BLOCKING_TIME,
-                    publishBlockingTimeHelp, DEST_LABEL_LIST);
-            blockingCounter.addMetric(destLabelValues, eventsPublishBlockingTime.get());
-            mfs.add(blockingCounter);
-        }
-        return mfs;
-    }
-}

+ 114 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/ParserCollector.java

@@ -0,0 +1,114 @@
+package com.alibaba.otter.canal.prometheus.impl;
+
+import com.alibaba.otter.canal.instance.core.CanalInstance;
+import com.alibaba.otter.canal.parse.CanalEventParser;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
+import com.alibaba.otter.canal.prometheus.InstanceRegistry;
+import com.google.common.base.Preconditions;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CounterMetricFamily;
+import io.prometheus.client.GaugeMetricFamily;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST;
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS_LIST;
+
+/**
+ * @author Chuanyi Li
+ */
+public class ParserCollector extends Collector implements InstanceRegistry {
+
+    private static final Logger                              logger                = LoggerFactory.getLogger(ParserCollector.class);
+    private static final String                              PUBLISH_BLOCKING      = "canal_instance_publish_blocking_time";
+    private static final String                              RECEIVED_BINLOG       = "canal_instance_received_binlog_bytes";
+    private static final String                              PARSER_MODE           = "canal_instance_parser_mode";
+    private static final String                              MODE_LABEL            = "parallel";
+    private static final String                              PUBLISH_BLOCKING_HELP = "Publish blocking time of dump thread";
+    private static final String                              RECEIVED_BINLOG_HELP  = "Received binlog bytes";
+    private static final String                              MODE_HELP             = "Parser mode(parallel/serial) of instance";
+    private final List<String>                               modeLabels            = Arrays.asList(DEST, MODE_LABEL);
+    private final ConcurrentMap<String, ParserMetricsHolder> instances             = new ConcurrentHashMap<String, ParserMetricsHolder>();
+
+    private ParserCollector() {}
+
+    private static class SingletonHolder {
+        private static final ParserCollector SINGLETON = new ParserCollector();
+    }
+
+    public static ParserCollector instance() {
+        return SingletonHolder.SINGLETON;
+    }
+
+    @Override
+    public List<MetricFamilySamples> collect() {
+        List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
+        boolean hasParallel = false;
+        CounterMetricFamily bytesCounter = new CounterMetricFamily(RECEIVED_BINLOG,
+                RECEIVED_BINLOG_HELP, DEST_LABELS_LIST);
+        GaugeMetricFamily modeGauge = new GaugeMetricFamily(PARSER_MODE,
+                MODE_HELP, modeLabels);
+        CounterMetricFamily blockingCounter = new CounterMetricFamily(PUBLISH_BLOCKING,
+                PUBLISH_BLOCKING_HELP, DEST_LABELS_LIST);
+        for (ParserMetricsHolder emh : instances.values()) {
+            if (emh.isParallel) {
+                blockingCounter.addMetric(emh.destLabelValues, emh.eventsPublishBlockingTime.doubleValue());
+                hasParallel = true;
+            }
+            modeGauge.addMetric(emh.modeLabelValues, 1);
+            bytesCounter.addMetric(emh.destLabelValues, emh.receivedBinlogBytes.doubleValue());
+
+        }
+        mfs.add(bytesCounter);
+        mfs.add(modeGauge);
+        if (hasParallel) {
+            mfs.add(blockingCounter);
+        }
+        return mfs;
+    }
+
+    @Override
+    public void register(CanalInstance instance) {
+        final String destination = instance.getDestination();
+        ParserMetricsHolder holder = new ParserMetricsHolder();
+        CanalEventParser parser = instance.getEventParser();
+        if (!(parser instanceof MysqlEventParser)) {
+            throw new IllegalArgumentException("CanalEventParser must be MysqlEventParser");
+        }
+        MysqlEventParser mysqlParser = (MysqlEventParser) parser;
+        holder.destLabelValues = Collections.singletonList(destination);
+        holder.modeLabelValues = Arrays.asList(destination, Boolean.toString(mysqlParser.isParallel()));
+        holder.eventsPublishBlockingTime = mysqlParser.getEventsPublishBlockingTime();
+        holder.receivedBinlogBytes = mysqlParser.getReceivedBinlogBytes();
+        holder.isParallel = mysqlParser.isParallel();
+        Preconditions.checkNotNull(holder.eventsPublishBlockingTime);
+        Preconditions.checkNotNull(holder.receivedBinlogBytes);
+        ParserMetricsHolder old = instances.put(destination, holder);
+        if (old != null) {
+            logger.warn("Remove stale ParserCollector for instance {}.", destination);
+        }
+    }
+
+    @Override
+    public void unregister(CanalInstance instance) {
+        final String destination = instance.getDestination();
+        instances.remove(destination);
+    }
+
+    private class ParserMetricsHolder {
+        private List<String> destLabelValues;
+        private List<String> modeLabelValues;
+        private AtomicLong   receivedBinlogBytes;
+        private AtomicLong   eventsPublishBlockingTime;
+        private boolean      isParallel;
+    }
+
+}

+ 56 - 11
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.java

@@ -1,7 +1,9 @@
 package com.alibaba.otter.canal.prometheus.impl;
 
+import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
 import com.alibaba.otter.canal.sink.AbstractCanalEventDownStreamHandler;
 import com.alibaba.otter.canal.store.model.Event;
+
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -10,28 +12,71 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDownStreamHandler<List<Event>> {
 
-    private final AtomicLong    latestExecuteTime = new AtomicLong(0L);
+    private final AtomicLong latestExecuteTime  = new AtomicLong(0L);
+    private final AtomicLong transactionCounter = new AtomicLong(0L);
+    private final AtomicLong rowEventCounter    = new AtomicLong(0L);
+    private final AtomicLong rowsCounter        = new AtomicLong(0L);
 
     @Override
     public List<Event> before(List<Event> events) {
-        // TODO utilize MySQL master heartbeat packet to refresh delay if always no more events coming
-        // see: https://dev.mysql.com/worklog/task/?id=342
-        // heartbeats are sent by the master only if there is no
-        // more unsent events in the actual binlog file for a period longer that
-        // master_heartbeat_period.
+        long localExecTime = 0L;
         if (events != null && !events.isEmpty()) {
-            Event last = events.get(events.size() - 1);
-            long ts = last.getExecuteTime();
-            long ls = latestExecuteTime.get();
-            if (ts > ls) {
-                latestExecuteTime.lazySet(ts);
+            for (Event e : events) {
+                EntryType type = e.getEntryType();
+                if (type == null) continue;
+                long exec = e.getExecuteTime();
+                if (exec > 0) localExecTime = exec;
+                switch (type) {
+                    case ROWDATA:
+                        // TODO 当前proto无法直接获得荣威change的变更行数(需要parse),可考虑放到header里面
+                        break;
+                    case TRANSACTIONEND:
+                        transactionCounter.incrementAndGet();
+                        break;
+                    case HEARTBEAT:
+                        // TODO 确认一下不是canal自己产生的
+                        // EventType eventType = e.getEventType();
+                        // TODO utilize MySQL master heartbeat packet to refresh delay if always no more events coming
+                        // see: https://dev.mysql.com/worklog/task/?id=342
+                        // heartbeats are sent by the master only if there is no
+                        // more unsent events in the actual binlog file for a period longer that
+                        // master_heartbeat_period.
+                        break;
+                    default:
+                        break;
+                }
+            }
+            if (localExecTime > 0) {
+                latestExecuteTime.lazySet(localExecTime);
             }
         }
         return events;
     }
 
+    @Override
+    public void start() {
+
+        super.start();
+    }
+
+    @Override
+    public void stop() {
+        super.stop();
+    }
+
     public AtomicLong getLatestExecuteTime() {
         return latestExecuteTime;
     }
 
+    public AtomicLong getTransactionCounter() {
+        return transactionCounter;
+    }
+
+    public AtomicLong getRowsCounter() {
+        return rowsCounter;
+    }
+
+    public AtomicLong getRowEventCounter() {
+        return rowEventCounter;
+    }
 }

+ 18 - 9
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/SinkCollector.java

@@ -17,23 +17,33 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABEL_LIST;
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS_LIST;
 
 /**
  * @author Chuanyi Li
  */
 public class SinkCollector extends Collector implements InstanceRegistry {
 
-    private static final Logger                            logger             = LoggerFactory.getLogger(SinkCollector.class);
-    private static final String                            SINK_BLOCKING_TIME = "canal_instance_sink_blocking_time";
-    private static final String                            sinkBlockTimeHelp  = "Total sink blocking time";
-    private final ConcurrentMap<String, SinkMetricsHolder> instances          = new ConcurrentHashMap<String, SinkMetricsHolder>();
+    private static final Logger                            logger               = LoggerFactory.getLogger(SinkCollector.class);
+    private static final String                            SINK_BLOCKING_TIME   = "canal_instance_sink_blocking_time";
+    private static final String                            SINK_BLOCK_TIME_HELP = "Total sink blocking time";
+    private final ConcurrentMap<String, SinkMetricsHolder> instances            = new ConcurrentHashMap<String, SinkMetricsHolder>();
+
+    private SinkCollector() {}
+
+    private static class SingletonHolder {
+        private static final SinkCollector SINGLETON = new SinkCollector();
+    }
+
+    public static SinkCollector instance() {
+        return SingletonHolder.SINGLETON;
+    }
 
     @Override
     public List<MetricFamilySamples> collect() {
         List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
         CounterMetricFamily blockingCounter = new CounterMetricFamily(SINK_BLOCKING_TIME,
-                sinkBlockTimeHelp, DEST_LABEL_LIST);
+                SINK_BLOCK_TIME_HELP, DEST_LABELS_LIST);
         for (SinkMetricsHolder smh : instances.values()) {
             blockingCounter.addMetric(smh.destLabelValues, smh.eventsSinkBlockingTime.doubleValue());
         }
@@ -52,11 +62,10 @@ public class SinkCollector extends Collector implements InstanceRegistry {
         }
         EntryEventSink entrySink = (EntryEventSink) sink;
         holder.eventsSinkBlockingTime = entrySink.getEventsSinkBlockingTime();
-        Preconditions.checkNotNull(holder.destLabelValues);
         Preconditions.checkNotNull(holder.eventsSinkBlockingTime);
-        SinkMetricsHolder old = instances.putIfAbsent(destination, holder);
+        SinkMetricsHolder old = instances.put(destination, holder);
         if (old != null) {
-            logger.warn("Ignore repeated SinkCollector register for instance {}.", destination);
+            logger.warn("Remote stale SinkCollector for instance {}.", destination);
         }
     }
 

+ 89 - 42
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/StoreCollector.java

@@ -1,15 +1,17 @@
 package com.alibaba.otter.canal.prometheus.impl;
 
 import com.alibaba.otter.canal.instance.core.CanalInstance;
-import com.alibaba.otter.canal.prometheus.CanalInstanceExports;
 import com.alibaba.otter.canal.prometheus.InstanceRegistry;
 import com.alibaba.otter.canal.store.CanalEventStore;
-import com.alibaba.otter.canal.store.CanalStoreException;
 import com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer;
+import com.alibaba.otter.canal.store.model.BatchMode;
+import com.google.common.base.Preconditions;
 import io.prometheus.client.Collector;
 import io.prometheus.client.CounterMetricFamily;
+import io.prometheus.client.GaugeMetricFamily;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -18,68 +20,113 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST;
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS_LIST;
+
 /**
  * @author Chuanyi Li
  */
 public class StoreCollector extends Collector implements InstanceRegistry {
 
-    private static final Class<MemoryEventStoreWithBuffer> clazz  = MemoryEventStoreWithBuffer.class;
-    private static final String                             PRODUCE     = "canal_instance_store_produce_seq";
-    private static final String                             CONSUME     = "canal_instance_store_consume_seq";
-    private static final String                             produceHelp = "Produced sequence of canal instance";
-    private static final String                             consumeHelp = "Consumed sequence of canal instance";
-    private final ConcurrentMap<String, StoreMetricsHolder> instances   = new ConcurrentHashMap<String, StoreMetricsHolder>();
+    private static final Logger                             logger           = LoggerFactory.getLogger(SinkCollector.class);
+    private static final String                             PRODUCE          = "canal_instance_store_produce_seq";
+    private static final String                             CONSUME          = "canal_instance_store_consume_seq";
+    private static final String                             STORE            = "canal_instance_store";
+    private static final String                             PRODUCE_MEM      = "canal_instance_store_produce_mem";
+    private static final String                             CONSUME_MEM      = "canal_instance_store_consume_mem";
+    private static final String                             PRODUCE_HELP     = "Produced events counter of canal instance";
+    private static final String                             CONSUME_HELP     = "Consumed events counter of canal instance";
+    private static final String                             STORE_HELP       = "Canal instance info";
+    private static final String                             PRODUCE_MEM_HELP = "Produced mem bytes of canal instance";
+    private static final String                             CONSUME_MEM_HELP = "Consumed mem bytes of canal instance";
+    private final ConcurrentMap<String, StoreMetricsHolder> instances        = new ConcurrentHashMap<String, StoreMetricsHolder>();
+    private final List<String>                              storeLabelsList  = Arrays.asList(DEST, "batchMode");
 
+    private StoreCollector() {}
 
-    public StoreCollector(CanalEventStore store, String destination) {
-        this.destination = destination;
-        if (!(store instanceof MemoryEventStoreWithBuffer)) {
-            throw new IllegalArgumentException("EventStore must be MemoryEventStoreWithBuffer");
-        }
-        MemoryEventStoreWithBuffer ms = (MemoryEventStoreWithBuffer) store;
-        putSequence = getDeclaredValue(ms, "putSequence");
-        ackSequence = getDeclaredValue(ms, "ackSequence");
+    private static class SingletonHolder {
+        private static final StoreCollector SINGLETON = new StoreCollector();
+    }
+
+    public static StoreCollector instance() {
+        return SingletonHolder.SINGLETON;
     }
 
     @Override
     public List<MetricFamilySamples> collect() {
         List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
-        CounterMetricFamily put = new CounterMetricFamily("canal_instance_store_produce_seq",
-                putHelp, Arrays.asList(CanalInstanceExports.DEST_LABELS));
-        put.addMetric(Collections.singletonList(destination), putSequence.doubleValue());
+        CounterMetricFamily put = new CounterMetricFamily(PRODUCE,
+                PRODUCE_HELP, DEST_LABELS_LIST);
+        CounterMetricFamily ack = new CounterMetricFamily(CONSUME,
+                CONSUME_HELP, DEST_LABELS_LIST);
+        GaugeMetricFamily store = new GaugeMetricFamily(STORE,
+                STORE_HELP, storeLabelsList);
+        CounterMetricFamily putMem = new CounterMetricFamily(PRODUCE_MEM,
+                PRODUCE_MEM_HELP, DEST_LABELS_LIST);
+        CounterMetricFamily ackMem = new CounterMetricFamily(CONSUME_MEM,
+                CONSUME_MEM_HELP, DEST_LABELS_LIST);
+        boolean hasMem = false;
+        for (StoreMetricsHolder smh : instances.values()) {
+            final boolean isMem = smh.batchMode.isMemSize();
+            put.addMetric(smh.destLabelValues, smh.putSeq.doubleValue());
+            ack.addMetric(smh.destLabelValues, smh.ackSeq.doubleValue());
+            store.addMetric(smh.storeLabelValues, 1);
+            if (isMem) {
+                hasMem = true;
+                putMem.addMetric(smh.destLabelValues, smh.putMemSize.doubleValue());
+                ackMem.addMetric(smh.destLabelValues, smh.ackMemSize.doubleValue());
+            }
+        }
         mfs.add(put);
-        CounterMetricFamily ack = new CounterMetricFamily("canal_instance_store_consume_seq",
-                ackHelp, Arrays.asList(CanalInstanceExports.DEST_LABELS));
-        ack.addMetric(Collections.singletonList(destination), ackSequence.doubleValue());
         mfs.add(ack);
-        return mfs;
-    }
-
-    @SuppressWarnings("unchecked")
-    private static <T> T getDeclaredValue(MemoryEventStoreWithBuffer store, String name) {
-        T value;
-        try {
-            Field putField = clazz.getDeclaredField(name);
-            putField.setAccessible(true);
-            value = (T) putField.get(store);
-        } catch (NoSuchFieldException e) {
-            throw new CanalStoreException(e);
-        } catch (IllegalAccessException e) {
-            throw new CanalStoreException(e);
+        mfs.add(store);
+        if (hasMem) {
+            mfs.add(putMem);
+            mfs.add(ackMem);
         }
-        return value;
+        return mfs;
     }
 
     @Override public void register(CanalInstance instance) {
-
+        final String destination = instance.getDestination();
+        StoreMetricsHolder holder = new StoreMetricsHolder();
+        CanalEventStore store = instance.getEventStore();
+        if (!(store instanceof MemoryEventStoreWithBuffer)) {
+            throw new IllegalArgumentException("EventStore must be MemoryEventStoreWithBuffer");
+        }
+        MemoryEventStoreWithBuffer memStore = (MemoryEventStoreWithBuffer) store;
+        holder.batchMode = memStore.getBatchMode();
+        holder.putSeq = memStore.getPutSequence();
+        holder.ackSeq = memStore.getAckSequence();
+        holder.destLabelValues = Collections.singletonList(destination);
+        holder.storeLabelValues = Arrays.asList(destination, memStore.getBatchMode().name());
+        Preconditions.checkNotNull(holder.batchMode);
+        Preconditions.checkNotNull(holder.putSeq);
+        Preconditions.checkNotNull(holder.ackSeq);
+        if (holder.batchMode.isMemSize()) {
+            holder.putMemSize = memStore.getPutMemSize();
+            holder.ackMemSize = memStore.getAckMemSize();
+            Preconditions.checkNotNull(holder.putMemSize);
+            Preconditions.checkNotNull(holder.ackMemSize);
+        }
+        StoreMetricsHolder old = instances.putIfAbsent(destination, holder);
+        if (old != null) {
+            logger.warn("Remote stale StoreCollector for instance {}.", destination);
+        }
     }
 
     @Override public void unregister(CanalInstance instance) {
-
+        final String destination = instance.getDestination();
+        instances.remove(destination);
     }
 
     private class StoreMetricsHolder {
-        private AtomicLong putSeq;
-        private AtomicLong ackSeq;
+        private AtomicLong   putSeq;
+        private AtomicLong   ackSeq;
+        private BatchMode    batchMode;
+        private AtomicLong   putMemSize;
+        private AtomicLong   ackMemSize;
+        private List<String> destLabelValues;
+        private List<String> storeLabelValues;
     }
 }

+ 19 - 0
store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

@@ -562,4 +562,23 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
         this.ddlIsolation = ddlIsolation;
     }
 
+    public AtomicLong getPutSequence() {
+        return putSequence;
+    }
+
+    public AtomicLong getAckSequence() {
+        return ackSequence;
+    }
+
+    public AtomicLong getPutMemSize() {
+        return putMemSize;
+    }
+
+    public AtomicLong getAckMemSize() {
+        return ackMemSize;
+    }
+
+    public BatchMode getBatchMode() {
+        return batchMode;
+    }
 }