Browse Source

Canal metrics group mode support.

Chuanyi Li 6 years ago
parent
commit
6682c56b9f

+ 6 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -38,6 +38,8 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
     protected boolean              filterRows                = false;
     protected boolean              filterTableError          = false;
     protected boolean              useDruidDdlFilter         = true;
+    // instance received binlog bytes
+    protected final AtomicLong     receivedBinlogBytes       = new AtomicLong(0L);
     private final AtomicLong       eventsPublishBlockingTime = new AtomicLong(0L);
 
     protected BinlogParser buildParser() {
@@ -204,4 +206,8 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         return this.eventsPublishBlockingTime;
     }
 
+    public AtomicLong getReceivedBinlogBytes() {
+        return this.receivedBinlogBytes;
+    }
+
 }

+ 0 - 7
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -71,9 +71,6 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private int                  dumpErrorCount                    = 0;                 // binlogDump失败异常计数
     private int                  dumpErrorCountThreshold           = 2;                 // binlogDump失败异常计数阀值
 
-    // instance received binlog bytes
-    private final AtomicLong     receivedBinlogBytes               = new AtomicLong(0L);
-
     protected ErosaConnection buildErosaConnection() {
         return buildMysqlConnection(this.runningInfo);
     }
@@ -907,8 +904,4 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         this.dumpErrorCountThreshold = dumpErrorCountThreshold;
     }
 
-    public AtomicLong getReceivedBinlogBytes() {
-        return this.receivedBinlogBytes;
-    }
-
 }

+ 69 - 27
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/ParserCollector.java

@@ -1,14 +1,15 @@
 package com.alibaba.otter.canal.prometheus.impl;
 
 import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST;
-import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS_LIST;
+
+import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
+import com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser;
 import io.prometheus.client.Collector;
 import io.prometheus.client.CounterMetricFamily;
 import io.prometheus.client.GaugeMetricFamily;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -19,7 +20,6 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.instance.core.CanalInstance;
 import com.alibaba.otter.canal.parse.CanalEventParser;
-import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
 import com.alibaba.otter.canal.prometheus.InstanceRegistry;
 import com.google.common.base.Preconditions;
 
@@ -34,11 +34,13 @@ public class ParserCollector extends Collector implements InstanceRegistry {
     private static final String                              RECEIVED_BINLOG       = "canal_instance_received_binlog_bytes";
     private static final String                              PARSER_MODE           = "canal_instance_parser_mode";
     private static final String                              MODE_LABEL            = "parallel";
+    private static final String                              PARSER_LABEL          = "parser";
     private static final String                              PUBLISH_BLOCKING_HELP = "Publish blocking time of dump thread in milliseconds";
     private static final String                              RECEIVED_BINLOG_HELP  = "Received binlog bytes";
     private static final String                              MODE_HELP             = "Parser mode(parallel/serial) of instance";
     private final List<String>                               modeLabels            = Arrays.asList(DEST, MODE_LABEL);
-    private final ConcurrentMap<String, ParserMetricsHolder> instances             = new ConcurrentHashMap<String, ParserMetricsHolder>();
+    private final List<String>                               parserLabels          = Arrays.asList(DEST, PARSER_LABEL);
+    private final ConcurrentMap<String, ParserMetricsHolder> instances             = new ConcurrentHashMap<>();
 
     private ParserCollector() {}
 
@@ -52,53 +54,87 @@ public class ParserCollector extends Collector implements InstanceRegistry {
 
     @Override
     public List<MetricFamilySamples> collect() {
-        List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
-        boolean hasParallel = false;
+        List<MetricFamilySamples> mfs = new ArrayList<>();
         CounterMetricFamily bytesCounter = new CounterMetricFamily(RECEIVED_BINLOG,
-                RECEIVED_BINLOG_HELP, DEST_LABELS_LIST);
+                RECEIVED_BINLOG_HELP, parserLabels);
         GaugeMetricFamily modeGauge = new GaugeMetricFamily(PARSER_MODE,
                 MODE_HELP, modeLabels);
         CounterMetricFamily blockingCounter = new CounterMetricFamily(PUBLISH_BLOCKING,
-                PUBLISH_BLOCKING_HELP, DEST_LABELS_LIST);
+                PUBLISH_BLOCKING_HELP, parserLabels);
         for (ParserMetricsHolder emh : instances.values()) {
-            if (emh.isParallel) {
-                blockingCounter.addMetric(emh.destLabelValues, (emh.eventsPublishBlockingTime.doubleValue() / NANO_PER_MILLI));
-                hasParallel = true;
+            if (emh instanceof GroupParserMetricsHolder) {
+                GroupParserMetricsHolder group = (GroupParserMetricsHolder) emh;
+                for (ParserMetricsHolder semh :  group.holders) {
+                    singleCollect(bytesCounter, blockingCounter, modeGauge, semh);
+                }
+            }
+            else {
+                singleCollect(bytesCounter, blockingCounter, modeGauge, emh);
             }
-            modeGauge.addMetric(emh.modeLabelValues, 1);
-            bytesCounter.addMetric(emh.destLabelValues, emh.receivedBinlogBytes.doubleValue());
-
         }
         mfs.add(bytesCounter);
         mfs.add(modeGauge);
-        if (hasParallel) {
+        if (!blockingCounter.samples.isEmpty()) {
             mfs.add(blockingCounter);
         }
         return mfs;
     }
 
+    private void singleCollect(CounterMetricFamily bytesCounter, CounterMetricFamily blockingCounter, GaugeMetricFamily modeGauge, ParserMetricsHolder holder) {
+        if (holder.isParallel) {
+            blockingCounter.addMetric(holder.parserLabelValues, (holder.eventsPublishBlockingTime.doubleValue() / NANO_PER_MILLI));
+        }
+        modeGauge.addMetric(holder.modeLabelValues, 1);
+        bytesCounter.addMetric(holder.parserLabelValues, holder.receivedBinlogBytes.doubleValue());
+    }
+
     @Override
     public void register(CanalInstance instance) {
         final String destination = instance.getDestination();
-        ParserMetricsHolder holder = new ParserMetricsHolder();
+        ParserMetricsHolder holder;
         CanalEventParser parser = instance.getEventParser();
-        if (!(parser instanceof MysqlEventParser)) {
-            throw new IllegalArgumentException("CanalEventParser must be MysqlEventParser");
+        if (parser instanceof AbstractMysqlEventParser) {
+            holder = singleHolder(destination, (AbstractMysqlEventParser)parser, "0");
+        } else if (parser instanceof GroupEventParser) {
+            holder = groupHolder(destination, (GroupEventParser)parser);
+        } else {
+            throw new IllegalArgumentException("CanalEventParser must be either AbstractMysqlEventParser or GroupEventParser.");
         }
-        MysqlEventParser mysqlParser = (MysqlEventParser) parser;
-        holder.destLabelValues = Collections.singletonList(destination);
-        holder.modeLabelValues = Arrays.asList(destination, Boolean.toString(mysqlParser.isParallel()));
-        holder.eventsPublishBlockingTime = mysqlParser.getEventsPublishBlockingTime();
-        holder.receivedBinlogBytes = mysqlParser.getReceivedBinlogBytes();
-        holder.isParallel = mysqlParser.isParallel();
-        Preconditions.checkNotNull(holder.eventsPublishBlockingTime);
-        Preconditions.checkNotNull(holder.receivedBinlogBytes);
+        Preconditions.checkNotNull(holder);
         ParserMetricsHolder old = instances.put(destination, holder);
         if (old != null) {
             logger.warn("Remove stale ParserCollector for instance {}.", destination);
         }
     }
 
+    private ParserMetricsHolder singleHolder(String destination, AbstractMysqlEventParser parser, String id) {
+        ParserMetricsHolder holder = new ParserMetricsHolder();
+        holder.parserLabelValues = Arrays.asList(destination, id);
+        holder.modeLabelValues = Arrays.asList(destination, Boolean.toString(parser.isParallel()));
+        holder.eventsPublishBlockingTime = parser.getEventsPublishBlockingTime();
+        holder.receivedBinlogBytes = parser.getReceivedBinlogBytes();
+        holder.isParallel = parser.isParallel();
+        Preconditions.checkNotNull(holder.eventsPublishBlockingTime);
+        Preconditions.checkNotNull(holder.receivedBinlogBytes);
+        return holder;
+    }
+
+    private GroupParserMetricsHolder groupHolder(String destination, GroupEventParser group) {
+        List<CanalEventParser> parsers = group.getEventParsers();
+        GroupParserMetricsHolder groupHolder = new GroupParserMetricsHolder();
+        int num = parsers.size();
+        for (int i = 0; i < num; i ++) {
+            CanalEventParser parser = parsers.get(i);
+            if (parser instanceof AbstractMysqlEventParser) {
+                ParserMetricsHolder single = singleHolder(destination, (AbstractMysqlEventParser)parser, Integer.toString(i + 1));
+                groupHolder.holders.add(single);
+            } else {
+                logger.warn("Null or non AbstractMysqlEventParser, ignore.");
+            }
+        }
+        return groupHolder;
+    }
+
     @Override
     public void unregister(CanalInstance instance) {
         final String destination = instance.getDestination();
@@ -106,11 +142,17 @@ public class ParserCollector extends Collector implements InstanceRegistry {
     }
 
     private class ParserMetricsHolder {
-        private List<String> destLabelValues;
+        private List<String> parserLabelValues;
         private List<String> modeLabelValues;
+        // metrics for single parser
         private AtomicLong   receivedBinlogBytes;
         private AtomicLong   eventsPublishBlockingTime;
+        // parser mode
         private boolean      isParallel;
     }
 
+    private class GroupParserMetricsHolder extends ParserMetricsHolder {
+        private final List<ParserMetricsHolder> holders = new ArrayList<>();
+    }
+
 }