Pārlūkot izejas kodu

Merge pull request #754 from wingerx/master

fix issue #751
agapple 7 gadi atpakaļ
vecāks
revīzija
33a2a6121a

+ 16 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -504,9 +504,6 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 throw new CanalParseException("unsupport event type :" + event.getHeader().getType());
             }
 
-            TableMapLogEvent table = event.getTable();
-            Header header = createHeader(event.getHeader(), table.getDbName(), table.getTableName(), eventType);
-
             RowChange.Builder rowChangeBuider = RowChange.newBuilder();
             rowChangeBuider.setTableId(event.getTableId());
             rowChangeBuider.setIsDdl(false);
@@ -517,6 +514,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             BitSet changeColumns = event.getChangeColumns();
 
             boolean tableError = false;
+            int rowsCount = 0;
             while (buffer.nextOneRow(columns)) {
                 // 处理row记录
                 RowData.Builder rowDataBuilder = RowData.newBuilder();
@@ -537,9 +535,13 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                     tableError |= parseOneRow(rowDataBuilder, event, buffer, changeColumns, true, tableMeta);
                 }
 
+                rowsCount ++;
                 rowChangeBuider.addRowDatas(rowDataBuilder.build());
             }
 
+            TableMapLogEvent table = event.getTable();
+            Header header = createHeader(event.getHeader(), table.getDbName(), table.getTableName(), eventType, rowsCount);
+
             RowChange rowChange = rowChangeBuider.build();
             if (tableError) {
                 Entry entry = createEntry(header, EntryType.ROWDATA, ByteString.EMPTY);
@@ -793,7 +795,12 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
     }
 
+
     private Header createHeader(LogHeader logHeader, String schemaName, String tableName, EventType eventType) {
+        return createHeader(logHeader, schemaName, tableName, eventType, -1);
+    }
+
+    private Header createHeader(LogHeader logHeader, String schemaName, String tableName, EventType eventType, Integer rowsCount) {
         // header会做信息冗余,方便以后做检索或者过滤
         Header.Builder headerBuilder = Header.newBuilder();
         headerBuilder.setVersion(version);
@@ -818,6 +825,12 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             String gtid = gtidSet.toString();
             headerBuilder.setGtid(gtid);
         }
+
+        // add rowsCount suppport
+        if (rowsCount > 0) {
+            Pair pair = createSpecialPair("rowsCount", String.valueOf(rowsCount));
+            headerBuilder.addProps(pair);
+        }
         return headerBuilder.build();
     }