Ver Fonte

Improve metrics.

Chuanyi Li há 6 anos atrás
pai
commit
2784f33b52

+ 183 - 37
deployer/src/main/resources/metrics/Canal_instances_tmpl.json

@@ -1,5 +1,14 @@
 {
-  "__inputs": [],
+  "__inputs": [
+    {
+      "name": "DS_PROMETHEUS",
+      "label": "prometheus",
+      "description": "",
+      "type": "datasource",
+      "pluginId": "prometheus",
+      "pluginName": "Prometheus"
+    }
+  ],
   "__requires": [
     {
       "type": "grafana",
@@ -12,6 +21,12 @@
       "id": "graph",
       "name": "Graph",
       "version": "5.0.0"
+    },
+    {
+      "type": "datasource",
+      "id": "prometheus",
+      "name": "Prometheus",
+      "version": "5.0.0"
     }
   ],
   "annotations": {
@@ -31,7 +46,7 @@
   "gnetId": null,
   "graphTooltip": 0,
   "id": null,
-  "iteration": 1534552626378,
+  "iteration": 1534648729090,
   "links": [],
   "panels": [
     {
@@ -261,12 +276,16 @@
       }
     },
     {
-      "aliasColors": {},
+      "aliasColors": {
+        "ack": "#f29191",
+        "get": "#cca300",
+        "put": "#1f78c1"
+      },
       "bars": false,
       "dashLength": 10,
       "dashes": false,
       "datasource": "$datasource",
-      "description": "Canal instance 相对于master的延时。",
+      "description": "master: Canal server相对于MySQL master的延时。通过master heartbeat机制能刷新idle状态下的延时。\nput: store put操作的时间点为基准。\nget: client get操作的时间点为基准。\nack:  client ack操作的时间点为基准。",
       "fill": 1,
       "gridPos": {
         "h": 5,
@@ -300,10 +319,35 @@
         {
           "expr": "canal_instance_traffic_delay{destination=~\"$destination\"} / 1000",
           "format": "time_series",
+          "hide": false,
           "interval": "15s",
           "intervalFactor": 2,
-          "legendFormat": "delay",
+          "legendFormat": "master",
+          "refId": "D"
+        },
+        {
+          "expr": "canal_instance_put_delay{destination=~\"$destination\"} / 1000",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "put",
           "refId": "A"
+        },
+        {
+          "expr": "canal_instance_get_delay{destination=~\"$destination\"} / 1000",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "get",
+          "refId": "B"
+        },
+        {
+          "expr": "canal_instance_ack_delay{destination=~\"$destination\"} / 1000",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "ack",
+          "refId": "C"
         }
       ],
       "thresholds": [],
@@ -352,7 +396,7 @@
       "dashLength": 10,
       "dashes": false,
       "datasource": "$datasource",
-      "description": "sink线程idle占比;dump线程idle占比(仅parallel mode)。",
+      "description": "sink线程blocking占比;dump线程blocking占比(仅parallel mode)。",
       "fill": 1,
       "gridPos": {
         "h": 5,
@@ -404,7 +448,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Idle",
+      "title": "Blocking",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -462,7 +506,7 @@
       "dashLength": 10,
       "dashes": false,
       "datasource": "$datasource",
-      "description": "Canal client instance消费binlog的TPS, 以MySQL binlog events为单位计算。",
+      "description": "Canal client instance消费binlog的TPS(ack过的events), 以binlog events为单位计算。",
       "fill": 1,
       "gridPos": {
         "h": 5,
@@ -505,7 +549,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "TPS",
+      "title": "TPS(events)",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -639,7 +683,7 @@
       "dashLength": 10,
       "dashes": false,
       "datasource": "$datasource",
-      "description": "Canal instance处理binlog的TPS。\n1. table rows:以table变更行数为单位。\n2. row data:rowData events为单位。",
+      "description": "Instance处理binlog的TPS(以master变更行数table rows为基准计算)。\nput: put操作TPS。\nget: get操作TPS。\nack: ack操作TPS。",
       "fill": 1,
       "gridPos": {
         "h": 5,
@@ -671,26 +715,33 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "rate(canal_instance_rows_counter{destination=~\"$destination\"}[30s])",
+          "expr": "rate(canal_instance_put_rows{destination=~\"$destination\"}[2m])",
           "format": "time_series",
           "interval": "15s",
           "intervalFactor": 2,
-          "legendFormat": "tableRows",
+          "legendFormat": "put",
           "refId": "A"
         },
         {
-          "expr": "rate(canal_instance_row_events{destination=~\"$destination\"}[30s])",
+          "expr": "rate(canal_instance_get_rows{destination=~\"$destination\"}[2m])",
           "format": "time_series",
           "interval": "15s",
           "intervalFactor": 2,
-          "legendFormat": "rowDatas",
+          "legendFormat": "get",
           "refId": "B"
+        },
+        {
+          "expr": "rate(canal_instance_ack_rows{destination=~\"$destination\"}[2m])",
+          "format": "time_series",
+          "intervalFactor": 1,
+          "legendFormat": "ack",
+          "refId": "C"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "TPS(table rows VS row data)",
+      "title": "TPS(table rows)",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -831,8 +882,8 @@
       "bars": false,
       "dashLength": 10,
       "dashes": false,
-      "datasource": "$datasource",
-      "description": "Canal client 请求latency概况。",
+      "datasource": "${DS_PROMETHEUS}",
+      "description": "client 请求的GET与ACK包的QPS。",
       "fill": 1,
       "gridPos": {
         "h": 5,
@@ -840,14 +891,12 @@
         "x": 6,
         "y": 13
       },
-      "id": 18,
+      "id": 38,
       "legend": {
-        "alignAsTable": false,
         "avg": false,
         "current": false,
         "max": false,
         "min": false,
-        "rightSide": false,
         "show": true,
         "total": false,
         "values": false
@@ -860,39 +909,37 @@
       "pointradius": 5,
       "points": false,
       "renderer": "flot",
-      "seriesOverrides": [
-        {
-          "alias": "25.0",
-          "yaxis": 1
-        },
-        {
-          "alias": "100.0",
-          "yaxis": 1
-        }
-      ],
+      "seriesOverrides": [],
       "spaceLength": 10,
       "stack": false,
       "steppedLine": false,
       "targets": [
         {
-          "expr": "rate(canal_instance_client_request_latency_bucket{destination=~\"$destination\"}[2m])",
+          "expr": "rate(canal_instance_client_packets{destination=~\"$destination\",packetType=\"GET\"}[2m])",
           "format": "time_series",
           "interval": "15s",
           "intervalFactor": 2,
-          "legendFormat": "{{le}}ms",
+          "legendFormat": "GET",
           "refId": "A"
+        },
+        {
+          "expr": "rate(canal_instance_client_packets{destination=~\"$destination\",packetType=\"CLIENTACK\"}[2m])",
+          "format": "time_series",
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "ACK",
+          "refId": "B"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Latency",
+      "title": "Client QPS",
       "tooltip": {
         "shared": true,
         "sort": 0,
         "value_type": "individual"
       },
-      "transparent": false,
       "type": "graph",
       "xaxis": {
         "buckets": null,
@@ -973,7 +1020,7 @@
           "expr": "rate(canal_instance_client_packets{destination=~\"$destination\", packetType=\"GET\"}[2m])",
           "format": "time_series",
           "intervalFactor": 1,
-          "legendFormat": "total",
+          "legendFormat": "nonempty",
           "refId": "B"
         }
       ],
@@ -1017,6 +1064,105 @@
         "alignLevel": null
       }
     },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "$datasource",
+      "description": "Canal client 请求响应时间的概况。",
+      "fill": 1,
+      "gridPos": {
+        "h": 5,
+        "w": 6,
+        "x": 18,
+        "y": 13
+      },
+      "id": 18,
+      "legend": {
+        "alignAsTable": false,
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "rightSide": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [
+        {
+          "alias": "25.0",
+          "yaxis": 1
+        },
+        {
+          "alias": "100.0",
+          "yaxis": 1
+        }
+      ],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "rate(canal_instance_client_request_latency_bucket{destination=~\"$destination\"}[2m])",
+          "format": "time_series",
+          "hide": false,
+          "interval": "15s",
+          "intervalFactor": 2,
+          "legendFormat": "{{le}}ms",
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Response time",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "transparent": false,
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
     {
       "collapsed": false,
       "gridPos": {
@@ -1250,7 +1396,7 @@
     ]
   },
   "time": {
-    "from": "now-1h",
+    "from": "now-15m",
     "to": "now"
   },
   "timepicker": {
@@ -1281,5 +1427,5 @@
   "timezone": "",
   "title": "Canal instances",
   "uid": "8vh8NGpiz",
-  "version": 82
+  "version": 96
 }

BIN
images/QPS.PNG


BIN
images/TPS.PNG


BIN
images/delay.PNG


BIN
images/empty.PNG


BIN
images/overview.PNG


BIN
images/reqs.PNG


BIN
images/rows.PNG


+ 0 - 19
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/EntryCollector.java

@@ -7,7 +7,6 @@ import com.alibaba.otter.canal.sink.CanalEventSink;
 import com.alibaba.otter.canal.sink.entry.EntryEventSink;
 import com.google.common.base.Preconditions;
 import io.prometheus.client.Collector;
-import io.prometheus.client.Counter;
 import io.prometheus.client.CounterMetricFamily;
 import io.prometheus.client.GaugeMetricFamily;
 import org.slf4j.Logger;
@@ -30,12 +29,8 @@ public class EntryCollector extends Collector implements InstanceRegistry {
     private static final Logger                             logger            = LoggerFactory.getLogger(SinkCollector.class);
     private static final String                             DELAY             = "canal_instance_traffic_delay";
     private static final String                             TRANSACTION       = "canal_instance_transactions";
-    private static final String                             ROW_EVENTS        = "canal_instance_row_events";
-    private static final String                             ROWS_COUNTER      = "canal_instance_rows_counter";
     private static final String                             DELAY_HELP        = "Traffic delay of canal instance in milliseconds";
     private static final String                             TRANSACTION_HELP  = "Transactions counter of canal instance";
-    private static final String                             ROW_EVENTS_HELP   = "Rowdata events counter of canal instance";
-    private static final String                             ROWS_COUNTER_HELP = "Rows counter of canal instance";
     private final ConcurrentMap<String, EntryMetricsHolder> instances        = new ConcurrentHashMap<String, EntryMetricsHolder>();
 
     private EntryCollector() {}
@@ -55,10 +50,6 @@ public class EntryCollector extends Collector implements InstanceRegistry {
                 DELAY_HELP, DEST_LABELS_LIST);
         CounterMetricFamily transactions = new CounterMetricFamily(TRANSACTION,
                 TRANSACTION_HELP, DEST_LABELS_LIST);
-        CounterMetricFamily rowEvents = new CounterMetricFamily(ROW_EVENTS,
-                ROW_EVENTS_HELP, DEST_LABELS_LIST);
-        CounterMetricFamily rowsCounter = new CounterMetricFamily(ROWS_COUNTER,
-                ROWS_COUNTER_HELP, DEST_LABELS_LIST);
         for (EntryMetricsHolder emh : instances.values()) {
             long now = System.currentTimeMillis();
             long latest = emh.latestExecTime.get();
@@ -66,13 +57,9 @@ public class EntryCollector extends Collector implements InstanceRegistry {
                 delay.addMetric(emh.destLabelValues, (now - latest));
             }
             transactions.addMetric(emh.destLabelValues, emh.transactionCounter.doubleValue());
-            rowEvents.addMetric(emh.destLabelValues, emh.rowEventCounter.doubleValue());
-            rowsCounter.addMetric(emh.destLabelValues, emh.rowsCounter.doubleValue());
         }
         mfs.add(delay);
         mfs.add(transactions);
-        mfs.add(rowEvents);
-        mfs.add(rowsCounter);
         return mfs;
     }
 
@@ -89,12 +76,8 @@ public class EntryCollector extends Collector implements InstanceRegistry {
         PrometheusCanalEventDownStreamHandler handler = assembleHandler(entrySink);
         holder.latestExecTime = handler.getLatestExecuteTime();
         holder.transactionCounter = handler.getTransactionCounter();
-        holder.rowEventCounter = handler.getRowEventCounter();
-        holder.rowsCounter = handler.getRowsCounter();
         Preconditions.checkNotNull(holder.latestExecTime);
         Preconditions.checkNotNull(holder.transactionCounter);
-        Preconditions.checkNotNull(holder.rowEventCounter);
-        Preconditions.checkNotNull(holder.rowsCounter);
         EntryMetricsHolder old = instances.put(destination, holder);
         if (old != null) {
             logger.warn("Remove stale EntryCollector for instance {}.", destination);
@@ -145,8 +128,6 @@ public class EntryCollector extends Collector implements InstanceRegistry {
     private class EntryMetricsHolder {
         private AtomicLong   latestExecTime;
         private AtomicLong   transactionCounter;
-        private AtomicLong   rowEventCounter;
-        private AtomicLong   rowsCounter;
         private List<String> destLabelValues;
     }
 

+ 0 - 11
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.java

@@ -15,8 +15,6 @@ public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDow
 
     private final AtomicLong latestExecuteTime  = new AtomicLong(System.currentTimeMillis());
     private final AtomicLong transactionCounter = new AtomicLong(0L);
-    private final AtomicLong rowEventCounter    = new AtomicLong(0L);
-    private final AtomicLong rowsCounter        = new AtomicLong(0L);
 
     @Override
     public List<Event> before(List<Event> events) {
@@ -34,8 +32,6 @@ public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDow
                     case ROWDATA: {
                         long exec = e.getExecuteTime();
                         if (exec > 0) localExecTime = exec;
-                        rowEventCounter.incrementAndGet();
-                        rowsCounter.addAndGet(e.getRowsCount());
                         break;
                     }
                     case TRANSACTIONEND: {
@@ -80,11 +76,4 @@ public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDow
         return transactionCounter;
     }
 
-    public AtomicLong getRowsCounter() {
-        return rowsCounter;
-    }
-
-    public AtomicLong getRowEventCounter() {
-        return rowEventCounter;
-    }
 }

+ 4 - 1
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusClientInstanceProfiler.java

@@ -71,7 +71,6 @@ public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler
         String destination = result.getDestination();
         PacketType type = result.getType();
         outboundCounter.labels(destination).inc(result.getAmount());
-        packetsCounter.labels(destination, type.name()).inc();
         short errorCode = result.getErrorCode();
         if (errorCode > 0) {
             errorsCounter.labels(destination, Short.toString(errorCode)).inc();
@@ -81,12 +80,16 @@ public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler
         switch (type) {
             case GET:
                 boolean empty = result.getEmpty();
+                // 区分一下空包
                 if (empty) {
                     emptyBatchesCounter.labels(destination).inc();
+                } else {
+                    packetsCounter.labels(destination, type.name()).inc();
                 }
                 break;
             // reserve for others
             default:
+                packetsCounter.labels(destination, type.name()).inc();
                 break;
         }
     }

+ 59 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/StoreCollector.java

@@ -34,11 +34,23 @@ public class StoreCollector extends Collector implements InstanceRegistry {
     private static final String                             STORE            = "canal_instance_store";
     private static final String                             PRODUCE_MEM      = "canal_instance_store_produce_mem";
     private static final String                             CONSUME_MEM      = "canal_instance_store_consume_mem";
+    private static final String                             PUT_DELAY        = "canal_instance_put_delay";
+    private static final String                             GET_DELAY        = "canal_instance_get_delay";
+    private static final String                             ACK_DELAY        = "canal_instance_ack_delay";
+    private static final String                             PUT_ROWS         = "canal_instance_put_rows";
+    private static final String                             GET_ROWS         = "canal_instance_get_rows";
+    private static final String                             ACK_ROWS         = "canal_instance_ack_rows";
     private static final String                             PRODUCE_HELP     = "Produced events counter of canal instance";
     private static final String                             CONSUME_HELP     = "Consumed events counter of canal instance";
     private static final String                             STORE_HELP       = "Canal instance info";
     private static final String                             PRODUCE_MEM_HELP = "Produced mem bytes of canal instance";
     private static final String                             CONSUME_MEM_HELP = "Consumed mem bytes of canal instance";
+    private static final String                             PUT_DELAY_HELP   = "Traffic delay of canal instance put";
+    private static final String                             GET_DELAY_HELP   = "Traffic delay of canal instance get";
+    private static final String                             ACK_DELAY_HELP   = "Traffic delay of canal instance ack";
+    private static final String                             PUT_ROWS_HELP    = "Put table rows of canal instance";
+    private static final String                             GET_ROWS_HELP    = "Got table rows of canal instance";
+    private static final String                             ACK_ROWS_HELP    = "Acked table rows of canal instance";
     private final ConcurrentMap<String, StoreMetricsHolder> instances        = new ConcurrentHashMap<String, StoreMetricsHolder>();
     private final List<String>                              storeLabelsList  = Arrays.asList(DEST, "batchMode", "size");
 
@@ -65,11 +77,40 @@ public class StoreCollector extends Collector implements InstanceRegistry {
                 PRODUCE_MEM_HELP, DEST_LABELS_LIST);
         CounterMetricFamily ackMem = new CounterMetricFamily(CONSUME_MEM,
                 CONSUME_MEM_HELP, DEST_LABELS_LIST);
+        GaugeMetricFamily putDelay = new GaugeMetricFamily(PUT_DELAY,
+                PUT_DELAY_HELP, DEST_LABELS_LIST);
+        GaugeMetricFamily getDelay = new GaugeMetricFamily(GET_DELAY,
+                GET_DELAY_HELP, DEST_LABELS_LIST);
+        GaugeMetricFamily ackDelay = new GaugeMetricFamily(ACK_DELAY,
+                ACK_DELAY_HELP, DEST_LABELS_LIST);
+        CounterMetricFamily putRows = new CounterMetricFamily(PUT_ROWS,
+                PUT_ROWS_HELP, DEST_LABELS_LIST);
+        CounterMetricFamily getRows = new CounterMetricFamily(GET_ROWS,
+                GET_ROWS_HELP, DEST_LABELS_LIST);
+        CounterMetricFamily ackRows = new CounterMetricFamily(ACK_ROWS,
+                ACK_ROWS_HELP, DEST_LABELS_LIST);
         boolean hasMem = false;
         for (StoreMetricsHolder smh : instances.values()) {
             final boolean isMem = smh.batchMode.isMemSize();
             put.addMetric(smh.destLabelValues, smh.putSeq.doubleValue());
             ack.addMetric(smh.destLabelValues, smh.ackSeq.doubleValue());
+            long pet = smh.putExecTime.get();
+            // 防止出现启动时,未消费造成的get, ack延时小于前阶段的情况
+            long get = Math.min(smh.getExecTime.get(), pet);
+            long aet = Math.min(smh.ackExecTime.get(), get);
+            long now = System.currentTimeMillis();
+            if (now >= pet) {
+                putDelay.addMetric(smh.destLabelValues, (now - pet));
+            }
+            if (now >= get) {
+                getDelay.addMetric(smh.destLabelValues, (now - get));
+            }
+            if (now >= aet) {
+                ackDelay.addMetric(smh.destLabelValues, (now - aet));
+            }
+            putRows.addMetric(smh.destLabelValues, smh.putTableRows.doubleValue());
+            getRows.addMetric(smh.destLabelValues, smh.getTableRows.doubleValue());
+            ackRows.addMetric(smh.destLabelValues, smh.ackTableRows.doubleValue());
             store.addMetric(smh.storeLabelValues, 1);
             if (isMem) {
                 hasMem = true;
@@ -80,6 +121,12 @@ public class StoreCollector extends Collector implements InstanceRegistry {
         mfs.add(put);
         mfs.add(ack);
         mfs.add(store);
+        mfs.add(putDelay);
+        mfs.add(getDelay);
+        mfs.add(ackDelay);
+        mfs.add(putRows);
+        mfs.add(getRows);
+        mfs.add(ackRows);
         if (hasMem) {
             mfs.add(putMem);
             mfs.add(ackMem);
@@ -102,6 +149,12 @@ public class StoreCollector extends Collector implements InstanceRegistry {
         holder.destLabelValues = Collections.singletonList(destination);
         holder.size = memStore.getBufferSize();
         holder.storeLabelValues = Arrays.asList(destination, holder.batchMode.name(), Integer.toString(holder.size));
+        holder.putExecTime = memStore.getPutExecTime();
+        holder.getExecTime = memStore.getGetExecTime();
+        holder.ackExecTime = memStore.getAckExecTime();
+        holder.putTableRows = memStore.getPutTableRows();
+        holder.getTableRows = memStore.getGetTableRows();
+        holder.ackTableRows = memStore.getAckTableRows();
         Preconditions.checkNotNull(holder.batchMode);
         Preconditions.checkNotNull(holder.putSeq);
         Preconditions.checkNotNull(holder.ackSeq);
@@ -129,6 +182,12 @@ public class StoreCollector extends Collector implements InstanceRegistry {
         private BatchMode    batchMode;
         private AtomicLong   putMemSize;
         private AtomicLong   ackMemSize;
+        private AtomicLong   putExecTime;
+        private AtomicLong   getExecTime;
+        private AtomicLong   ackExecTime;
+        private AtomicLong   putTableRows;
+        private AtomicLong   getTableRows;
+        private AtomicLong   ackTableRows;
         private int          size;
         private List<String> destLabelValues;
         private List<String> storeLabelValues;

+ 2 - 0
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -220,6 +220,7 @@ public class SessionHandler extends SimpleChannelHandler {
                         } else {
                             clientIdentity = new ClientIdentity(ack.getDestination(), Short.valueOf(ack.getClientId()));
                             embeddedServer.ack(clientIdentity, ack.getBatchId());
+                            new ChannelFutureAggregator(ack.getDestination(), ack, packet.getType(), 0, System.nanoTime() - start).operationComplete(null);
                         }
                     } else {
                         byte[] errorBytes = NettyUtils.errorPacket(401, MessageFormatter.format("destination or clientId is null", ack.toString()).getMessage());
@@ -239,6 +240,7 @@ public class SessionHandler extends SimpleChannelHandler {
                         } else {
                             embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次
                         }
+                        new ChannelFutureAggregator(rollback.getDestination(), rollback, packet.getType(), 0, System.nanoTime() - start).operationComplete(null);
                     } else {
                         byte[] errorBytes = NettyUtils.errorPacket(401, MessageFormatter.format("destination or clientId is null", rollback.toString()).getMessage());
                         NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(rollback.getDestination(),

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/listener/ChannelFutureAggregator.java

@@ -43,7 +43,7 @@ public class ChannelFutureAggregator implements ChannelFutureListener {
     @Override
     public void operationComplete(ChannelFuture future) {
         // profiling after I/O operation
-        if (future.getCause() != null) {
+        if (future != null && future.getCause() != null) {
             result.channelError = future.getCause();
         }
         profiler().profiling(result);

+ 86 - 2
store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

@@ -51,6 +51,16 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
     private AtomicLong        getMemSize    = new AtomicLong(0);
     private AtomicLong        ackMemSize    = new AtomicLong(0);
 
+    // 记录下put/get/ack操作的三个execTime
+    private AtomicLong        putExecTime   = new AtomicLong(System.currentTimeMillis());
+    private AtomicLong        getExecTime   = new AtomicLong(System.currentTimeMillis());
+    private AtomicLong        ackExecTime   = new AtomicLong(System.currentTimeMillis());
+
+    // 记录下put/get/ack操作的三个table rows
+    private AtomicLong        putTableRows  = new AtomicLong(0);
+    private AtomicLong        getTableRows  = new AtomicLong(0);
+    private AtomicLong        ackTableRows  = new AtomicLong(0);
+
     // 阻塞put/get操作控制信号
     private ReentrantLock     lock          = new ReentrantLock();
     private Condition         notFull       = lock.newCondition();
@@ -192,7 +202,7 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
 
             putMemSize.getAndAdd(size);
         }
-
+        profiling(data, OP.PUT);
         // tell other threads that store is not empty
         notEmpty.signal();
     }
@@ -336,6 +346,7 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
         if (getSequence.compareAndSet(current, end)) {
             getMemSize.addAndGet(memsize);
             notFull.signal();
+            profiling(result.getEvents(), OP.GET);
             return result;
         } else {
             return new Events<Event>();
@@ -407,8 +418,15 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
 
             boolean hasMatch = false;
             long memsize = 0;
+            // ack没有list,但有已存在的foreach,还是节省一下list的开销
+            long localExecTime = 0L;
+            int deltaRows = 0;
             for (long next = sequence + 1; next <= maxSequence; next++) {
                 Event event = entries[getIndex(next)];
+                if (localExecTime == 0 && event.getExecuteTime() > 0) {
+                    localExecTime = event.getExecuteTime();
+                }
+                deltaRows += event.getRowsCount();
                 memsize += calculateSize(event);
                 boolean match = CanalEventUtils.checkPosition(event, (LogPosition) position);
                 if (match) {// 找到对应的position,更新ack seq
@@ -424,11 +442,14 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
 
                     if (ackSequence.compareAndSet(sequence, next)) {// 避免并发ack
                         notFull.signal();
+                        ackTableRows.addAndGet(deltaRows);
+                        if (localExecTime > 0) {
+                            ackExecTime.lazySet(localExecTime);
+                        }
                         return;
                     }
                 }
             }
-
             if (!hasMatch) {// 找不到对应需要ack的position
                 throw new CanalStoreException("no match ack position" + position.toString());
             }
@@ -544,6 +565,45 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
                || type == EventType.DINDEX;
     }
 
+    private void profiling(List<Event> events, OP op) {
+        long localExecTime = 0L;
+        int deltaRows = 0;
+        if (events != null && !events.isEmpty()) {
+            for (Event e : events) {
+                if (localExecTime == 0 && e.getExecuteTime() > 0) {
+                    localExecTime = e.getExecuteTime();
+                }
+                deltaRows += e.getRowsCount();
+            }
+        }
+        switch (op) {
+            case PUT:
+                putTableRows.addAndGet(deltaRows);
+                if (localExecTime > 0) {
+                    putExecTime.lazySet(localExecTime);
+                }
+                break;
+            case GET:
+                getTableRows.addAndGet(deltaRows);
+                if (localExecTime > 0) {
+                    getExecTime.lazySet(localExecTime);
+                }
+                break;
+            case ACK:
+                ackTableRows.addAndGet(deltaRows);
+                if (localExecTime > 0) {
+                    ackExecTime.lazySet(localExecTime);
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
+    private enum OP {
+        PUT, GET, ACK
+    }
+
     // ================ setter / getter ==================
     public int getBufferSize() {
         return this.bufferSize;
@@ -584,4 +644,28 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
     public BatchMode getBatchMode() {
         return batchMode;
     }
+
+    public AtomicLong getPutExecTime() {
+        return putExecTime;
+    }
+
+    public AtomicLong getGetExecTime() {
+        return getExecTime;
+    }
+
+    public AtomicLong getAckExecTime() {
+        return ackExecTime;
+    }
+
+    public AtomicLong getPutTableRows() {
+        return putTableRows;
+    }
+
+    public AtomicLong getGetTableRows() {
+        return getTableRows;
+    }
+
+    public AtomicLong getAckTableRows() {
+        return ackTableRows;
+    }
 }