ソースを参照

Merge pull request #7 from alibaba/master

merge
rewerma 6 年 前
コミット
4e1675033a

+ 0 - 1
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -323,7 +323,6 @@ public class SimpleCanalConnector implements CanalConnector {
         Packet p = Packet.parseFrom(data);
         switch (p.getType()) {
             case MESSAGES: {
-                // if (!p.getCompression().equals(Compression.NONE)) {
                 if (!p.getCompression().equals(Compression.NONE)
                     && !p.getCompression().equals(Compression.COMPRESSIONCOMPATIBLEPROTO2)) {
                     throw new CanalClientException("compression is not supported in this connector");

+ 11 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogContext.java

@@ -5,6 +5,7 @@ import java.util.Map;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.GtidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
 
 /**
@@ -23,6 +24,8 @@ public final class LogContext {
 
     private GTIDSet                           gtidSet;
 
+    private GtidLogEvent                      gtidLogEvent; // save current gtid log event
+
     public LogContext(){
         this.formatDescription = FormatDescriptionLogEvent.FORMAT_DESCRIPTION_EVENT_5_x;
     }
@@ -71,4 +74,12 @@ public final class LogContext {
     public void setGtidSet(GTIDSet gtidSet) {
         this.gtidSet = gtidSet;
     }
+
+    public GtidLogEvent getGtidLogEvent() {
+        return gtidLogEvent;
+    }
+
+    public void setGtidLogEvent(GtidLogEvent gtidLogEvent) {
+        this.gtidLogEvent = gtidLogEvent;
+    }
 }

+ 13 - 11
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -161,19 +161,20 @@ public final class LogDecoder {
             buffer.limit(header.getEventLen() - LogEvent.BINLOG_CHECKSUM_LEN);
         }
         GTIDSet gtidSet = context.getGtidSet();
+        GtidLogEvent gtidLogEvent = context.getGtidLogEvent();
         switch (header.getType()) {
             case LogEvent.QUERY_EVENT: {
                 QueryLogEvent event = new QueryLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.XID_EVENT: {
                 XidLogEvent event = new XidLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.TABLE_MAP_EVENT: {
@@ -271,14 +272,14 @@ public final class LogDecoder {
                 RandLogEvent event = new RandLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.USER_VAR_EVENT: {
                 UserVarLogEvent event = new UserVarLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.FORMAT_DESCRIPTION_EVENT: {
@@ -329,7 +330,6 @@ public final class LogDecoder {
                 HeartbeatLogEvent event = new HeartbeatLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                header.putGtidStr(context.getGtidSet());
                 return event;
             }
             case LogEvent.IGNORABLE_LOG_EVENT: {
@@ -342,7 +342,7 @@ public final class LogDecoder {
                 RowsQueryLogEvent event = new RowsQueryLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.WRITE_ROWS_EVENT: {
@@ -350,7 +350,7 @@ public final class LogDecoder {
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 event.fillTable(context);
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.UPDATE_ROWS_EVENT: {
@@ -358,7 +358,7 @@ public final class LogDecoder {
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 event.fillTable(context);
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.DELETE_ROWS_EVENT: {
@@ -366,7 +366,7 @@ public final class LogDecoder {
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 event.fillTable(context);
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.GTID_LOG_EVENT:
@@ -377,8 +377,10 @@ public final class LogDecoder {
                 if (gtidSet != null) {
                     gtidSet.update(event.getGtidStr());
                     // update latest gtid
-                    header.putGtidStr(gtidSet);
+                    header.putGtid(gtidSet, event);
                 }
+                // update current gtid event to context
+                context.setGtidLogEvent(event);
                 return event;
             }
             case LogEvent.PREVIOUS_GTIDS_LOG_EVENT: {
@@ -409,7 +411,7 @@ public final class LogDecoder {
                 AnnotateRowsEvent event = new AnnotateRowsEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.BINLOG_CHECKPOINT_EVENT: {

+ 30 - 5
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/LogHeader.java

@@ -4,6 +4,9 @@ import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * The Common-Header, documented in the table @ref Table_common_header "below",
  * always has the same form and length within one version of MySQL. Each event
@@ -121,7 +124,12 @@ public final class LogHeader {
      */
     protected String    logFileName;
 
-    protected String    gtidStr;
+    protected Map<String, String> gtidMap = new HashMap<>();
+
+    private static final String CURRENT_GTID_STRING = "curt_gtid";
+    private static final String CURRENT_GTID_SN = "curt_gtid_sn";
+    private static final String CURRENT_GTID_LAST_COMMIT = "curt_gtid_lct";
+    private static final String GTID_SET_STRING = "gtid_str";
 
     /* for Start_event_v3 */
     public LogHeader(final int type){
@@ -292,13 +300,30 @@ public final class LogHeader {
         }
     }
 
-    public String getGtidStr() {
-        return gtidStr;
+    public String getGtidSetStr() {
+        return gtidMap.get(GTID_SET_STRING);
     }
 
-    public void putGtidStr(GTIDSet gtidSet) {
+    public String getCurrentGtid() {
+        return gtidMap.get(CURRENT_GTID_STRING);
+    }
+
+    public String getCurrentGtidSn() {
+        return gtidMap.get(CURRENT_GTID_SN);
+    }
+
+    public String getCurrentGtidLastCommit() {
+        return gtidMap.get(CURRENT_GTID_LAST_COMMIT);
+    }
+
+    public void putGtid(GTIDSet gtidSet, GtidLogEvent event) {
         if (gtidSet != null) {
-            this.gtidStr = gtidSet.toString();
+            gtidMap.put(GTID_SET_STRING, gtidSet.toString());
+            if (event != null) {
+                gtidMap.put(CURRENT_GTID_STRING, event.getGtidStr());
+                gtidMap.put(CURRENT_GTID_SN, String.valueOf(event.getSequenceNumber()));
+                gtidMap.put(CURRENT_GTID_LAST_COMMIT, String.valueOf(event.getLastCommitted()));
+            }
         }
     }
 }

+ 180 - 109
deployer/src/main/resources/metrics/Canal_instances_tmpl.json

@@ -1,4 +1,34 @@
 {
+  "__inputs": [
+    {
+      "name": "DS_PROMETHEUS",
+      "label": "prometheus",
+      "description": "",
+      "type": "datasource",
+      "pluginId": "prometheus",
+      "pluginName": "Prometheus"
+    }
+  ],
+  "__requires": [
+    {
+      "type": "grafana",
+      "id": "grafana",
+      "name": "Grafana",
+      "version": "5.2.2"
+    },
+    {
+      "type": "panel",
+      "id": "graph",
+      "name": "Graph",
+      "version": "5.0.0"
+    },
+    {
+      "type": "datasource",
+      "id": "prometheus",
+      "name": "Prometheus",
+      "version": "5.0.0"
+    }
+  ],
   "annotations": {
     "list": [
       {
@@ -15,8 +45,8 @@
   "editable": true,
   "gnetId": null,
   "graphTooltip": 0,
-  "id": 8,
-  "iteration": 1534740332233,
+  "id": null,
+  "iteration": 1536989235272,
   "links": [],
   "panels": [
     {
@@ -189,7 +219,7 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "rate(canal_instance_received_binlog_bytes{destination=~\"$destination\"}[2m]) / 1024",
+          "expr": "rate(canal_instance_received_binlog_bytes{destination=~\"$destination\", parser=\"0\"}[2m]) / 1024",
           "format": "time_series",
           "interval": "15s",
           "intervalFactor": 2,
@@ -203,6 +233,22 @@
           "intervalFactor": 2,
           "legendFormat": "outbound",
           "refId": "B"
+        },
+        {
+          "expr": "rate(canal_instance_received_binlog_bytes{destination=~\"$destination\", parser=\"1\"}[2m]) / 1024",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "inbound-1",
+          "refId": "C"
+        },
+        {
+          "expr": "rate(canal_instance_received_binlog_bytes{destination=~\"$destination\", parser=\"2\"}[2m]) / 1024",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "inbound-2",
+          "refId": "D"
         }
       ],
       "thresholds": [],
@@ -399,20 +445,36 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "clamp_max(rate(canal_instance_sink_blocking_time{destination=~\"$destination\"}[2m]), 1000) / 10",
+          "expr": "clamp_max(rate(canal_instance_publish_blocking_time{destination=~\"$destination\", parser=\"0\"}[2m]), 1000) / 10",
           "format": "time_series",
           "interval": "15s",
           "intervalFactor": 2,
           "legendFormat": "dump",
-          "refId": "A"
+          "refId": "B"
         },
         {
-          "expr": "clamp_max(rate(canal_instance_publish_blocking_time{destination=~\"$destination\"}[2m]), 1000) / 10",
+          "expr": "clamp_max(rate(canal_instance_sink_blocking_time{destination=~\"$destination\"}[2m]), 1000) / 10",
           "format": "time_series",
           "interval": "15s",
           "intervalFactor": 2,
           "legendFormat": "sink",
-          "refId": "B"
+          "refId": "A"
+        },
+        {
+          "expr": "clamp_max(rate(canal_instance_publish_blocking_time{destination=~\"$destination\", parser=\"1\"}[2m]), 1000) / 10",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "dump-1",
+          "refId": "C"
+        },
+        {
+          "expr": "clamp_max(rate(canal_instance_publish_blocking_time{destination=~\"$destination\", parser=\"2\"}[2m]), 1000) / 10",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "dump-2",
+          "refId": "D"
         }
       ],
       "thresholds": [],
@@ -465,7 +527,7 @@
       },
       "id": 32,
       "panels": [],
-      "title": "Server",
+      "title": "Throughput",
       "type": "row"
     },
     {
@@ -660,21 +722,34 @@
         "alignLevel": null
       }
     },
+    {
+      "collapsed": false,
+      "gridPos": {
+        "h": 1,
+        "w": 24,
+        "x": 0,
+        "y": 12
+      },
+      "id": 34,
+      "panels": [],
+      "title": "Client",
+      "type": "row"
+    },
     {
       "aliasColors": {},
       "bars": false,
       "dashLength": 10,
       "dashes": false,
       "datasource": "$datasource",
-      "description": "Canal instance ringbuffer内未释放的events数量。",
+      "description": "Canal instance接收到的请求统计,结果按packet type分类。",
       "fill": 1,
       "gridPos": {
         "h": 5,
         "w": 6,
-        "x": 12,
-        "y": 7
+        "x": 0,
+        "y": 13
       },
-      "id": 20,
+      "id": 16,
       "legend": {
         "avg": false,
         "current": false,
@@ -698,18 +773,18 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "canal_instance_store_produce_seq{destination=~\"$destination\"} - canal_instance_store_consume_seq{destination=~\"$destination\"}",
+          "expr": "canal_instance_client_packets{destination=~\"$destination\"}",
           "format": "time_series",
           "interval": "15s",
           "intervalFactor": 2,
-          "legendFormat": "events",
+          "legendFormat": "{{packetType}}",
           "refId": "A"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Store remain events",
+      "title": "Client requests",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -725,16 +800,14 @@
       },
       "yaxes": [
         {
-          "decimals": null,
           "format": "none",
-          "label": "",
+          "label": null,
           "logBase": 1,
           "max": null,
           "min": null,
           "show": true
         },
         {
-          "decimals": null,
           "format": "short",
           "label": null,
           "logBase": 1,
@@ -753,16 +826,16 @@
       "bars": false,
       "dashLength": 10,
       "dashes": false,
-      "datasource": "$datasource",
-      "description": "Canal instance ringbuffer 内未释放events占用内存。",
+      "datasource": "${DS_PROMETHEUS}",
+      "description": "client 请求的GET与ACK包的QPS。",
       "fill": 1,
       "gridPos": {
         "h": 5,
         "w": 6,
-        "x": 18,
-        "y": 7
+        "x": 6,
+        "y": 13
       },
-      "id": 22,
+      "id": 38,
       "legend": {
         "avg": false,
         "current": false,
@@ -786,18 +859,26 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "(canal_instance_store_produce_mem{destination=~\"$destination\"} - canal_instance_store_consume_mem{destination=~\"$destination\"}) / 1024",
+          "expr": "rate(canal_instance_client_packets{destination=~\"$destination\",packetType=\"GET\"}[2m])",
           "format": "time_series",
           "interval": "15s",
           "intervalFactor": 2,
-          "legendFormat": "memsize",
+          "legendFormat": "GET",
           "refId": "A"
+        },
+        {
+          "expr": "rate(canal_instance_client_packets{destination=~\"$destination\",packetType=\"CLIENTACK\"}[2m])",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "ACK",
+          "refId": "B"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Store remain mem",
+      "title": "Client QPS",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -813,7 +894,7 @@
       },
       "yaxes": [
         {
-          "format": "deckbytes",
+          "format": "short",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -834,34 +915,21 @@
         "alignLevel": null
       }
     },
-    {
-      "collapsed": false,
-      "gridPos": {
-        "h": 1,
-        "w": 24,
-        "x": 0,
-        "y": 12
-      },
-      "id": 34,
-      "panels": [],
-      "title": "Client",
-      "type": "row"
-    },
     {
       "aliasColors": {},
       "bars": false,
       "dashLength": 10,
       "dashes": false,
       "datasource": "$datasource",
-      "description": "Canal instance接收到的请求统计,结果按packet type分类。",
+      "description": "server响应GET请求,但返回空包的占比。",
       "fill": 1,
       "gridPos": {
         "h": 5,
         "w": 6,
-        "x": 0,
+        "x": 12,
         "y": 13
       },
-      "id": 16,
+      "id": 26,
       "legend": {
         "avg": false,
         "current": false,
@@ -885,18 +953,25 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "canal_instance_client_packets{destination=~\"$destination\"}",
+          "expr": "rate(canal_instance_client_empty_batches{destination=~\"$destination\"}[2m])",
           "format": "time_series",
           "interval": "15s",
           "intervalFactor": 2,
-          "legendFormat": "{{packetType}}",
+          "legendFormat": "empty",
           "refId": "A"
+        },
+        {
+          "expr": "rate(canal_instance_client_packets{destination=~\"$destination\", packetType=\"GET\"}[2m])",
+          "format": "time_series",
+          "intervalFactor": 1,
+          "legendFormat": "nonempty",
+          "refId": "B"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Client requests",
+      "title": "Empty packets",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -912,7 +987,7 @@
       },
       "yaxes": [
         {
-          "format": "none",
+          "format": "wps",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -939,20 +1014,22 @@
       "dashLength": 10,
       "dashes": false,
       "datasource": "$datasource",
-      "description": "client 请求的GET与ACK包的QPS。",
+      "description": "Canal client 请求响应时间的概况。",
       "fill": 1,
       "gridPos": {
         "h": 5,
         "w": 6,
-        "x": 6,
+        "x": 18,
         "y": 13
       },
-      "id": 38,
+      "id": 18,
       "legend": {
+        "alignAsTable": false,
         "avg": false,
         "current": false,
         "max": false,
         "min": false,
+        "rightSide": false,
         "show": true,
         "total": false,
         "values": false
@@ -965,37 +1042,40 @@
       "pointradius": 5,
       "points": false,
       "renderer": "flot",
-      "seriesOverrides": [],
+      "seriesOverrides": [
+        {
+          "alias": "25.0",
+          "yaxis": 1
+        },
+        {
+          "alias": "100.0",
+          "yaxis": 1
+        }
+      ],
       "spaceLength": 10,
       "stack": false,
       "steppedLine": false,
       "targets": [
         {
-          "expr": "rate(canal_instance_client_packets{destination=~\"$destination\",packetType=\"GET\"}[2m])",
+          "expr": "rate(canal_instance_client_request_latency_bucket{destination=~\"$destination\"}[2m])",
           "format": "time_series",
+          "hide": false,
           "interval": "15s",
           "intervalFactor": 2,
-          "legendFormat": "GET",
+          "legendFormat": "{{le}}ms",
           "refId": "A"
-        },
-        {
-          "expr": "rate(canal_instance_client_packets{destination=~\"$destination\",packetType=\"CLIENTACK\"}[2m])",
-          "format": "time_series",
-          "interval": "15s",
-          "intervalFactor": 2,
-          "legendFormat": "ACK",
-          "refId": "B"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Client QPS",
+      "title": "Response time",
       "tooltip": {
         "shared": true,
         "sort": 0,
         "value_type": "individual"
       },
+      "transparent": false,
       "type": "graph",
       "xaxis": {
         "buckets": null,
@@ -1027,21 +1107,34 @@
         "alignLevel": null
       }
     },
+    {
+      "collapsed": false,
+      "gridPos": {
+        "h": 1,
+        "w": 24,
+        "x": 0,
+        "y": 18
+      },
+      "id": 36,
+      "panels": [],
+      "title": "Store",
+      "type": "row"
+    },
     {
       "aliasColors": {},
       "bars": false,
       "dashLength": 10,
       "dashes": false,
       "datasource": "$datasource",
-      "description": "server响应GET请求,但返回空包的占比。",
+      "description": "Canal instance ringbuffer内未释放的events数量。",
       "fill": 1,
       "gridPos": {
         "h": 5,
         "w": 6,
-        "x": 12,
-        "y": 13
+        "x": 0,
+        "y": 19
       },
-      "id": 26,
+      "id": 20,
       "legend": {
         "avg": false,
         "current": false,
@@ -1065,25 +1158,18 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "rate(canal_instance_client_empty_batches{destination=~\"$destination\"}[2m])",
+          "expr": "canal_instance_store_produce_seq{destination=~\"$destination\"} - canal_instance_store_consume_seq{destination=~\"$destination\"}",
           "format": "time_series",
           "interval": "15s",
           "intervalFactor": 2,
-          "legendFormat": "empty",
+          "legendFormat": "events",
           "refId": "A"
-        },
-        {
-          "expr": "rate(canal_instance_client_packets{destination=~\"$destination\", packetType=\"GET\"}[2m])",
-          "format": "time_series",
-          "intervalFactor": 1,
-          "legendFormat": "nonempty",
-          "refId": "B"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Empty packets",
+      "title": "Store remain events",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -1099,14 +1185,16 @@
       },
       "yaxes": [
         {
-          "format": "wps",
-          "label": null,
+          "decimals": null,
+          "format": "none",
+          "label": "",
           "logBase": 1,
           "max": null,
           "min": null,
           "show": true
         },
         {
+          "decimals": null,
           "format": "short",
           "label": null,
           "logBase": 1,
@@ -1126,22 +1214,20 @@
       "dashLength": 10,
       "dashes": false,
       "datasource": "$datasource",
-      "description": "Canal client 请求响应时间的概况。",
+      "description": "Canal instance ringbuffer 内未释放events占用内存。",
       "fill": 1,
       "gridPos": {
         "h": 5,
         "w": 6,
-        "x": 18,
-        "y": 13
+        "x": 6,
+        "y": 19
       },
-      "id": 18,
+      "id": 22,
       "legend": {
-        "alignAsTable": false,
         "avg": false,
         "current": false,
         "max": false,
         "min": false,
-        "rightSide": false,
         "show": true,
         "total": false,
         "values": false
@@ -1154,40 +1240,29 @@
       "pointradius": 5,
       "points": false,
       "renderer": "flot",
-      "seriesOverrides": [
-        {
-          "alias": "25.0",
-          "yaxis": 1
-        },
-        {
-          "alias": "100.0",
-          "yaxis": 1
-        }
-      ],
+      "seriesOverrides": [],
       "spaceLength": 10,
       "stack": false,
       "steppedLine": false,
       "targets": [
         {
-          "expr": "rate(canal_instance_client_request_latency_bucket{destination=~\"$destination\"}[2m])",
+          "expr": "(canal_instance_store_produce_mem{destination=~\"$destination\"} - canal_instance_store_consume_mem{destination=~\"$destination\"}) / 1024",
           "format": "time_series",
-          "hide": false,
           "interval": "15s",
           "intervalFactor": 2,
-          "legendFormat": "{{le}}ms",
+          "legendFormat": "memsize",
           "refId": "A"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Response time",
+      "title": "Store remain mem",
       "tooltip": {
         "shared": true,
         "sort": 0,
         "value_type": "individual"
       },
-      "transparent": false,
       "type": "graph",
       "xaxis": {
         "buckets": null,
@@ -1198,7 +1273,7 @@
       },
       "yaxes": [
         {
-          "format": "short",
+          "format": "deckbytes",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -1230,9 +1305,8 @@
     "list": [
       {
         "current": {
-          "tags": [],
-          "text": "Prometheus",
-          "value": "Prometheus"
+          "text": "prometheus",
+          "value": "prometheus"
         },
         "hide": 0,
         "label": "datasource",
@@ -1245,10 +1319,7 @@
       },
       {
         "allValue": null,
-        "current": {
-          "text": "example",
-          "value": "example"
-        },
+        "current": {},
         "datasource": "$datasource",
         "hide": 0,
         "includeAll": false,
@@ -1269,7 +1340,7 @@
     ]
   },
   "time": {
-    "from": "now-1h",
+    "from": "now-6h",
     "to": "now"
   },
   "timepicker": {
@@ -1300,5 +1371,5 @@
   "timezone": "",
   "title": "Canal instances",
   "uid": "8vh8NGpiz",
-  "version": 1
+  "version": 103
 }

+ 52 - 0
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java

@@ -5,6 +5,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
 
+import com.alibaba.otter.canal.protocol.CanalEntry;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.SystemUtils;
 import org.slf4j.Logger;
@@ -299,4 +300,55 @@ public class AbstractCanalClientTest {
         this.connector = connector;
     }
 
+    /**
+     * 获取当前Entry的 GTID信息示例
+     * @param header
+     * @return
+     */
+    public static String getCurrentGtid(CanalEntry.Header header) {
+        List<CanalEntry.Pair> props = header.getPropsList();
+        if (props != null && props.size() > 0) {
+            for (CanalEntry.Pair pair : props) {
+                if ("curtGtid".equals(pair.getKey())) {
+                    return pair.getValue();
+                }
+            }
+        }
+        return "";
+    }
+
+    /**
+     * 获取当前Entry的 GTID Sequence No信息示例
+     * @param header
+     * @return
+     */
+    public static String getCurrentGtidSn(CanalEntry.Header header) {
+        List<CanalEntry.Pair> props = header.getPropsList();
+        if (props != null && props.size() > 0) {
+            for (CanalEntry.Pair pair : props) {
+                if ("curtGtidSn".equals(pair.getKey())) {
+                    return pair.getValue();
+                }
+            }
+        }
+        return "";
+    }
+
+    /**
+     * 获取当前Entry的 GTID Last Committed信息示例
+     * @param header
+     * @return
+     */
+    public static String getCurrentGtidLct(CanalEntry.Header header) {
+        List<CanalEntry.Pair> props = header.getPropsList();
+        if (props != null && props.size() > 0) {
+            for (CanalEntry.Pair pair : props) {
+                if ("curtGtidLct".equals(pair.getKey())) {
+                    return pair.getValue();
+                }
+            }
+        }
+        return "";
+    }
+
 }

+ 6 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -38,6 +38,8 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
     protected boolean              filterRows                = false;
     protected boolean              filterTableError          = false;
     protected boolean              useDruidDdlFilter         = true;
+    // instance received binlog bytes
+    protected final AtomicLong     receivedBinlogBytes       = new AtomicLong(0L);
     private final AtomicLong       eventsPublishBlockingTime = new AtomicLong(0L);
 
     protected BinlogParser buildParser() {
@@ -204,4 +206,8 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         return this.eventsPublishBlockingTime;
     }
 
+    public AtomicLong getReceivedBinlogBytes() {
+        return this.receivedBinlogBytes;
+    }
+
 }

+ 0 - 7
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -71,9 +71,6 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private int                  dumpErrorCount                    = 0;                 // binlogDump失败异常计数
     private int                  dumpErrorCountThreshold           = 2;                 // binlogDump失败异常计数阀值
 
-    // instance received binlog bytes
-    private final AtomicLong     receivedBinlogBytes               = new AtomicLong(0L);
-
     protected ErosaConnection buildErosaConnection() {
         return buildMysqlConnection(this.runningInfo);
     }
@@ -907,8 +904,4 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         this.dumpErrorCountThreshold = dumpErrorCountThreshold;
     }
 
-    public AtomicLong getReceivedBinlogBytes() {
-        return this.receivedBinlogBytes;
-    }
-
 }

+ 18 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -545,7 +545,6 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 rowsCount++;
                 rowChangeBuider.addRowDatas(rowDataBuilder.build());
             }
-
             TableMapLogEvent table = event.getTable();
             Header header = createHeader(event.getHeader(),
                 table.getDbName(),
@@ -834,8 +833,24 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         }
         headerBuilder.setEventLength(logHeader.getEventLen());
         // enable gtid position
-        if (StringUtils.isNotEmpty(logHeader.getGtidStr())) {
-            headerBuilder.setGtid(logHeader.getGtidStr());
+        if (StringUtils.isNotEmpty(logHeader.getGtidSetStr())) {
+            headerBuilder.setGtid(logHeader.getGtidSetStr());
+        }
+        // add current gtid
+        if (StringUtils.isNotEmpty(logHeader.getCurrentGtid())) {
+            Pair pair = createSpecialPair("curtGtid", logHeader.getCurrentGtid());
+            headerBuilder.addProps(pair);
+        }
+        // add current gtid sequence no
+        if (StringUtils.isNotEmpty(logHeader.getCurrentGtidSn())) {
+            Pair pair = createSpecialPair("curtGtidSn", logHeader.getCurrentGtidSn());
+            headerBuilder.addProps(pair);
+        }
+
+        // add current gtid last committed
+        if (StringUtils.isNotEmpty(logHeader.getCurrentGtidLastCommit())) {
+            Pair pair = createSpecialPair("curtGtidLct", logHeader.getCurrentGtidLastCommit());
+            headerBuilder.addProps(pair);
         }
 
         // add rowsCount suppport

+ 69 - 27
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/ParserCollector.java

@@ -1,14 +1,15 @@
 package com.alibaba.otter.canal.prometheus.impl;
 
 import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST;
-import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS_LIST;
+
+import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
+import com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser;
 import io.prometheus.client.Collector;
 import io.prometheus.client.CounterMetricFamily;
 import io.prometheus.client.GaugeMetricFamily;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -19,7 +20,6 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.instance.core.CanalInstance;
 import com.alibaba.otter.canal.parse.CanalEventParser;
-import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
 import com.alibaba.otter.canal.prometheus.InstanceRegistry;
 import com.google.common.base.Preconditions;
 
@@ -34,11 +34,13 @@ public class ParserCollector extends Collector implements InstanceRegistry {
     private static final String                              RECEIVED_BINLOG       = "canal_instance_received_binlog_bytes";
     private static final String                              PARSER_MODE           = "canal_instance_parser_mode";
     private static final String                              MODE_LABEL            = "parallel";
+    private static final String                              PARSER_LABEL          = "parser";
     private static final String                              PUBLISH_BLOCKING_HELP = "Publish blocking time of dump thread in milliseconds";
     private static final String                              RECEIVED_BINLOG_HELP  = "Received binlog bytes";
     private static final String                              MODE_HELP             = "Parser mode(parallel/serial) of instance";
     private final List<String>                               modeLabels            = Arrays.asList(DEST, MODE_LABEL);
-    private final ConcurrentMap<String, ParserMetricsHolder> instances             = new ConcurrentHashMap<String, ParserMetricsHolder>();
+    private final List<String>                               parserLabels          = Arrays.asList(DEST, PARSER_LABEL);
+    private final ConcurrentMap<String, ParserMetricsHolder> instances             = new ConcurrentHashMap<>();
 
     private ParserCollector() {}
 
@@ -52,53 +54,87 @@ public class ParserCollector extends Collector implements InstanceRegistry {
 
     @Override
     public List<MetricFamilySamples> collect() {
-        List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
-        boolean hasParallel = false;
+        List<MetricFamilySamples> mfs = new ArrayList<>();
         CounterMetricFamily bytesCounter = new CounterMetricFamily(RECEIVED_BINLOG,
-                RECEIVED_BINLOG_HELP, DEST_LABELS_LIST);
+                RECEIVED_BINLOG_HELP, parserLabels);
         GaugeMetricFamily modeGauge = new GaugeMetricFamily(PARSER_MODE,
                 MODE_HELP, modeLabels);
         CounterMetricFamily blockingCounter = new CounterMetricFamily(PUBLISH_BLOCKING,
-                PUBLISH_BLOCKING_HELP, DEST_LABELS_LIST);
+                PUBLISH_BLOCKING_HELP, parserLabels);
         for (ParserMetricsHolder emh : instances.values()) {
-            if (emh.isParallel) {
-                blockingCounter.addMetric(emh.destLabelValues, (emh.eventsPublishBlockingTime.doubleValue() / NANO_PER_MILLI));
-                hasParallel = true;
+            if (emh instanceof GroupParserMetricsHolder) {
+                GroupParserMetricsHolder group = (GroupParserMetricsHolder) emh;
+                for (ParserMetricsHolder semh :  group.holders) {
+                    singleCollect(bytesCounter, blockingCounter, modeGauge, semh);
+                }
+            }
+            else {
+                singleCollect(bytesCounter, blockingCounter, modeGauge, emh);
             }
-            modeGauge.addMetric(emh.modeLabelValues, 1);
-            bytesCounter.addMetric(emh.destLabelValues, emh.receivedBinlogBytes.doubleValue());
-
         }
         mfs.add(bytesCounter);
         mfs.add(modeGauge);
-        if (hasParallel) {
+        if (!blockingCounter.samples.isEmpty()) {
             mfs.add(blockingCounter);
         }
         return mfs;
     }
 
+    private void singleCollect(CounterMetricFamily bytesCounter, CounterMetricFamily blockingCounter, GaugeMetricFamily modeGauge, ParserMetricsHolder holder) {
+        if (holder.isParallel) {
+            blockingCounter.addMetric(holder.parserLabelValues, (holder.eventsPublishBlockingTime.doubleValue() / NANO_PER_MILLI));
+        }
+        modeGauge.addMetric(holder.modeLabelValues, 1);
+        bytesCounter.addMetric(holder.parserLabelValues, holder.receivedBinlogBytes.doubleValue());
+    }
+
     @Override
     public void register(CanalInstance instance) {
         final String destination = instance.getDestination();
-        ParserMetricsHolder holder = new ParserMetricsHolder();
+        ParserMetricsHolder holder;
         CanalEventParser parser = instance.getEventParser();
-        if (!(parser instanceof MysqlEventParser)) {
-            throw new IllegalArgumentException("CanalEventParser must be MysqlEventParser");
+        if (parser instanceof AbstractMysqlEventParser) {
+            holder = singleHolder(destination, (AbstractMysqlEventParser)parser, "0");
+        } else if (parser instanceof GroupEventParser) {
+            holder = groupHolder(destination, (GroupEventParser)parser);
+        } else {
+            throw new IllegalArgumentException("CanalEventParser must be either AbstractMysqlEventParser or GroupEventParser.");
         }
-        MysqlEventParser mysqlParser = (MysqlEventParser) parser;
-        holder.destLabelValues = Collections.singletonList(destination);
-        holder.modeLabelValues = Arrays.asList(destination, Boolean.toString(mysqlParser.isParallel()));
-        holder.eventsPublishBlockingTime = mysqlParser.getEventsPublishBlockingTime();
-        holder.receivedBinlogBytes = mysqlParser.getReceivedBinlogBytes();
-        holder.isParallel = mysqlParser.isParallel();
-        Preconditions.checkNotNull(holder.eventsPublishBlockingTime);
-        Preconditions.checkNotNull(holder.receivedBinlogBytes);
+        Preconditions.checkNotNull(holder);
         ParserMetricsHolder old = instances.put(destination, holder);
         if (old != null) {
             logger.warn("Remove stale ParserCollector for instance {}.", destination);
         }
     }
 
+    private ParserMetricsHolder singleHolder(String destination, AbstractMysqlEventParser parser, String id) {
+        ParserMetricsHolder holder = new ParserMetricsHolder();
+        holder.parserLabelValues = Arrays.asList(destination, id);
+        holder.modeLabelValues = Arrays.asList(destination, Boolean.toString(parser.isParallel()));
+        holder.eventsPublishBlockingTime = parser.getEventsPublishBlockingTime();
+        holder.receivedBinlogBytes = parser.getReceivedBinlogBytes();
+        holder.isParallel = parser.isParallel();
+        Preconditions.checkNotNull(holder.eventsPublishBlockingTime);
+        Preconditions.checkNotNull(holder.receivedBinlogBytes);
+        return holder;
+    }
+
+    private GroupParserMetricsHolder groupHolder(String destination, GroupEventParser group) {
+        List<CanalEventParser> parsers = group.getEventParsers();
+        GroupParserMetricsHolder groupHolder = new GroupParserMetricsHolder();
+        int num = parsers.size();
+        for (int i = 0; i < num; i ++) {
+            CanalEventParser parser = parsers.get(i);
+            if (parser instanceof AbstractMysqlEventParser) {
+                ParserMetricsHolder single = singleHolder(destination, (AbstractMysqlEventParser)parser, Integer.toString(i + 1));
+                groupHolder.holders.add(single);
+            } else {
+                logger.warn("Null or non AbstractMysqlEventParser, ignore.");
+            }
+        }
+        return groupHolder;
+    }
+
     @Override
     public void unregister(CanalInstance instance) {
         final String destination = instance.getDestination();
@@ -106,11 +142,17 @@ public class ParserCollector extends Collector implements InstanceRegistry {
     }
 
     private class ParserMetricsHolder {
-        private List<String> destLabelValues;
+        private List<String> parserLabelValues;
         private List<String> modeLabelValues;
+        // metrics for single parser
         private AtomicLong   receivedBinlogBytes;
         private AtomicLong   eventsPublishBlockingTime;
+        // parser mode
         private boolean      isParallel;
     }
 
+    private class GroupParserMetricsHolder extends ParserMetricsHolder {
+        private final List<ParserMetricsHolder> holders = new ArrayList<>();
+    }
+
 }