Sfoglia il codice sorgente

Prometheus refactor.

Chuanyi Li 6 anni fa
parent
commit
b5a1898cec
16 ha cambiato i file con 346 aggiunte e 337 eliminazioni
  1. 25 29
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalInstanceExports.java
  2. 14 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/InstanceRegistry.java
  3. 16 15
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusService.java
  4. 76 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/EntryCollector.java
  5. 0 45
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/EntrySinkCollector.java
  6. 0 64
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/InstanceMetaCollector.java
  7. 77 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/MetaCollector.java
  8. 7 43
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.java
  9. 21 26
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusClientInstanceProfiler.java
  10. 0 15
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusClientInstanceProfilerFactory.java
  11. 73 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/SinkCollector.java
  12. 24 9
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/StoreCollector.java
  13. 10 76
      server/src/main/java/com/alibaba/otter/canal/server/netty/CanalServerWithNettyProfiler.java
  14. 2 4
      server/src/main/java/com/alibaba/otter/canal/server/netty/ClientInstanceProfiler.java
  15. 0 10
      server/src/main/java/com/alibaba/otter/canal/server/netty/ClientInstanceProfilerFactory.java
  16. 1 1
      server/src/main/java/com/alibaba/otter/canal/server/netty/listener/ChannelFutureAggregator.java

+ 25 - 29
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalInstanceExports.java

@@ -13,8 +13,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Collections;
 import java.util.List;
 
-import static com.alibaba.otter.canal.server.netty.CanalServerWithNettyProfiler.profiler;
-
 /**
  * @author Chuanyi Li
  */
@@ -24,30 +22,29 @@ public class CanalInstanceExports {
     public static final String       DEST            = "destination";
     public static final String[]     DEST_LABELS     = {DEST};
     public static final List<String> DEST_LABEL_LIST = Collections.singletonList(DEST);
-    private final String             destination;
+//    private final String             destination;
     private Collector                storeCollector;
     private Collector                delayCollector;
     private Collector                metaCollector;
     private Collector                sinkCollector;
     private Collector                parserCollector;
 
-    private CanalInstanceExports(CanalInstance instance) {
-        this.destination = instance.getDestination();
-        initEventsMetrics(instance);
-        initStoreCollector(instance);
-        initMetaCollector(instance);
-        initSinkCollector(instance);
-        initParserCollector(instance);
+    public CanalInstanceExports() {
+//        this.destination = instance.getDestination();
+//        initEventsMetrics(instance);
+//        initStoreCollector(instance);
+//        initMetaCollector(instance);
+//        initSinkCollector(instance);
+//        initParserCollector(instance);
     }
 
 
 
-    static CanalInstanceExports forInstance(CanalInstance instance) {
-        return new CanalInstanceExports(instance);
-    }
+//    static CanalInstanceExports forInstance(CanalInstance instance) {
+//        return new CanalInstanceExports(instance);
+//    }
 
-    void register() {
-        profiler().start(destination);
+    void register(CanalInstance instance) {
         if (delayCollector != null) {
             delayCollector.register();
         }
@@ -65,8 +62,7 @@ public class CanalInstanceExports {
         }
     }
 
-    void unregister() {
-        profiler().stop(destination);
+    void unregister(CanalInstance instance) {
         if (delayCollector != null) {
             CollectorRegistry.defaultRegistry.unregister(delayCollector);
         }
@@ -89,28 +85,28 @@ public class CanalInstanceExports {
         if (sink instanceof EntryEventSink) {
             EntryEventSink entryEventSink = (EntryEventSink) sink;
             // TODO ensure not to add handler again
-            PrometheusCanalEventDownStreamHandler handler = new PrometheusCanalEventDownStreamHandler(destination);
-            entryEventSink.addHandler(handler);
-            delayCollector = handler.getCollector();
+//            PrometheusCanalEventDownStreamHandler handler = new PrometheusCanalEventDownStreamHandler(destination);
+//            entryEventSink.addHandler(handler);
+//            delayCollector = handler.getCollector();
         } else {
             logger.warn("This impl register metrics for only EntryEventSink, skip.");
         }
     }
 
-    private void initStoreCollector(CanalInstance instance) {
-        try {
-            storeCollector = new MemoryStoreCollector(instance.getEventStore(), destination);
-        } catch (CanalStoreException cse) {
-            logger.warn("Failed to register metrics for destination {}.", destination, cse);
-        }
-    }
+//    private void initStoreCollector(CanalInstance instance) {
+//        try {
+//            storeCollector = new StoreCollector(instance.getEventStore(), destination);
+//        } catch (CanalStoreException cse) {
+//            logger.warn("Failed to register metrics for destination {}.", destination, cse);
+//        }
+//    }
 
     private void initMetaCollector(CanalInstance instance) {
-        metaCollector = new InstanceMetaCollector(instance);
+        metaCollector = new MetaCollector(instance);
     }
 
     private void initSinkCollector(CanalInstance instance) {
-        sinkCollector = new EntrySinkCollector(instance.getEventSink(), instance.getDestination());
+//        sinkCollector = new EntryCollector(instance.getEventSink(), instance.getDestination());
     }
 
     private void initParserCollector(CanalInstance instance) {

+ 14 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/InstanceRegistry.java

@@ -0,0 +1,14 @@
+package com.alibaba.otter.canal.prometheus;
+
+import com.alibaba.otter.canal.instance.core.CanalInstance;
+
+/**
+ * @author Chuanyi Li
+ */
+public interface InstanceRegistry {
+
+    void register(CanalInstance instance);
+
+    void unregister(CanalInstance instance);
+
+}

+ 16 - 15
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusService.java

@@ -1,8 +1,8 @@
 package com.alibaba.otter.canal.prometheus;
 
 import com.alibaba.otter.canal.instance.core.CanalInstance;
-import com.alibaba.otter.canal.prometheus.impl.PrometheusClientInstanceProfilerFactory;
-import com.alibaba.otter.canal.server.netty.CanalServerWithNettyProfiler;
+import com.alibaba.otter.canal.prometheus.impl.PrometheusClientInstanceProfiler;
+import com.alibaba.otter.canal.server.netty.ClientInstanceProfiler;
 import com.alibaba.otter.canal.spi.CanalMetricsService;
 import io.prometheus.client.exporter.HTTPServer;
 import io.prometheus.client.hotspot.DefaultExports;
@@ -10,10 +10,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
-import static com.alibaba.otter.canal.server.netty.CanalServerWithNettyProfiler.DISABLED;
+import static com.alibaba.otter.canal.server.netty.CanalServerWithNettyProfiler.NOP;
 import static com.alibaba.otter.canal.server.netty.CanalServerWithNettyProfiler.profiler;
 
 /**
@@ -21,10 +19,11 @@ import static com.alibaba.otter.canal.server.netty.CanalServerWithNettyProfiler.
  */
 public class PrometheusService implements CanalMetricsService {
 
-    private static final Logger                           logger  = LoggerFactory.getLogger(PrometheusService.class);
-    private final Map<String, CanalInstanceExports>       exports = new ConcurrentHashMap<String, CanalInstanceExports>();
-    private volatile boolean                              running = false;
+    private static final Logger                           logger          = LoggerFactory.getLogger(PrometheusService.class);
+    private final CanalInstanceExports                    instanceExports = new CanalInstanceExports();
+    private volatile boolean                              running         = false;
     private HTTPServer                                    server;
+    private ClientInstanceProfiler                        clientProfiler  = new PrometheusClientInstanceProfiler();
 
     private PrometheusService() {
     }
@@ -52,7 +51,10 @@ public class PrometheusService implements CanalMetricsService {
             DefaultExports.initialize();
             // Canal server level exports
             CanalServerExports.initialize();
-            profiler().setInstanceProfilerFactory(new PrometheusClientInstanceProfilerFactory());
+            if (!clientProfiler.isStart()) {
+                clientProfiler.start();
+            }
+            profiler().setInstanceProfiler(clientProfiler);
         } catch (Throwable t) {
             logger.warn("Unable to initialize server exports.", t);
         }
@@ -63,13 +65,11 @@ public class PrometheusService implements CanalMetricsService {
     @Override
     public void terminate() {
         running = false;
-        // Normally, service should be terminated at canal shutdown.
-        // No need to unregister instance exports explicitly.
-        // But for the sake of safety, unregister them.
-        for (CanalInstanceExports cie : exports.values()) {
-            cie.unregister();
+        instanceExports.unregister();
+        if (clientProfiler.isStart()) {
+            clientProfiler.stop();
         }
-        profiler().setInstanceProfilerFactory(DISABLED);
+        profiler().setInstanceProfiler(NOP);
         if (server != null) {
             server.stop();
         }
@@ -111,4 +111,5 @@ public class PrometheusService implements CanalMetricsService {
         }
         logger.info("Unregister metrics for destination {}.", instance.getDestination());
     }
+
 }

+ 76 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/EntryCollector.java

@@ -0,0 +1,76 @@
+package com.alibaba.otter.canal.prometheus.impl;
+
+import com.alibaba.otter.canal.instance.core.CanalInstance;
+import com.alibaba.otter.canal.prometheus.InstanceRegistry;
+import com.alibaba.otter.canal.sink.CanalEventSink;
+import com.alibaba.otter.canal.sink.entry.EntryEventSink;
+import com.google.common.base.Preconditions;
+import io.prometheus.client.Collector;
+import io.prometheus.client.GaugeMetricFamily;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABEL_LIST;
+
+/**
+ * @author Chuanyi Li
+ */
+public class EntryCollector extends Collector implements InstanceRegistry {
+
+    private static final Logger                             logger              = LoggerFactory.getLogger(SinkCollector.class);
+    private static final String                             DELAY_NAME          = "canal_instance_traffic_delay";
+    private static final String                             delayHelpName       = "Traffic delay of canal instance";
+    private final ConcurrentMap<String, EntryMetricsHolder> instances           = new ConcurrentHashMap<String, EntryMetricsHolder>();
+
+    @Override
+    public List<MetricFamilySamples> collect() {
+        List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
+        GaugeMetricFamily delay = new GaugeMetricFamily(DELAY_NAME,
+                delayHelpName, DEST_LABEL_LIST);
+        for (EntryMetricsHolder emh : instances.values()) {
+            delay.addMetric(emh.destLabelValues, emh.latestExecTime.doubleValue());
+        }
+        mfs.add(delay);
+        return mfs;
+    }
+
+    @Override
+    public void register(CanalInstance instance) {
+        final String destination = instance.getDestination();
+        EntryMetricsHolder holder = new EntryMetricsHolder();
+        holder.destLabelValues = Collections.singletonList(destination);
+        CanalEventSink sink = instance.getEventSink();
+        if (!(sink instanceof EntryEventSink)) {
+            throw new IllegalArgumentException("CanalEventSink must be EntryEventSink");
+        }
+        EntryEventSink entrySink = (EntryEventSink) sink;
+        PrometheusCanalEventDownStreamHandler handler = new PrometheusCanalEventDownStreamHandler();
+        holder.latestExecTime = handler.getLatestExecuteTime();
+        entrySink.addHandler(handler, 0);
+        Preconditions.checkNotNull(holder.destLabelValues);
+        Preconditions.checkNotNull(holder.latestExecTime);
+        EntryMetricsHolder old = instances.putIfAbsent(destination, holder);
+        if (old != null) {
+            logger.warn("Ignore repeated EntryCollector register for instance {}.", destination);
+        }
+    }
+
+    @Override
+    public void unregister(CanalInstance instance) {
+        final String destination = instance.getDestination();
+        instances.remove(destination);
+    }
+
+    private class EntryMetricsHolder {
+        private AtomicLong   latestExecTime;
+        private List<String> destLabelValues;
+    }
+
+}

+ 0 - 45
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/EntrySinkCollector.java

@@ -1,45 +0,0 @@
-package com.alibaba.otter.canal.prometheus.impl;
-
-import com.alibaba.otter.canal.sink.CanalEventSink;
-import com.alibaba.otter.canal.sink.entry.EntryEventSink;
-import io.prometheus.client.Collector;
-import io.prometheus.client.CounterMetricFamily;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABEL_LIST;
-
-/**
- * @author Chuanyi Li
- */
-public class EntrySinkCollector extends Collector {
-
-    private static final String       SINK_BLOCKING_TIME      = "canal_instance_sink_blocking_time";
-    private final String              sinkBlockTimeHelp;
-    private final List<String>        destLabelValues;
-    private final AtomicLong          eventsSinkBlockingTime;
-
-
-    public EntrySinkCollector(CanalEventSink sink, String destination) {
-        if (!(sink instanceof EntryEventSink)) {
-            throw new IllegalArgumentException("CanalEventSink must be EntryEventSink");
-        }
-        this.destLabelValues = Collections.singletonList(destination);
-        EntryEventSink entrySink = (EntryEventSink) sink;
-        this.sinkBlockTimeHelp = "Sink blocking time of instance " + destination;
-        eventsSinkBlockingTime = entrySink.getEventsSinkBlockingTime();
-    }
-
-    @Override public List<MetricFamilySamples> collect() {
-        List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
-        CounterMetricFamily blockingCounter = new CounterMetricFamily(SINK_BLOCKING_TIME,
-                sinkBlockTimeHelp, DEST_LABEL_LIST);
-        blockingCounter.addMetric(destLabelValues, eventsSinkBlockingTime.doubleValue());
-        mfs.add(blockingCounter);
-        return mfs;
-    }
-
-}

+ 0 - 64
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/InstanceMetaCollector.java

@@ -1,64 +0,0 @@
-package com.alibaba.otter.canal.prometheus.impl;
-
-import com.alibaba.otter.canal.instance.core.CanalInstance;
-import com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring;
-import com.alibaba.otter.canal.meta.CanalMetaManager;
-import com.alibaba.otter.canal.prometheus.CanalInstanceExports;
-import com.alibaba.otter.canal.protocol.ClientIdentity;
-import io.prometheus.client.Collector;
-import io.prometheus.client.GaugeMetricFamily;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * @author Chuanyi Li
- */
-public class InstanceMetaCollector extends Collector {
-
-    private static final List<String> InfoLabel    = Arrays.asList("destination", "mode");
-
-    private CanalMetaManager          metaManager;
-
-    private final String              destination;
-
-    private final String              mode;
-
-    private final String              subsHelp;
-
-    public InstanceMetaCollector(CanalInstance instance) {
-        if (instance == null) {
-            throw new IllegalArgumentException("CanalInstance must not be null.");
-        }
-        if (instance instanceof CanalInstanceWithSpring) {
-            mode = "spring";
-        } else {
-            mode = "manager";
-        }
-        this.metaManager = instance.getMetaManager();
-        this.destination = instance.getDestination();
-        this.subsHelp = "Subscriptions of canal instance " + destination;
-    }
-
-    @Override
-    public List<MetricFamilySamples> collect() {
-        List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
-        GaugeMetricFamily instanceInfo = new GaugeMetricFamily(
-                "canal_instance",
-                "Canal instance",
-                InfoLabel);
-        instanceInfo.addMetric(Arrays.asList(destination, mode), 1);
-        mfs.add(instanceInfo);
-        if (metaManager.isStart()) {
-            // client id = hardcode 1001, 目前没有意义
-            List<ClientIdentity> subs = metaManager.listAllSubscribeInfo(destination);
-            GaugeMetricFamily subscriptions = new GaugeMetricFamily(
-                    "canal_instance_subscription",
-                    subsHelp, CanalInstanceExports.DEST_LABEL_LIST);
-            subscriptions.addMetric(Arrays.asList(destination), subs.size());
-            mfs.add(subscriptions);
-        }
-        return mfs;
-    }
-}

+ 77 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/MetaCollector.java

@@ -0,0 +1,77 @@
+package com.alibaba.otter.canal.prometheus.impl;
+
+import com.alibaba.otter.canal.instance.core.CanalInstance;
+import com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring;
+import com.alibaba.otter.canal.meta.CanalMetaManager;
+import com.alibaba.otter.canal.prometheus.InstanceRegistry;
+import com.alibaba.otter.canal.protocol.ClientIdentity;
+import com.google.common.base.Preconditions;
+import io.prometheus.client.Collector;
+import io.prometheus.client.GaugeMetricFamily;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * @author Chuanyi Li
+ */
+public class MetaCollector extends Collector implements InstanceRegistry {
+
+    private static final List<String>                      InfoLabel        = Arrays.asList("destination", "mode");
+    private static final Logger                            logger           = LoggerFactory.getLogger(MetaCollector.class);
+    private static final String                            SUBSCRIPTION     = "canal_instance";
+    private static final String                            subscriptionHelp = "Canal instance";
+    private final ConcurrentMap<String, MetaMetricsHolder> instances        = new ConcurrentHashMap<String, MetaMetricsHolder>();
+
+    @Override
+    public List<MetricFamilySamples> collect() {
+        List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
+        GaugeMetricFamily instanceInfo = new GaugeMetricFamily(
+                SUBSCRIPTION,
+                subscriptionHelp,
+                InfoLabel);
+        for (Map.Entry<String, MetaMetricsHolder> nme : instances.entrySet()) {
+            final String destination = nme.getKey();
+            final MetaMetricsHolder nmh = nme.getValue();
+            List<ClientIdentity> subs = nmh.metaManager.listAllSubscribeInfo(destination);
+            int count = subs == null ? 0 : subs.size();
+            instanceInfo.addMetric(nmh.infoLabelValues, count);
+        }
+        mfs.add(instanceInfo);
+        return mfs;
+    }
+
+    @Override
+    public void register(CanalInstance instance) {
+        final String destination = instance.getDestination();
+        MetaMetricsHolder holder = new MetaMetricsHolder();
+        String mode = (instance instanceof CanalInstanceWithSpring) ? "spring" : "manager";
+        holder.infoLabelValues = Arrays.asList(destination, mode);
+        holder.metaManager = instance.getMetaManager();
+        Preconditions.checkNotNull(holder.infoLabelValues);
+        Preconditions.checkNotNull(holder.metaManager);
+        MetaMetricsHolder old = instances.putIfAbsent(destination, holder);
+        if (old != null) {
+            logger.warn("Ignore repeated MetaCollector register for instance {}.", destination);
+        }
+    }
+
+    @Override
+    public void unregister(CanalInstance instance) {
+        final String destination = instance.getDestination();
+        instances.remove(destination);
+    }
+
+    private class MetaMetricsHolder {
+        private List<String>     infoLabelValues;
+        private CanalMetaManager metaManager;
+    }
+
+
+}

+ 7 - 43
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.java

@@ -1,53 +1,16 @@
 package com.alibaba.otter.canal.prometheus.impl;
 
-import com.alibaba.otter.canal.prometheus.CanalInstanceExports;
 import com.alibaba.otter.canal.sink.AbstractCanalEventDownStreamHandler;
 import com.alibaba.otter.canal.store.model.Event;
-import io.prometheus.client.Collector;
-import io.prometheus.client.GaugeMetricFamily;
-
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * @author Chuanyi Li
  */
 public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDownStreamHandler<List<Event>> {
 
-    private final Collector     collector;
-
-    private long                latestExecuteTime = 0L;
-
-    private static final String DELAY_NAME        = "canal_instance_traffic_delay";
-
-    private final String        delayHelpName;
-
-    private final List<String>  labelValues;
-
-    public PrometheusCanalEventDownStreamHandler(final String destination) {
-        this.delayHelpName = "Traffic delay of canal instance " + destination + " in seconds.";
-        this.labelValues = Collections.singletonList(destination);
-        collector = new Collector() {
-            @Override
-            public List<MetricFamilySamples> collect() {
-                List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
-                long now = System.currentTimeMillis();
-                GaugeMetricFamily delay = new GaugeMetricFamily(
-                        DELAY_NAME,
-                        delayHelpName,
-                        CanalInstanceExports.DEST_LABEL_LIST);
-                double d = 0.0;
-                if (latestExecuteTime > 0) {
-                    d = now - latestExecuteTime;
-                }
-                d = d > 0.0 ? (d / 1000) : 0.0;
-                delay.addMetric(labelValues, d);
-                mfs.add(delay);
-                return mfs;
-            }
-        };
-    }
+    private final AtomicLong    latestExecuteTime = new AtomicLong(0L);
 
     @Override
     public List<Event> before(List<Event> events) {
@@ -59,15 +22,16 @@ public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDow
         if (events != null && !events.isEmpty()) {
             Event last = events.get(events.size() - 1);
             long ts = last.getExecuteTime();
-            if (ts > latestExecuteTime) {
-                latestExecuteTime = ts;
+            long ls = latestExecuteTime.get();
+            if (ts > ls) {
+                latestExecuteTime.lazySet(ts);
             }
         }
         return events;
     }
 
-    public Collector getCollector() {
-        return this.collector;
+    public AtomicLong getLatestExecuteTime() {
+        return latestExecuteTime;
     }
 
 }

+ 21 - 26
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusClientInstanceProfiler.java

@@ -20,68 +20,60 @@ public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler
     private static final String EMPTY_BATCHES  = "canal_instance_client_empty_batches";
     private static final String ERRORS         = "canal_instance_client_request_error";
     private static final String LATENCY        = "canal_instance_client_request_latency";
+    private final Counter       outboundCounter;
+    private final Counter       packetsCounter;
+    private final Counter       emptyBatchesCounter;
+    private final Counter       errorsCounter;
+    private final Histogram     responseLatency;
     private volatile boolean    running        = false;
-    private final String       destination;
-    private final String[]     destLabelValues;
-    private final Counter      outboundCounter;
-    private final Counter      packetsCounter;
-    private final Counter      emptyBatchesCounter;
-    private final Counter      errorsCounter;
-    private final Histogram    responseLatency;
 
-    public PrometheusClientInstanceProfiler(String destination) {
-        this.destination = destination;
-        this.destLabelValues = new String[]{destination};
+    public PrometheusClientInstanceProfiler() {
         this.outboundCounter = Counter.build()
                 .labelNames(DEST_LABELS)
                 .name(OUTBOUND_BYTES)
-                .help("Send bytes to client of instance " + destination)
+                .help("Total bytes sent to client.")
                 .create();
         this.packetsCounter = Counter.build()
                 .labelNames(new String[]{DEST, "packetType"})
                 .name(PACKET_TYPE)
-                .help("Send packets to client of instance " + destination)
+                .help("Total packets sent to client.")
                 .create();
         this.emptyBatchesCounter = Counter.build()
                 .labelNames(DEST_LABELS)
                 .name(EMPTY_BATCHES)
-                .help("Send empty batches to client of instance " + destination)
+                .help("Total empty batches sent to client.")
                 .create();
         this.errorsCounter = Counter.build()
                 .labelNames(new String[]{DEST, "errorCode"})
                 .name(ERRORS)
-                .help("Client request errors of instance " + destination)
+                .help("Total client request errors.")
                 .create();
         this.responseLatency = Histogram.build()
                 .labelNames(DEST_LABELS)
                 .name(LATENCY)
-                .help("Client request latency of instance " + destination)
+                .help("Client request latency.")
                 // buckets in milliseconds
                 .buckets(1.0, 2.5, 5.0, 10.0, 25.0, 50.0, 100.0)
                 .create();
     }
 
-    @Override
-    public String getDestination() {
-        return this.destination;
-    }
-
     @Override
     public void profiling(ClientRequestResult result) {
+        String destination = result.getDestination();
         PacketType type = result.getType();
-        outboundCounter.labels(destLabelValues).inc(result.getAmount());
+        outboundCounter.labels(destination).inc(result.getAmount());
         packetsCounter.labels(destination, type.name()).inc();
         short errorCode = result.getErrorCode();
         if (errorCode > 0) {
             errorsCounter.labels(destination, Short.toString(errorCode)).inc();
         }
         long latency = result.getLatency();
-        responseLatency.labels(destLabelValues).observe(latency / 1000000);
+        responseLatency.labels(destination).observe(latency / 1000000);
         switch (type) {
             case GET:
                 boolean empty = result.getEmpty();
                 if (empty) {
-                    emptyBatchesCounter.labels(destLabelValues).inc();
+                    emptyBatchesCounter.labels(destination).inc();
                 }
                 break;
             // reserve for others
@@ -90,7 +82,8 @@ public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler
         }
     }
 
-    @Override public void start() {
+    @Override
+    public void start() {
         if (outboundCounter != null) {
             outboundCounter.register();
         }
@@ -109,7 +102,8 @@ public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler
         running = true;
     }
 
-    @Override public void stop() {
+    @Override
+    public void stop() {
         running = false;
         if (outboundCounter != null) {
             CollectorRegistry.defaultRegistry.unregister(outboundCounter);
@@ -128,7 +122,8 @@ public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler
         }
     }
 
-    @Override public boolean isStart() {
+    @Override
+    public boolean isStart() {
         return running;
     }
 }

+ 0 - 15
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusClientInstanceProfilerFactory.java

@@ -1,15 +0,0 @@
-package com.alibaba.otter.canal.prometheus.impl;
-
-import com.alibaba.otter.canal.server.netty.ClientInstanceProfiler;
-import com.alibaba.otter.canal.server.netty.ClientInstanceProfilerFactory;
-
-/**
- * @author Chuanyi Li
- */
-public class PrometheusClientInstanceProfilerFactory implements ClientInstanceProfilerFactory {
-
-    @Override
-    public ClientInstanceProfiler create(String destination) {
-        return new PrometheusClientInstanceProfiler(destination);
-    }
-}

+ 73 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/SinkCollector.java

@@ -0,0 +1,73 @@
+package com.alibaba.otter.canal.prometheus.impl;
+
+import com.alibaba.otter.canal.instance.core.CanalInstance;
+import com.alibaba.otter.canal.prometheus.InstanceRegistry;
+import com.alibaba.otter.canal.sink.CanalEventSink;
+import com.alibaba.otter.canal.sink.entry.EntryEventSink;
+import com.google.common.base.Preconditions;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CounterMetricFamily;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABEL_LIST;
+
+/**
+ * @author Chuanyi Li
+ */
+public class SinkCollector extends Collector implements InstanceRegistry {
+
+    private static final Logger                            logger             = LoggerFactory.getLogger(SinkCollector.class);
+    private static final String                            SINK_BLOCKING_TIME = "canal_instance_sink_blocking_time";
+    private static final String                            sinkBlockTimeHelp  = "Total sink blocking time";
+    private final ConcurrentMap<String, SinkMetricsHolder> instances          = new ConcurrentHashMap<String, SinkMetricsHolder>();
+
+    @Override
+    public List<MetricFamilySamples> collect() {
+        List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
+        CounterMetricFamily blockingCounter = new CounterMetricFamily(SINK_BLOCKING_TIME,
+                sinkBlockTimeHelp, DEST_LABEL_LIST);
+        for (SinkMetricsHolder smh : instances.values()) {
+            blockingCounter.addMetric(smh.destLabelValues, smh.eventsSinkBlockingTime.doubleValue());
+        }
+        mfs.add(blockingCounter);
+        return mfs;
+    }
+
+    @Override
+    public void register(CanalInstance instance) {
+        final String destination = instance.getDestination();
+        SinkMetricsHolder holder = new SinkMetricsHolder();
+        holder.destLabelValues = Collections.singletonList(destination);
+        CanalEventSink sink = instance.getEventSink();
+        if (!(sink instanceof EntryEventSink)) {
+            throw new IllegalArgumentException("CanalEventSink must be EntryEventSink");
+        }
+        EntryEventSink entrySink = (EntryEventSink) sink;
+        holder.eventsSinkBlockingTime = entrySink.getEventsSinkBlockingTime();
+        Preconditions.checkNotNull(holder.destLabelValues);
+        Preconditions.checkNotNull(holder.eventsSinkBlockingTime);
+        SinkMetricsHolder old = instances.putIfAbsent(destination, holder);
+        if (old != null) {
+            logger.warn("Ignore repeated SinkCollector register for instance {}.", destination);
+        }
+    }
+
+    @Override
+    public void unregister(CanalInstance instance) {
+        final String destination = instance.getDestination();
+        instances.remove(destination);
+    }
+
+    private class SinkMetricsHolder {
+        private AtomicLong   eventsSinkBlockingTime;
+        private List<String> destLabelValues;
+    }
+}

+ 24 - 9
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/MemoryStoreCollector.java → prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/StoreCollector.java

@@ -1,6 +1,8 @@
 package com.alibaba.otter.canal.prometheus.impl;
 
+import com.alibaba.otter.canal.instance.core.CanalInstance;
 import com.alibaba.otter.canal.prometheus.CanalInstanceExports;
+import com.alibaba.otter.canal.prometheus.InstanceRegistry;
 import com.alibaba.otter.canal.store.CanalEventStore;
 import com.alibaba.otter.canal.store.CanalStoreException;
 import com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer;
@@ -12,21 +14,24 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * @author Chuanyi Li
  */
-public class MemoryStoreCollector extends Collector {
+public class StoreCollector extends Collector implements InstanceRegistry {
 
     private static final Class<MemoryEventStoreWithBuffer> clazz  = MemoryEventStoreWithBuffer.class;
-    private final String                                   destination;
-    private final AtomicLong                               putSequence;
-    private final AtomicLong                               ackSequence;
-    private final String                                   putHelp;
-    private final String                                   ackHelp;
+    private static final String                             PRODUCE     = "canal_instance_store_produce_seq";
+    private static final String                             CONSUME     = "canal_instance_store_consume_seq";
+    private static final String                             produceHelp = "Produced sequence of canal instance";
+    private static final String                             consumeHelp = "Consumed sequence of canal instance";
+    private final ConcurrentMap<String, StoreMetricsHolder> instances   = new ConcurrentHashMap<String, StoreMetricsHolder>();
 
-    public MemoryStoreCollector(CanalEventStore store, String destination) {
+
+    public StoreCollector(CanalEventStore store, String destination) {
         this.destination = destination;
         if (!(store instanceof MemoryEventStoreWithBuffer)) {
             throw new IllegalArgumentException("EventStore must be MemoryEventStoreWithBuffer");
@@ -34,8 +39,6 @@ public class MemoryStoreCollector extends Collector {
         MemoryEventStoreWithBuffer ms = (MemoryEventStoreWithBuffer) store;
         putSequence = getDeclaredValue(ms, "putSequence");
         ackSequence = getDeclaredValue(ms, "ackSequence");
-        putHelp = "Produced sequence of canal instance " + destination;
-        ackHelp = "Consumed sequence of canal instance " + destination;
     }
 
     @Override
@@ -67,4 +70,16 @@ public class MemoryStoreCollector extends Collector {
         return value;
     }
 
+    @Override public void register(CanalInstance instance) {
+
+    }
+
+    @Override public void unregister(CanalInstance instance) {
+
+    }
+
+    private class StoreMetricsHolder {
+        private AtomicLong putSeq;
+        private AtomicLong ackSeq;
+    }
 }

+ 10 - 76
server/src/main/java/com/alibaba/otter/canal/server/netty/CanalServerWithNettyProfiler.java

@@ -1,105 +1,39 @@
 package com.alibaba.otter.canal.server.netty;
 
-import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
+import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.server.netty.listener.ChannelFutureAggregator.ClientRequestResult;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 /**
  * @author Chuanyi Li
  */
 public class CanalServerWithNettyProfiler {
 
-    public static final ClientInstanceProfilerFactory           DISABLED = new DefaultClientInstanceProfilerFactory();
-    private volatile ClientInstanceProfilerFactory              clientInstanceProfilerFactory;
-    private final ConcurrentMap<String, ClientInstanceProfiler> cliPfs;
-    private final CanalServerWithEmbedded                       server;
+    public static final ClientInstanceProfiler NOP               = new DefaultClientInstanceProfiler();
+    private ClientInstanceProfiler             instanceProfiler;
 
     private static class SingletonHolder {
         private static CanalServerWithNettyProfiler SINGLETON = new CanalServerWithNettyProfiler();
     }
 
     private CanalServerWithNettyProfiler() {
-        this.clientInstanceProfilerFactory = DISABLED;
-        this.cliPfs = new ConcurrentHashMap<String, ClientInstanceProfiler>();
-        this.server = CanalServerWithEmbedded.instance();
+        this.instanceProfiler = NOP;
     }
 
     public static CanalServerWithNettyProfiler profiler() {
         return SingletonHolder.SINGLETON;
     }
 
-    public void profiling(String destination, ClientRequestResult result) {
-        if (isDisabled()) {
-            return;
-        }
-        ClientInstanceProfiler profiler = cliPfs.get(destination);
-        if (profiler != null) {
-            profiler.profiling(result);
-        }
+    public void profiling(ClientRequestResult result) {
+        instanceProfiler.profiling(result);
     }
 
-    public void start(String destination) {
-        if (isDisabled()) {
-            return;
-        }
-        if (server.isStart(destination)) {
-            throw new IllegalStateException("Instance profiler should not be start while running.");
-        }
-        ClientInstanceProfiler profiler = clientInstanceProfilerFactory.create(destination);
-        profiler.start();
-        cliPfs.put(destination, profiler);
+    public void setInstanceProfiler(ClientInstanceProfiler instanceProfiler) {
+        this.instanceProfiler = instanceProfiler;
     }
 
-    /**
-     * Remove instance profiler for specified instance.
-     * Only accepted while instance is not running.
-     * @param destination canal instance destination
-     */
-    public void stop(String destination) {
-        if (isDisabled()) {
-            return;
-        }
-        if (server.isStart(destination)) {
-            throw new IllegalStateException("Instance profiler should not be stop while running.");
-        }
-        ClientInstanceProfiler profiler = cliPfs.remove(destination);
-        if (profiler != null && profiler.isStart()) {
-            profiler.stop();
-        }
-    }
-
-    public void setInstanceProfilerFactory(ClientInstanceProfilerFactory factory) {
-        this.clientInstanceProfilerFactory = factory;
-    }
-
-    private boolean isDisabled() {
-        return clientInstanceProfilerFactory == DISABLED || clientInstanceProfilerFactory == null;
-    }
-
-    private ClientInstanceProfiler tryGet(String destination) {
-        //try fast get
-        ClientInstanceProfiler profiler = cliPfs.get(destination);
-        if (profiler == null) {
-            synchronized (cliPfs) {
-                if (server.isStart(destination)) {
-                    // avoid overwriting
-                    cliPfs.putIfAbsent(destination, clientInstanceProfilerFactory.create(destination));
-                    profiler = cliPfs.get(destination);
-                    if (!profiler.isStart()) {
-                        profiler.start();
-                    }
-                }
-            }
-        }
-        return profiler;
-    }
-
-    private static class DefaultClientInstanceProfilerFactory implements ClientInstanceProfilerFactory {
+    private static class DefaultClientInstanceProfiler extends AbstractCanalLifeCycle implements ClientInstanceProfiler {
         @Override
-        public ClientInstanceProfiler create(String destination) {
-            throw new UnsupportedOperationException();
-        }
+        public void profiling(ClientRequestResult result) {}
     }
 
 }

+ 2 - 4
server/src/main/java/com/alibaba/otter/canal/server/netty/ClientInstanceProfiler.java

@@ -1,15 +1,13 @@
 package com.alibaba.otter.canal.server.netty;
 
 import com.alibaba.otter.canal.common.CanalLifeCycle;
-import com.alibaba.otter.canal.server.netty.listener.ChannelFutureAggregator;
+import com.alibaba.otter.canal.server.netty.listener.ChannelFutureAggregator.ClientRequestResult;
 
 /**
  * @author Chuanyi Li
  */
 public interface ClientInstanceProfiler extends CanalLifeCycle {
 
-    String getDestination();
-
-    void profiling(ChannelFutureAggregator.ClientRequestResult result);
+    void profiling(ClientRequestResult result);
 
 }

+ 0 - 10
server/src/main/java/com/alibaba/otter/canal/server/netty/ClientInstanceProfilerFactory.java

@@ -1,10 +0,0 @@
-package com.alibaba.otter.canal.server.netty;
-
-/**
- * @author Chuanyi Li
- */
-public interface ClientInstanceProfilerFactory {
-
-    ClientInstanceProfiler create(String destination);
-
-}

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/listener/ChannelFutureAggregator.java

@@ -45,7 +45,7 @@ public class ChannelFutureAggregator implements ChannelFutureListener {
         if (future.getCause() != null) {
             result.channelError = future.getCause();
         }
-        profiler().profiling(result.destination, result);
+        profiler().profiling(result);
     }
 
     /**