Browse Source

Merge branch 'master' of https://github.com/lcybo/canal

Chuanyi Li L 6 years ago
parent
commit
8585761f42
37 changed files with 1496 additions and 35 deletions
  1. 1343 0
      deployer/src/main/resources/metrics/Canal_instances_tmpl.json
  2. BIN
      images/DML_model_indicator.PNG
  3. BIN
      images/QPS.PNG
  4. BIN
      images/QPS_empty_batches.PNG
  5. BIN
      images/RPS_client.PNG
  6. BIN
      images/TPS.PNG
  7. BIN
      images/TPS_event.PNG
  8. BIN
      images/TPS_rows.PNG
  9. BIN
      images/TPS_transaction.PNG
  10. BIN
      images/basic.PNG
  11. BIN
      images/client_band.PNG
  12. BIN
      images/clients.PNG
  13. BIN
      images/delay.PNG
  14. BIN
      images/empty.PNG
  15. BIN
      images/idle.PNG
  16. BIN
      images/instance.PNG
  17. BIN
      images/latency.PNG
  18. BIN
      images/network.PNG
  19. BIN
      images/overview.PNG
  20. BIN
      images/publish_blocking.PNG
  21. BIN
      images/remain_events.PNG
  22. BIN
      images/remain_mem.PNG
  23. BIN
      images/reqs.PNG
  24. BIN
      images/rows.PNG
  25. BIN
      images/sink_blocking.PNG
  26. BIN
      images/store.PNG
  27. BIN
      images/store_remain_events.PNG
  28. BIN
      images/store_remain_memory.PNG
  29. BIN
      images/throughput.PNG
  30. BIN
      images/transactions.PNG
  31. 0 19
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/EntryCollector.java
  32. 1 12
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.java
  33. 4 1
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusClientInstanceProfiler.java
  34. 59 0
      prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/StoreCollector.java
  35. 2 0
      server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java
  36. 1 1
      server/src/main/java/com/alibaba/otter/canal/server/netty/listener/ChannelFutureAggregator.java
  37. 86 2
      store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

+ 1343 - 0
deployer/src/main/resources/metrics/Canal_instances_tmpl.json

@@ -0,0 +1,1343 @@
+{
+  "__inputs": [
+    {
+      "name": "DS_PROMETHEUS",
+      "label": "prometheus",
+      "description": "",
+      "type": "datasource",
+      "pluginId": "prometheus",
+      "pluginName": "Prometheus"
+    }
+  ],
+  "__requires": [
+    {
+      "type": "grafana",
+      "id": "grafana",
+      "name": "Grafana",
+      "version": "5.2.2"
+    },
+    {
+      "type": "panel",
+      "id": "graph",
+      "name": "Graph",
+      "version": "5.0.0"
+    },
+    {
+      "type": "datasource",
+      "id": "prometheus",
+      "name": "Prometheus",
+      "version": "5.0.0"
+    }
+  ],
+  "annotations": {
+    "list": [
+      {
+        "builtIn": 1,
+        "datasource": "-- Grafana --",
+        "enable": true,
+        "hide": true,
+        "iconColor": "rgba(0, 211, 255, 1)",
+        "name": "Annotations & Alerts",
+        "type": "dashboard"
+      }
+    ]
+  },
+  "editable": true,
+  "gnetId": null,
+  "graphTooltip": 0,
+  "id": null,
+  "iteration": 1534723196501,
+  "links": [],
+  "panels": [
+    {
+      "collapsed": false,
+      "gridPos": {
+        "h": 1,
+        "w": 24,
+        "x": 0,
+        "y": 0
+      },
+      "id": 30,
+      "panels": [],
+      "title": "Instance status",
+      "type": "row"
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "$datasource",
+      "description": "Canal instance 基本信息。",
+      "fill": 1,
+      "gridPos": {
+        "h": 5,
+        "w": 6,
+        "x": 0,
+        "y": 1
+      },
+      "id": 24,
+      "legend": {
+        "alignAsTable": true,
+        "avg": false,
+        "current": false,
+        "hideEmpty": false,
+        "hideZero": false,
+        "max": false,
+        "min": false,
+        "rightSide": true,
+        "show": true,
+        "sideWidth": 500,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "repeat": null,
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "canal_instance{destination=~\"$destination\"}",
+          "format": "time_series",
+          "instant": true,
+          "intervalFactor": 1,
+          "legendFormat": "Destination: {{destination}}",
+          "refId": "A"
+        },
+        {
+          "expr": "canal_instance_parser_mode{destination=~\"$destination\"}",
+          "format": "time_series",
+          "instant": true,
+          "intervalFactor": 1,
+          "legendFormat": "Parallel parser: {{parallel}}",
+          "refId": "B"
+        },
+        {
+          "expr": "canal_instance_store{destination=~\"$destination\"}",
+          "format": "time_series",
+          "instant": true,
+          "intervalFactor": 1,
+          "legendFormat": "Batch mode: {{batchMode}}",
+          "refId": "C"
+        },
+        {
+          "expr": "canal_instance_store{destination=~\"$destination\"}",
+          "format": "time_series",
+          "instant": true,
+          "intervalFactor": 1,
+          "legendFormat": "Buffer size: {{size}}",
+          "refId": "D"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Basic",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "transparent": true,
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": false,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": false
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": false
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {
+        "inbound": "#bf1b00"
+      },
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "$datasource",
+      "description": "Canal instance 网络带宽占用。\ninbound: 读取MySQL binlog.\noutbound: 对Client端传输格式化binlog.",
+      "fill": 1,
+      "gridPos": {
+        "h": 5,
+        "w": 6,
+        "x": 6,
+        "y": 1
+      },
+      "id": 6,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "rate(canal_instance_received_binlog_bytes{destination=~\"$destination\"}[2m]) / 1024",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "inbound",
+          "refId": "A"
+        },
+        {
+          "expr": "rate(canal_instance_client_bytes{destination=~\"$destination\"}[2m]) / 1024",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "outbound",
+          "refId": "B"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Network bandwith",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "KBs",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {
+        "ack": "#f29191",
+        "get": "#cca300",
+        "put": "#1f78c1"
+      },
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "$datasource",
+      "description": "master: Canal server相对于MySQL master的延时。通过master heartbeat机制能刷新idle状态下的延时。\nput: store put操作的时间点为基准。\nget: client get操作的时间点为基准。\nack:  client ack操作的时间点为基准。",
+      "fill": 1,
+      "gridPos": {
+        "h": 5,
+        "w": 6,
+        "x": 12,
+        "y": 1
+      },
+      "id": 4,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "canal_instance_traffic_delay{destination=~\"$destination\"} / 1000",
+          "format": "time_series",
+          "hide": false,
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "master",
+          "refId": "D"
+        },
+        {
+          "expr": "canal_instance_put_delay{destination=~\"$destination\"} / 1000",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "put",
+          "refId": "A"
+        },
+        {
+          "expr": "canal_instance_get_delay{destination=~\"$destination\"} / 1000",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "get",
+          "refId": "B"
+        },
+        {
+          "expr": "canal_instance_ack_delay{destination=~\"$destination\"} / 1000",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "ack",
+          "refId": "C"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Delay",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "s",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "$datasource",
+      "description": "sink线程blocking占比;dump线程blocking占比(仅parallel mode)。",
+      "fill": 1,
+      "gridPos": {
+        "h": 5,
+        "w": 6,
+        "x": 18,
+        "y": 1
+      },
+      "hideTimeOverride": false,
+      "id": 2,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "clamp_max(rate(canal_instance_sink_blocking_time{destination=~\"$destination\"}[2m]), 1000) / 10",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "dump",
+          "refId": "A"
+        },
+        {
+          "expr": "clamp_max(rate(canal_instance_publish_blocking_time{destination=~\"$destination\"}[2m]), 1000) / 10",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "sink",
+          "refId": "B"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Blocking",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "percent",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "collapsed": false,
+      "gridPos": {
+        "h": 1,
+        "w": 24,
+        "x": 0,
+        "y": 6
+      },
+      "id": 32,
+      "panels": [],
+      "title": "Throughput",
+      "type": "row"
+    },
+    {
+      "aliasColors": {
+        "rowDatas": "#7eb26d",
+        "tableRows": "#c15c17"
+      },
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "$datasource",
+      "description": "Instance处理binlog的TPS(以master变更行数table rows为基准计算)。\nput: put操作TPS。\nget: get操作TPS。\nack: ack操作TPS。",
+      "fill": 1,
+      "gridPos": {
+        "h": 5,
+        "w": 6,
+        "x": 0,
+        "y": 7
+      },
+      "id": 14,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "rate(canal_instance_put_rows{destination=~\"$destination\"}[2m])",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "put",
+          "refId": "A"
+        },
+        {
+          "expr": "rate(canal_instance_get_rows{destination=~\"$destination\"}[2m])",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "get",
+          "refId": "B"
+        },
+        {
+          "expr": "rate(canal_instance_ack_rows{destination=~\"$destination\"}[2m])",
+          "format": "time_series",
+          "intervalFactor": 1,
+          "legendFormat": "ack",
+          "refId": "C"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "TPS(table rows)",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "iops",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {
+        "transactions": "#f9ba8f"
+      },
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "$datasource",
+      "description": "Canal instance 处理binlog的TPS,以MySQL transaction为单位计算。",
+      "fill": 1,
+      "gridPos": {
+        "h": 5,
+        "w": 6,
+        "x": 6,
+        "y": 7
+      },
+      "id": 12,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "rate(canal_instance_transactions{destination=~\"$destination\"}[2m])",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "transactions",
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "TPS(MySQL transaction)",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "iops",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "collapsed": false,
+      "gridPos": {
+        "h": 1,
+        "w": 24,
+        "x": 0,
+        "y": 12
+      },
+      "id": 34,
+      "panels": [],
+      "title": "Client",
+      "type": "row"
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "$datasource",
+      "description": "Canal instance接收到的请求统计,结果按packet type分类。",
+      "fill": 1,
+      "gridPos": {
+        "h": 5,
+        "w": 6,
+        "x": 0,
+        "y": 13
+      },
+      "id": 16,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "canal_instance_client_packets{destination=~\"$destination\"}",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "{{packetType}}",
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Client requests",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "none",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_PROMETHEUS}",
+      "description": "client 请求的GET与ACK包的QPS。",
+      "fill": 1,
+      "gridPos": {
+        "h": 5,
+        "w": 6,
+        "x": 6,
+        "y": 13
+      },
+      "id": 38,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "rate(canal_instance_client_packets{destination=~\"$destination\",packetType=\"GET\"}[2m])",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "GET",
+          "refId": "A"
+        },
+        {
+          "expr": "rate(canal_instance_client_packets{destination=~\"$destination\",packetType=\"CLIENTACK\"}[2m])",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "ACK",
+          "refId": "B"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Client QPS",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "$datasource",
+      "description": "server响应GET请求,但返回空包的占比。",
+      "fill": 1,
+      "gridPos": {
+        "h": 5,
+        "w": 6,
+        "x": 12,
+        "y": 13
+      },
+      "id": 26,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "rate(canal_instance_client_empty_batches{destination=~\"$destination\"}[2m])",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "empty",
+          "refId": "A"
+        },
+        {
+          "expr": "rate(canal_instance_client_packets{destination=~\"$destination\", packetType=\"GET\"}[2m])",
+          "format": "time_series",
+          "intervalFactor": 1,
+          "legendFormat": "nonempty",
+          "refId": "B"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Empty packets",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "wps",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "$datasource",
+      "description": "Canal client 请求响应时间的概况。",
+      "fill": 1,
+      "gridPos": {
+        "h": 5,
+        "w": 6,
+        "x": 18,
+        "y": 13
+      },
+      "id": 18,
+      "legend": {
+        "alignAsTable": false,
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "rightSide": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [
+        {
+          "alias": "25.0",
+          "yaxis": 1
+        },
+        {
+          "alias": "100.0",
+          "yaxis": 1
+        }
+      ],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "rate(canal_instance_client_request_latency_bucket{destination=~\"$destination\"}[2m])",
+          "format": "time_series",
+          "hide": false,
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "{{le}}ms",
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Response time",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "transparent": false,
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "collapsed": false,
+      "gridPos": {
+        "h": 1,
+        "w": 24,
+        "x": 0,
+        "y": 18
+      },
+      "id": 36,
+      "panels": [],
+      "title": "Store",
+      "type": "row"
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "$datasource",
+      "description": "Canal instance ringbuffer内未释放的events数量。",
+      "fill": 1,
+      "gridPos": {
+        "h": 5,
+        "w": 6,
+        "x": 0,
+        "y": 19
+      },
+      "id": 20,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "canal_instance_store_produce_seq{destination=~\"$destination\"} - canal_instance_store_consume_seq{destination=~\"$destination\"}",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "events",
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Store remain events",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "decimals": null,
+          "format": "none",
+          "label": "",
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "decimals": null,
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "$datasource",
+      "description": "Canal instance ringbuffer 内未释放events占用内存。",
+      "fill": 1,
+      "gridPos": {
+        "h": 5,
+        "w": 6,
+        "x": 6,
+        "y": 19
+      },
+      "id": 22,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "(canal_instance_store_produce_mem{destination=~\"$destination\"} - canal_instance_store_consume_mem{destination=~\"$destination\"}) / 1024",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "memsize",
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Store remain mem",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "deckbytes",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    }
+  ],
+  "refresh": false,
+  "schemaVersion": 16,
+  "style": "dark",
+  "tags": [
+    "canal"
+  ],
+  "templating": {
+    "list": [
+      {
+        "current": {
+          "text": "prometheus",
+          "value": "prometheus"
+        },
+        "hide": 0,
+        "label": "datasource",
+        "name": "datasource",
+        "options": [],
+        "query": "prometheus",
+        "refresh": 1,
+        "regex": "",
+        "type": "datasource"
+      },
+      {
+        "allValue": null,
+        "current": {},
+        "datasource": "$datasource",
+        "hide": 0,
+        "includeAll": false,
+        "label": "destination",
+        "multi": false,
+        "name": "destination",
+        "options": [],
+        "query": "label_values(canal_instance, destination)",
+        "refresh": 1,
+        "regex": "",
+        "sort": 0,
+        "tagValuesQuery": "",
+        "tags": [],
+        "tagsQuery": "",
+        "type": "query",
+        "useTags": false
+      }
+    ]
+  },
+  "time": {
+    "from": "now-6h",
+    "to": "now"
+  },
+  "timepicker": {
+    "refresh_intervals": [
+      "5s",
+      "10s",
+      "30s",
+      "1m",
+      "5m",
+      "15m",
+      "30m",
+      "1h",
+      "2h",
+      "1d"
+    ],
+    "time_options": [
+      "5m",
+      "15m",
+      "1h",
+      "6h",
+      "12h",
+      "24h",
+      "2d",
+      "7d",
+      "30d"
+    ]
+  },
+  "timezone": "",
+  "title": "Canal instances",
+  "uid": "8vh8NGpiz",
+  "version": 97
+}

BIN
images/DML_model_indicator.PNG


BIN
images/QPS.PNG


BIN
images/QPS_empty_batches.PNG


BIN
images/RPS_client.PNG


BIN
images/TPS.PNG


BIN
images/TPS_event.PNG


BIN
images/TPS_rows.PNG


BIN
images/TPS_transaction.PNG


BIN
images/basic.PNG


BIN
images/client_band.PNG


BIN
images/clients.PNG


BIN
images/delay.PNG


BIN
images/empty.PNG


BIN
images/idle.PNG


BIN
images/instance.PNG


BIN
images/latency.PNG


BIN
images/network.PNG


BIN
images/overview.PNG


BIN
images/publish_blocking.PNG


BIN
images/remain_events.PNG


BIN
images/remain_mem.PNG


BIN
images/reqs.PNG


BIN
images/rows.PNG


BIN
images/sink_blocking.PNG


BIN
images/store.PNG


BIN
images/store_remain_events.PNG


BIN
images/store_remain_memory.PNG


BIN
images/throughput.PNG


BIN
images/transactions.PNG


+ 0 - 19
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/EntryCollector.java

@@ -7,7 +7,6 @@ import com.alibaba.otter.canal.sink.CanalEventSink;
 import com.alibaba.otter.canal.sink.entry.EntryEventSink;
 import com.google.common.base.Preconditions;
 import io.prometheus.client.Collector;
-import io.prometheus.client.Counter;
 import io.prometheus.client.CounterMetricFamily;
 import io.prometheus.client.GaugeMetricFamily;
 import org.slf4j.Logger;
@@ -30,12 +29,8 @@ public class EntryCollector extends Collector implements InstanceRegistry {
     private static final Logger                             logger            = LoggerFactory.getLogger(SinkCollector.class);
     private static final String                             DELAY             = "canal_instance_traffic_delay";
     private static final String                             TRANSACTION       = "canal_instance_transactions";
-    private static final String                             ROW_EVENTS        = "canal_instance_row_events";
-    private static final String                             ROWS_COUNTER      = "canal_instance_rows_counter";
     private static final String                             DELAY_HELP        = "Traffic delay of canal instance in milliseconds";
     private static final String                             TRANSACTION_HELP  = "Transactions counter of canal instance";
-    private static final String                             ROW_EVENTS_HELP   = "Rowdata events counter of canal instance";
-    private static final String                             ROWS_COUNTER_HELP = "Rows counter of canal instance";
     private final ConcurrentMap<String, EntryMetricsHolder> instances        = new ConcurrentHashMap<String, EntryMetricsHolder>();
 
     private EntryCollector() {}
@@ -55,10 +50,6 @@ public class EntryCollector extends Collector implements InstanceRegistry {
                 DELAY_HELP, DEST_LABELS_LIST);
         CounterMetricFamily transactions = new CounterMetricFamily(TRANSACTION,
                 TRANSACTION_HELP, DEST_LABELS_LIST);
-        CounterMetricFamily rowEvents = new CounterMetricFamily(ROW_EVENTS,
-                ROW_EVENTS_HELP, DEST_LABELS_LIST);
-        CounterMetricFamily rowsCounter = new CounterMetricFamily(ROWS_COUNTER,
-                ROWS_COUNTER_HELP, DEST_LABELS_LIST);
         for (EntryMetricsHolder emh : instances.values()) {
             long now = System.currentTimeMillis();
             long latest = emh.latestExecTime.get();
@@ -66,13 +57,9 @@ public class EntryCollector extends Collector implements InstanceRegistry {
                 delay.addMetric(emh.destLabelValues, (now - latest));
             }
             transactions.addMetric(emh.destLabelValues, emh.transactionCounter.doubleValue());
-            rowEvents.addMetric(emh.destLabelValues, emh.rowEventCounter.doubleValue());
-            rowsCounter.addMetric(emh.destLabelValues, emh.rowsCounter.doubleValue());
         }
         mfs.add(delay);
         mfs.add(transactions);
-        mfs.add(rowEvents);
-        mfs.add(rowsCounter);
         return mfs;
     }
 
@@ -89,12 +76,8 @@ public class EntryCollector extends Collector implements InstanceRegistry {
         PrometheusCanalEventDownStreamHandler handler = assembleHandler(entrySink);
         holder.latestExecTime = handler.getLatestExecuteTime();
         holder.transactionCounter = handler.getTransactionCounter();
-        holder.rowEventCounter = handler.getRowEventCounter();
-        holder.rowsCounter = handler.getRowsCounter();
         Preconditions.checkNotNull(holder.latestExecTime);
         Preconditions.checkNotNull(holder.transactionCounter);
-        Preconditions.checkNotNull(holder.rowEventCounter);
-        Preconditions.checkNotNull(holder.rowsCounter);
         EntryMetricsHolder old = instances.put(destination, holder);
         if (old != null) {
             logger.warn("Remove stale EntryCollector for instance {}.", destination);
@@ -145,8 +128,6 @@ public class EntryCollector extends Collector implements InstanceRegistry {
     private class EntryMetricsHolder {
         private AtomicLong   latestExecTime;
         private AtomicLong   transactionCounter;
-        private AtomicLong   rowEventCounter;
-        private AtomicLong   rowsCounter;
         private List<String> destLabelValues;
     }
 

+ 1 - 12
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.java

@@ -13,10 +13,8 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDownStreamHandler<List<Event>> {
 
-    private final AtomicLong latestExecuteTime  = new AtomicLong(0L);
+    private final AtomicLong latestExecuteTime  = new AtomicLong(System.currentTimeMillis());
     private final AtomicLong transactionCounter = new AtomicLong(0L);
-    private final AtomicLong rowEventCounter    = new AtomicLong(0L);
-    private final AtomicLong rowsCounter        = new AtomicLong(0L);
 
     @Override
     public List<Event> before(List<Event> events) {
@@ -34,8 +32,6 @@ public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDow
                     case ROWDATA: {
                         long exec = e.getExecuteTime();
                         if (exec > 0) localExecTime = exec;
-                        rowEventCounter.incrementAndGet();
-                        rowsCounter.addAndGet(e.getRowsCount());
                         break;
                     }
                     case TRANSACTIONEND: {
@@ -80,11 +76,4 @@ public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDow
         return transactionCounter;
     }
 
-    public AtomicLong getRowsCounter() {
-        return rowsCounter;
-    }
-
-    public AtomicLong getRowEventCounter() {
-        return rowEventCounter;
-    }
 }

+ 4 - 1
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusClientInstanceProfiler.java

@@ -71,7 +71,6 @@ public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler
         String destination = result.getDestination();
         PacketType type = result.getType();
         outboundCounter.labels(destination).inc(result.getAmount());
-        packetsCounter.labels(destination, type.name()).inc();
         short errorCode = result.getErrorCode();
         if (errorCode > 0) {
             errorsCounter.labels(destination, Short.toString(errorCode)).inc();
@@ -81,12 +80,16 @@ public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler
         switch (type) {
             case GET:
                 boolean empty = result.getEmpty();
+                // 区分一下空包
                 if (empty) {
                     emptyBatchesCounter.labels(destination).inc();
+                } else {
+                    packetsCounter.labels(destination, type.name()).inc();
                 }
                 break;
             // reserve for others
             default:
+                packetsCounter.labels(destination, type.name()).inc();
                 break;
         }
     }

+ 59 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/StoreCollector.java

@@ -34,11 +34,23 @@ public class StoreCollector extends Collector implements InstanceRegistry {
     private static final String                             STORE            = "canal_instance_store";
     private static final String                             PRODUCE_MEM      = "canal_instance_store_produce_mem";
     private static final String                             CONSUME_MEM      = "canal_instance_store_consume_mem";
+    private static final String                             PUT_DELAY        = "canal_instance_put_delay";
+    private static final String                             GET_DELAY        = "canal_instance_get_delay";
+    private static final String                             ACK_DELAY        = "canal_instance_ack_delay";
+    private static final String                             PUT_ROWS         = "canal_instance_put_rows";
+    private static final String                             GET_ROWS         = "canal_instance_get_rows";
+    private static final String                             ACK_ROWS         = "canal_instance_ack_rows";
     private static final String                             PRODUCE_HELP     = "Produced events counter of canal instance";
     private static final String                             CONSUME_HELP     = "Consumed events counter of canal instance";
     private static final String                             STORE_HELP       = "Canal instance info";
     private static final String                             PRODUCE_MEM_HELP = "Produced mem bytes of canal instance";
     private static final String                             CONSUME_MEM_HELP = "Consumed mem bytes of canal instance";
+    private static final String                             PUT_DELAY_HELP   = "Traffic delay of canal instance put";
+    private static final String                             GET_DELAY_HELP   = "Traffic delay of canal instance get";
+    private static final String                             ACK_DELAY_HELP   = "Traffic delay of canal instance ack";
+    private static final String                             PUT_ROWS_HELP    = "Put table rows of canal instance";
+    private static final String                             GET_ROWS_HELP    = "Got table rows of canal instance";
+    private static final String                             ACK_ROWS_HELP    = "Acked table rows of canal instance";
     private final ConcurrentMap<String, StoreMetricsHolder> instances        = new ConcurrentHashMap<String, StoreMetricsHolder>();
     private final List<String>                              storeLabelsList  = Arrays.asList(DEST, "batchMode", "size");
 
@@ -65,11 +77,40 @@ public class StoreCollector extends Collector implements InstanceRegistry {
                 PRODUCE_MEM_HELP, DEST_LABELS_LIST);
         CounterMetricFamily ackMem = new CounterMetricFamily(CONSUME_MEM,
                 CONSUME_MEM_HELP, DEST_LABELS_LIST);
+        GaugeMetricFamily putDelay = new GaugeMetricFamily(PUT_DELAY,
+                PUT_DELAY_HELP, DEST_LABELS_LIST);
+        GaugeMetricFamily getDelay = new GaugeMetricFamily(GET_DELAY,
+                GET_DELAY_HELP, DEST_LABELS_LIST);
+        GaugeMetricFamily ackDelay = new GaugeMetricFamily(ACK_DELAY,
+                ACK_DELAY_HELP, DEST_LABELS_LIST);
+        CounterMetricFamily putRows = new CounterMetricFamily(PUT_ROWS,
+                PUT_ROWS_HELP, DEST_LABELS_LIST);
+        CounterMetricFamily getRows = new CounterMetricFamily(GET_ROWS,
+                GET_ROWS_HELP, DEST_LABELS_LIST);
+        CounterMetricFamily ackRows = new CounterMetricFamily(ACK_ROWS,
+                ACK_ROWS_HELP, DEST_LABELS_LIST);
         boolean hasMem = false;
         for (StoreMetricsHolder smh : instances.values()) {
             final boolean isMem = smh.batchMode.isMemSize();
             put.addMetric(smh.destLabelValues, smh.putSeq.doubleValue());
             ack.addMetric(smh.destLabelValues, smh.ackSeq.doubleValue());
+            long pet = smh.putExecTime.get();
+            // 防止出现启动时,未消费造成的get, ack延时小于前阶段的情况
+            long get = Math.min(smh.getExecTime.get(), pet);
+            long aet = Math.min(smh.ackExecTime.get(), get);
+            long now = System.currentTimeMillis();
+            if (now >= pet) {
+                putDelay.addMetric(smh.destLabelValues, (now - pet));
+            }
+            if (now >= get) {
+                getDelay.addMetric(smh.destLabelValues, (now - get));
+            }
+            if (now >= aet) {
+                ackDelay.addMetric(smh.destLabelValues, (now - aet));
+            }
+            putRows.addMetric(smh.destLabelValues, smh.putTableRows.doubleValue());
+            getRows.addMetric(smh.destLabelValues, smh.getTableRows.doubleValue());
+            ackRows.addMetric(smh.destLabelValues, smh.ackTableRows.doubleValue());
             store.addMetric(smh.storeLabelValues, 1);
             if (isMem) {
                 hasMem = true;
@@ -80,6 +121,12 @@ public class StoreCollector extends Collector implements InstanceRegistry {
         mfs.add(put);
         mfs.add(ack);
         mfs.add(store);
+        mfs.add(putDelay);
+        mfs.add(getDelay);
+        mfs.add(ackDelay);
+        mfs.add(putRows);
+        mfs.add(getRows);
+        mfs.add(ackRows);
         if (hasMem) {
             mfs.add(putMem);
             mfs.add(ackMem);
@@ -102,6 +149,12 @@ public class StoreCollector extends Collector implements InstanceRegistry {
         holder.destLabelValues = Collections.singletonList(destination);
         holder.size = memStore.getBufferSize();
         holder.storeLabelValues = Arrays.asList(destination, holder.batchMode.name(), Integer.toString(holder.size));
+        holder.putExecTime = memStore.getPutExecTime();
+        holder.getExecTime = memStore.getGetExecTime();
+        holder.ackExecTime = memStore.getAckExecTime();
+        holder.putTableRows = memStore.getPutTableRows();
+        holder.getTableRows = memStore.getGetTableRows();
+        holder.ackTableRows = memStore.getAckTableRows();
         Preconditions.checkNotNull(holder.batchMode);
         Preconditions.checkNotNull(holder.putSeq);
         Preconditions.checkNotNull(holder.ackSeq);
@@ -129,6 +182,12 @@ public class StoreCollector extends Collector implements InstanceRegistry {
         private BatchMode    batchMode;
         private AtomicLong   putMemSize;
         private AtomicLong   ackMemSize;
+        private AtomicLong   putExecTime;
+        private AtomicLong   getExecTime;
+        private AtomicLong   ackExecTime;
+        private AtomicLong   putTableRows;
+        private AtomicLong   getTableRows;
+        private AtomicLong   ackTableRows;
         private int          size;
         private List<String> destLabelValues;
         private List<String> storeLabelValues;

+ 2 - 0
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -220,6 +220,7 @@ public class SessionHandler extends SimpleChannelHandler {
                         } else {
                             clientIdentity = new ClientIdentity(ack.getDestination(), Short.valueOf(ack.getClientId()));
                             embeddedServer.ack(clientIdentity, ack.getBatchId());
+                            new ChannelFutureAggregator(ack.getDestination(), ack, packet.getType(), 0, System.nanoTime() - start).operationComplete(null);
                         }
                     } else {
                         byte[] errorBytes = NettyUtils.errorPacket(401, MessageFormatter.format("destination or clientId is null", ack.toString()).getMessage());
@@ -239,6 +240,7 @@ public class SessionHandler extends SimpleChannelHandler {
                         } else {
                             embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次
                         }
+                        new ChannelFutureAggregator(rollback.getDestination(), rollback, packet.getType(), 0, System.nanoTime() - start).operationComplete(null);
                     } else {
                         byte[] errorBytes = NettyUtils.errorPacket(401, MessageFormatter.format("destination or clientId is null", rollback.toString()).getMessage());
                         NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(rollback.getDestination(),

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/listener/ChannelFutureAggregator.java

@@ -43,7 +43,7 @@ public class ChannelFutureAggregator implements ChannelFutureListener {
     @Override
     public void operationComplete(ChannelFuture future) {
         // profiling after I/O operation
-        if (future.getCause() != null) {
+        if (future != null && future.getCause() != null) {
             result.channelError = future.getCause();
         }
         profiler().profiling(result);

+ 86 - 2
store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

@@ -51,6 +51,16 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
     private AtomicLong        getMemSize    = new AtomicLong(0);
     private AtomicLong        ackMemSize    = new AtomicLong(0);
 
+    // 记录下put/get/ack操作的三个execTime
+    private AtomicLong        putExecTime   = new AtomicLong(System.currentTimeMillis());
+    private AtomicLong        getExecTime   = new AtomicLong(System.currentTimeMillis());
+    private AtomicLong        ackExecTime   = new AtomicLong(System.currentTimeMillis());
+
+    // 记录下put/get/ack操作的三个table rows
+    private AtomicLong        putTableRows  = new AtomicLong(0);
+    private AtomicLong        getTableRows  = new AtomicLong(0);
+    private AtomicLong        ackTableRows  = new AtomicLong(0);
+
     // 阻塞put/get操作控制信号
     private ReentrantLock     lock          = new ReentrantLock();
     private Condition         notFull       = lock.newCondition();
@@ -192,7 +202,7 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
 
             putMemSize.getAndAdd(size);
         }
-
+        profiling(data, OP.PUT);
         // tell other threads that store is not empty
         notEmpty.signal();
     }
@@ -336,6 +346,7 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
         if (getSequence.compareAndSet(current, end)) {
             getMemSize.addAndGet(memsize);
             notFull.signal();
+            profiling(result.getEvents(), OP.GET);
             return result;
         } else {
             return new Events<Event>();
@@ -407,8 +418,15 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
 
             boolean hasMatch = false;
             long memsize = 0;
+            // ack没有list,但有已存在的foreach,还是节省一下list的开销
+            long localExecTime = 0L;
+            int deltaRows = 0;
             for (long next = sequence + 1; next <= maxSequence; next++) {
                 Event event = entries[getIndex(next)];
+                if (localExecTime == 0 && event.getExecuteTime() > 0) {
+                    localExecTime = event.getExecuteTime();
+                }
+                deltaRows += event.getRowsCount();
                 memsize += calculateSize(event);
                 boolean match = CanalEventUtils.checkPosition(event, (LogPosition) position);
                 if (match) {// 找到对应的position,更新ack seq
@@ -424,11 +442,14 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
 
                     if (ackSequence.compareAndSet(sequence, next)) {// 避免并发ack
                         notFull.signal();
+                        ackTableRows.addAndGet(deltaRows);
+                        if (localExecTime > 0) {
+                            ackExecTime.lazySet(localExecTime);
+                        }
                         return;
                     }
                 }
             }
-
             if (!hasMatch) {// 找不到对应需要ack的position
                 throw new CanalStoreException("no match ack position" + position.toString());
             }
@@ -544,6 +565,45 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
                || type == EventType.DINDEX;
     }
 
+    private void profiling(List<Event> events, OP op) {
+        long localExecTime = 0L;
+        int deltaRows = 0;
+        if (events != null && !events.isEmpty()) {
+            for (Event e : events) {
+                if (localExecTime == 0 && e.getExecuteTime() > 0) {
+                    localExecTime = e.getExecuteTime();
+                }
+                deltaRows += e.getRowsCount();
+            }
+        }
+        switch (op) {
+            case PUT:
+                putTableRows.addAndGet(deltaRows);
+                if (localExecTime > 0) {
+                    putExecTime.lazySet(localExecTime);
+                }
+                break;
+            case GET:
+                getTableRows.addAndGet(deltaRows);
+                if (localExecTime > 0) {
+                    getExecTime.lazySet(localExecTime);
+                }
+                break;
+            case ACK:
+                ackTableRows.addAndGet(deltaRows);
+                if (localExecTime > 0) {
+                    ackExecTime.lazySet(localExecTime);
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
+    private enum OP {
+        PUT, GET, ACK
+    }
+
     // ================ setter / getter ==================
     public int getBufferSize() {
         return this.bufferSize;
@@ -584,4 +644,28 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
     public BatchMode getBatchMode() {
         return batchMode;
     }
+
+    public AtomicLong getPutExecTime() {
+        return putExecTime;
+    }
+
+    public AtomicLong getGetExecTime() {
+        return getExecTime;
+    }
+
+    public AtomicLong getAckExecTime() {
+        return ackExecTime;
+    }
+
+    public AtomicLong getPutTableRows() {
+        return putTableRows;
+    }
+
+    public AtomicLong getGetTableRows() {
+        return getTableRows;
+    }
+
+    public AtomicLong getAckTableRows() {
+        return ackTableRows;
+    }
 }