Prechádzať zdrojové kódy

fixed issue #626, support xa rollback/commit event

七锋 7 rokov pred
rodič
commit
b734a32ae6

+ 25 - 0
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java

@@ -18,6 +18,7 @@ import com.alibaba.otter.canal.protocol.CanalEntry.Column;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
 import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
+import com.alibaba.otter.canal.protocol.CanalEntry.Pair;
 import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
 import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
 import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
@@ -199,6 +200,7 @@ public class AbstractCanalClientTest {
                                 String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                 entry.getHeader().getGtid(), String.valueOf(delayTime) });
                     logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
+                    printXAInfo(begin.getPropsList());
                 } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                     TransactionEnd end = null;
                     try {
@@ -209,6 +211,7 @@ public class AbstractCanalClientTest {
                     // 打印事务提交信息,事务id
                     logger.info("----------------\n");
                     logger.info(" END ----> transaction id: {}", end.getTransactionId());
+                    printXAInfo(end.getPropsList());
                     logger.info(transaction_format,
                         new Object[] { entry.getHeader().getLogfileName(),
                                 String.valueOf(entry.getHeader().getLogfileOffset()),
@@ -241,6 +244,7 @@ public class AbstractCanalClientTest {
                     continue;
                 }
 
+                printXAInfo(rowChage.getPropsList());
                 for (RowData rowData : rowChage.getRowDatasList()) {
                     if (eventType == EventType.DELETE) {
                         printColumn(rowData.getBeforeColumnsList());
@@ -267,6 +271,27 @@ public class AbstractCanalClientTest {
         }
     }
 
+    protected void printXAInfo(List<Pair> pairs) {
+        if (pairs == null) {
+            return;
+        }
+
+        String xaType = null;
+        String xaXid = null;
+        for (Pair pair : pairs) {
+            String key = pair.getKey();
+            if (StringUtils.endsWithIgnoreCase(key, "XA_TYPE")) {
+                xaType = pair.getValue();
+            } else if (StringUtils.endsWithIgnoreCase(key, "XA_XID")) {
+                xaXid = pair.getValue();
+            }
+        }
+
+        if (xaType != null && xaXid != null) {
+            logger.info(" ------> " + xaType + " " + xaXid);
+        }
+    }
+
     public void setConnector(CanalConnector connector) {
         this.connector = connector;
     }

+ 47 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -67,6 +67,12 @@ import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
  */
 public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogParser<LogEvent> {
 
+    public static final String          XA_XID              = "XA_XID";
+    public static final String          XA_TYPE             = "XA_TYPE";
+    public static final String          XA_START            = "XA START";
+    public static final String          XA_END              = "XA END";
+    public static final String          XA_COMMIT           = "XA COMMIT";
+    public static final String          XA_ROLLBACK         = "XA ROLLBACK";
     public static final String          ISO_8859_1          = "ISO-8859-1";
     public static final String          UTF_8               = "UTF-8";
     public static final int             TINYINT_MAX_VALUE   = 256;
@@ -174,7 +180,43 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
     private Entry parseQueryEvent(QueryLogEvent event, boolean isSeek) {
         String queryString = event.getQuery();
-        if (StringUtils.endsWithIgnoreCase(queryString, BEGIN)) {
+        if (StringUtils.startsWithIgnoreCase(queryString, XA_START)) {
+            // xa start use TransactionBegin
+            TransactionBegin.Builder beginBuilder = TransactionBegin.newBuilder();
+            beginBuilder.setThreadId(event.getSessionId());
+            beginBuilder.addProps(createSpecialPair(XA_TYPE, XA_START));
+            beginBuilder.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_START)));
+            TransactionBegin transactionBegin = beginBuilder.build();
+            Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
+            return createEntry(header, EntryType.TRANSACTIONBEGIN, transactionBegin.toByteString());
+        } else if (StringUtils.startsWithIgnoreCase(queryString, XA_END)) {
+            // xa start use TransactionEnd
+            TransactionEnd.Builder endBuilder = TransactionEnd.newBuilder();
+            endBuilder.setTransactionId(String.valueOf(0L));
+            endBuilder.addProps(createSpecialPair(XA_TYPE, XA_END));
+            endBuilder.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_END)));
+            TransactionEnd transactionEnd = endBuilder.build();
+            Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
+            return createEntry(header, EntryType.TRANSACTIONEND, transactionEnd.toByteString());
+        } else if (StringUtils.startsWithIgnoreCase(queryString, XA_COMMIT)) {
+            // xa commit
+            Header header = createHeader(binlogFileName, event.getHeader(), "", "", EventType.XACOMMIT);
+            RowChange.Builder rowChangeBuider = RowChange.newBuilder();
+            rowChangeBuider.setSql(queryString);
+            rowChangeBuider.addProps(createSpecialPair(XA_TYPE, XA_COMMIT));
+            rowChangeBuider.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_COMMIT)));
+            rowChangeBuider.setEventType(EventType.XACOMMIT);
+            return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
+        } else if (StringUtils.startsWithIgnoreCase(queryString, XA_ROLLBACK)) {
+            // xa rollback
+            Header header = createHeader(binlogFileName, event.getHeader(), "", "", EventType.XAROLLBACK);
+            RowChange.Builder rowChangeBuider = RowChange.newBuilder();
+            rowChangeBuider.setSql(queryString);
+            rowChangeBuider.addProps(createSpecialPair(XA_TYPE, XA_ROLLBACK));
+            rowChangeBuider.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_ROLLBACK)));
+            rowChangeBuider.setEventType(EventType.XAROLLBACK);
+            return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
+        } else if (StringUtils.endsWithIgnoreCase(queryString, BEGIN)) {
             TransactionBegin transactionBegin = createTransactionBegin(event.getSessionId());
             Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
             return createEntry(header, EntryType.TRANSACTIONBEGIN, transactionBegin.toByteString());
@@ -237,6 +279,10 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         }
     }
 
+    private String getXaXid(String queryString, String type) {
+        return StringUtils.substringAfter(queryString, type);
+    }
+
     private boolean processFilter(String queryString, DdlResult result) {
         String schemaName = result.getSchemaName();
         String tableName = result.getTableName();

+ 27 - 3
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlDumpTest.java

@@ -4,6 +4,7 @@ import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.util.List;
 
+import org.apache.commons.lang.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -16,6 +17,7 @@ import com.alibaba.otter.canal.protocol.CanalEntry.Column;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
 import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
+import com.alibaba.otter.canal.protocol.CanalEntry.Pair;
 import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
 import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
@@ -27,8 +29,8 @@ public class MysqlDumpTest {
     @Test
     public void testSimple() {
         final MysqlEventParser controller = new MysqlEventParser();
-        final EntryPosition startPosition = new EntryPosition("mysql-bin.000010", 154L, 100L);
-        startPosition.setGtid("f1ceb61a-a5d5-11e7-bdee-107c3dbcf8a7:1-17");
+        final EntryPosition startPosition = new EntryPosition("mysql-bin.000012", 34051L, 100L);
+        // startPosition.setGtid("f1ceb61a-a5d5-11e7-bdee-107c3dbcf8a7:1-17");
         controller.setConnectionCharset(Charset.forName("UTF-8"));
         controller.setSlaveId(3344L);
         controller.setDetectingEnable(false);
@@ -39,7 +41,7 @@ public class MysqlDumpTest {
         controller.setTsdbSpringXml("classpath:tsdb/h2-tsdb.xml");
         controller.setEventFilter(new AviaterRegexFilter("test\\..*"));
         controller.setEventBlackFilter(new AviaterRegexFilter("canal_tsdb\\..*"));
-        controller.setIsGTIDMode(true);
+        controller.setIsGTIDMode(false);
         controller.setEventSink(new AbstractCanalEventSinkTest<List<Entry>>() {
 
             public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String destination)
@@ -73,6 +75,7 @@ public class MysqlDumpTest {
                         System.out.println(" sql ----> " + rowChage.getSql());
                     }
 
+                    printXAInfo(rowChage.getPropsList());
                     for (RowData rowData : rowChage.getRowDatasList()) {
                         if (eventType == EventType.DELETE) {
                             print(rowData.getBeforeColumnsList());
@@ -119,4 +122,25 @@ public class MysqlDumpTest {
             System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
         }
     }
+
+    private void printXAInfo(List<Pair> pairs) {
+        if (pairs == null) {
+            return;
+        }
+
+        String xaType = null;
+        String xaXid = null;
+        for (Pair pair : pairs) {
+            String key = pair.getKey();
+            if (StringUtils.endsWithIgnoreCase(key, "XA_TYPE")) {
+                xaType = pair.getValue();
+            } else if (StringUtils.endsWithIgnoreCase(key, "XA_XID")) {
+                xaXid = pair.getValue();
+            }
+        }
+
+        if (xaType != null && xaXid != null) {
+            System.out.println(" ------> " + xaType + " " + xaXid);
+        }
+    }
 }

Rozdielové dáta súboru neboli zobrazené, pretože súbor je príliš veľký
+ 603 - 507
protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalEntry.java


+ 3 - 0
protocol/src/main/java/com/alibaba/otter/canal/protocol/EntryProtocol.proto

@@ -191,6 +191,9 @@ enum EventType {
     CINDEX		= 		10;
     DINDEX 		= 		11;
     GTID        =       12;
+    /** XA **/
+    XACOMMIT    =       13;
+    XAROLLBACK  =		14;
 }
 
 /**数据库类型**/

Niektoré súbory nie sú zobrazené, pretože je v týchto rozdielových dátach zmenené mnoho súborov