Browse Source

fixed issue #1081 , support mariadb rotate_event support checksum

七锋 6 years ago
parent
commit
3bb408194d

+ 62 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -37,21 +37,23 @@ import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
+import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
 
 public class MysqlConnection implements ErosaConnection {
 
-    private static final Logger logger      = LoggerFactory.getLogger(MysqlConnection.class);
+    private static final Logger logger         = LoggerFactory.getLogger(MysqlConnection.class);
 
     private MysqlConnector      connector;
     private long                slaveId;
-    private Charset             charset     = Charset.forName("UTF-8");
+    private Charset             charset        = Charset.forName("UTF-8");
     private BinlogFormat        binlogFormat;
     private BinlogImage         binlogImage;
 
     // tsdb releated
     private AuthenticationInfo  authInfo;
-    protected int               connTimeout = 5 * 1000;                                      // 5秒
-    protected int               soTimeout   = 60 * 60 * 1000;                                // 1小时
+    protected int               connTimeout    = 5 * 1000;                                      // 5秒
+    protected int               soTimeout      = 60 * 60 * 1000;                                // 1小时
+    private int                 binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
     // dump binlog bytes, 暂不包括meta与TSDB
     private AtomicLong          receivedBinlogBytes;
 
@@ -118,7 +120,7 @@ public class MysqlConnection implements ErosaConnection {
      */
     public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
         updateSettings();
-
+        loadBinlogChecksum();
         sendBinlogDump(binlogfilename, binlogPosition);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         fetcher.start(connector.getChannel());
@@ -128,6 +130,7 @@ public class MysqlConnection implements ErosaConnection {
         decoder.handle(LogEvent.QUERY_EVENT);
         decoder.handle(LogEvent.XID_EVENT);
         LogContext context = new LogContext();
+        context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
         while (fetcher.fetch()) {
             accumulateReceivedBytes(fetcher.limit());
             LogEvent event = null;
@@ -145,12 +148,14 @@ public class MysqlConnection implements ErosaConnection {
 
     public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
         updateSettings();
+        loadBinlogChecksum();
         sendRegisterSlave();
         sendBinlogDump(binlogfilename, binlogPosition);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         fetcher.start(connector.getChannel());
         LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
         LogContext context = new LogContext();
+        context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
         while (fetcher.fetch()) {
             accumulateReceivedBytes(fetcher.limit());
             LogEvent event = null;
@@ -173,6 +178,7 @@ public class MysqlConnection implements ErosaConnection {
     @Override
     public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException {
         updateSettings();
+        loadBinlogChecksum();
         sendBinlogDumpGTID(gtidSet);
 
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
@@ -180,6 +186,7 @@ public class MysqlConnection implements ErosaConnection {
             fetcher.start(connector.getChannel());
             LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             LogContext context = new LogContext();
+            context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
             // fix bug: #890 将gtid传输至context中,供decode使用
             context.setGtidSet(gtidSet);
             while (fetcher.fetch()) {
@@ -207,9 +214,11 @@ public class MysqlConnection implements ErosaConnection {
     @Override
     public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException {
         updateSettings();
+        loadBinlogChecksum();
         sendRegisterSlave();
         sendBinlogDump(binlogfilename, binlogPosition);
         ((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
+        ((MysqlMultiStageCoprocessor) coprocessor).setBinlogChecksum(binlogChecksum);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         try {
             fetcher.start(connector.getChannel());
@@ -234,9 +243,10 @@ public class MysqlConnection implements ErosaConnection {
     @Override
     public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOException {
         updateSettings();
+        loadBinlogChecksum();
         sendBinlogDumpGTID(gtidSet);
-
         ((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
+        ((MysqlMultiStageCoprocessor) coprocessor).setBinlogChecksum(binlogChecksum);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         try {
             fetcher.start(connector.getChannel());
@@ -482,6 +492,52 @@ public class MysqlConnection implements ErosaConnection {
         }
     }
 
+    /**
+     * 获取主库checksum信息
+     * 
+     * <pre>
+     * mariadb区别于mysql会在binlog的第一个事件Rotate_Event里也会采用checksum逻辑,而mysql是在第二个binlog事件之后才感知是否需要处理checksum
+     * 导致maraidb只要是开启checksum就会出现binlog文件名解析乱码
+     * fixed issue : https://github.com/alibaba/canal/issues/1081
+     * </pre>
+     */
+    private void loadBinlogChecksum() {
+        if (checkMariaDB()) {
+            ResultSetPacket rs = null;
+            try {
+                rs = query("select @@global.binlog_checksum");
+            } catch (IOException e) {
+                throw new CanalParseException(e);
+            }
+
+            List<String> columnValues = rs.getFieldValues();
+            if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32")) {
+                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
+            } else {
+                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
+            }
+        }
+    }
+
+    /**
+     * 获取是否为mariadb
+     */
+    private boolean checkMariaDB() {
+        ResultSetPacket rs = null;
+        try {
+            rs = query("SELECT @@version");
+        } catch (IOException e) {
+            throw new CanalParseException(e);
+        }
+
+        List<String> columnValues = rs.getFieldValues();
+        if (columnValues != null && columnValues.size() >= 1) {
+            return StringUtils.containsIgnoreCase(columnValues.get(0), "MariaDB");
+        }
+
+        return false;
+    }
+
     private void accumulateReceivedBytes(long x) {
         if (receivedBinlogBytes != null) {
             receivedBinlogBytes.addAndGet(x);

+ 13 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -33,6 +33,7 @@ import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
@@ -69,6 +70,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
     private WorkerPool<MessageEvent>          workerPool;
     private BatchEventProcessor<MessageEvent> simpleParserStage;
     private BatchEventProcessor<MessageEvent> sinkStoreStage;
+    private LogContext                        logContext;
 
     public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
                                       EventTransactionBuffer transactionBuffer, String destination){
@@ -95,9 +97,10 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         SequenceBarrier sequenceBarrier = disruptorMsgBuffer.newBarrier();
         ExceptionHandler exceptionHandler = new SimpleFatalExceptionHandler();
         // stage 2
+        this.logContext = new LogContext();
         simpleParserStage = new BatchEventProcessor<MessageEvent>(disruptorMsgBuffer,
             sequenceBarrier,
-            new SimpleParserStage());
+            new SimpleParserStage(logContext));
         simpleParserStage.setExceptionHandler(exceptionHandler);
         disruptorMsgBuffer.addGatingSequences(simpleParserStage.getSequence());
 
@@ -128,6 +131,12 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         workerPool.start(parserExecutor);
     }
 
+    public void setBinlogChecksum(int binlogChecksum) {
+        if (binlogChecksum != LogEvent.BINLOG_CHECKSUM_ALG_OFF) {
+            logContext.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
+        }
+    }
+
     @Override
     public void stop() {
         // fix bug #968,对于pool与
@@ -239,9 +248,9 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         private LogDecoder decoder;
         private LogContext context;
 
-        public SimpleParserStage(){
+        public SimpleParserStage(LogContext context){
             decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
-            context = new LogContext();
+            this.context = context;
             if (gtidSet != null) {
                 context.setGtidSet(gtidSet);
             }
@@ -468,4 +477,5 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
     public void setGtidSet(GTIDSet gtidSet) {
         this.gtidSet = gtidSet;
     }
+
 }

+ 48 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.parse;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
 import org.junit.Assert;
@@ -10,21 +11,27 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector;
+import com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.RegisterSlaveCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
+import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
 
 public class DirectLogFetcherTest {
 
     protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private boolean        isMariaDB;
+    private int            binlogChecksum;
 
     @Test
     public void testSimple() {
@@ -33,6 +40,7 @@ public class DirectLogFetcherTest {
             MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "xxxx", "xxxx");
             connector.connect();
             updateSettings(connector);
+            loadBinlogChecksum(connector);
             sendRegisterSlave(connector, 3);
             sendBinlogDump(connector, "mysql-bin.000001", 4L, 3);
 
@@ -40,6 +48,8 @@ public class DirectLogFetcherTest {
 
             LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             LogContext context = new LogContext();
+            context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
+
             while (fetcher.fetch()) {
                 LogEvent event = null;
                 event = decoder.decode(fetcher, context);
@@ -188,6 +198,44 @@ public class DirectLogFetcherTest {
         }
     }
 
+    private void loadBinlogChecksum(MysqlConnector connector) {
+        checkMariaDB(connector);
+        if (isMariaDB) {
+            ResultSetPacket rs = null;
+            try {
+                rs = query("select @@global.binlog_checksum", connector);
+            } catch (IOException e) {
+                throw new CanalParseException(e);
+            }
+
+            List<String> columnValues = rs.getFieldValues();
+            if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32")) {
+                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
+            } else {
+                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
+            }
+        }
+    }
+
+    private void checkMariaDB(MysqlConnector connector) {
+        ResultSetPacket rs = null;
+        try {
+            rs = query("SELECT @@version", connector);
+        } catch (IOException e) {
+            throw new CanalParseException(e);
+        }
+
+        List<String> columnValues = rs.getFieldValues();
+        if (columnValues != null && columnValues.size() >= 1) {
+            isMariaDB = StringUtils.containsIgnoreCase(columnValues.get(0), "MariaDB");
+        }
+    }
+
+    public ResultSetPacket query(String cmd, MysqlConnector connector) throws IOException {
+        MysqlQueryExecutor exector = new MysqlQueryExecutor(connector);
+        return exector.query(cmd);
+    }
+
     public void update(String cmd, MysqlConnector connector) throws IOException {
         MysqlUpdateExecutor exector = new MysqlUpdateExecutor(connector);
         exector.update(cmd);