Browse Source

Merge remote-tracking branch 'upstream/master'

wuwo 6 years ago
parent
commit
5d5aafb7cd

+ 2 - 2
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -62,10 +62,10 @@ public class HbaseAdapter implements OuterAdapter {
                 }
             }
 
-            Map<String, String> propertites = configuration.getProperties();
+            Map<String, String> properties = configuration.getProperties();
 
             Configuration hbaseConfig = HBaseConfiguration.create();
-            propertites.forEach(hbaseConfig::set);
+            properties.forEach(hbaseConfig::set);
             conn = ConnectionFactory.createConnection(hbaseConfig);
             hbaseTemplate = new HbaseTemplate(conn);
             hbaseSyncService = new HbaseSyncService(hbaseTemplate);

+ 8 - 0
client-adapter/launcher/pom.xml

@@ -166,6 +166,14 @@
                 </executions>
             </plugin>
         </plugins>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <excludes>
+                    <exclude>application.yml</exclude>
+                </excludes>
+            </resource>
+        </resources>
     </build>
 
     <profiles>

+ 2 - 2
deployer/src/main/resources/mq.yml

@@ -1,4 +1,4 @@
-servers: localhost:9876 #for rocketmq: means the nameserver
+servers: slave1:6667 #for rocketmq: means the nameserver
 retries: 0
 batchSize: 16384
 lingerMs: 1
@@ -13,7 +13,7 @@ flatMessage: true
 canalDestinations:
   - canalDestination: example
     topic: example
-    partition: 1
+    partition:
 #  #对应topic分区数量
 #  partitionsNum: 3
 #  partitionHash:

+ 5 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -353,7 +353,11 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         eventSink.interrupt();
 
         if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) {
-            multiStageCoprocessor.stop();
+            try {
+                multiStageCoprocessor.stop();
+            } catch (Throwable t) {
+                logger.debug("multi processor rejected:", t);
+            }
         }
 
         try {

+ 62 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -37,21 +37,23 @@ import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
+import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
 
 public class MysqlConnection implements ErosaConnection {
 
-    private static final Logger logger      = LoggerFactory.getLogger(MysqlConnection.class);
+    private static final Logger logger         = LoggerFactory.getLogger(MysqlConnection.class);
 
     private MysqlConnector      connector;
     private long                slaveId;
-    private Charset             charset     = Charset.forName("UTF-8");
+    private Charset             charset        = Charset.forName("UTF-8");
     private BinlogFormat        binlogFormat;
     private BinlogImage         binlogImage;
 
     // tsdb releated
     private AuthenticationInfo  authInfo;
-    protected int               connTimeout = 5 * 1000;                                      // 5秒
-    protected int               soTimeout   = 60 * 60 * 1000;                                // 1小时
+    protected int               connTimeout    = 5 * 1000;                                      // 5秒
+    protected int               soTimeout      = 60 * 60 * 1000;                                // 1小时
+    private int                 binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
     // dump binlog bytes, 暂不包括meta与TSDB
     private AtomicLong          receivedBinlogBytes;
 
@@ -118,7 +120,7 @@ public class MysqlConnection implements ErosaConnection {
      */
     public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
         updateSettings();
-
+        loadBinlogChecksum();
         sendBinlogDump(binlogfilename, binlogPosition);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         fetcher.start(connector.getChannel());
@@ -128,6 +130,7 @@ public class MysqlConnection implements ErosaConnection {
         decoder.handle(LogEvent.QUERY_EVENT);
         decoder.handle(LogEvent.XID_EVENT);
         LogContext context = new LogContext();
+        context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
         while (fetcher.fetch()) {
             accumulateReceivedBytes(fetcher.limit());
             LogEvent event = null;
@@ -145,12 +148,14 @@ public class MysqlConnection implements ErosaConnection {
 
     public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
         updateSettings();
+        loadBinlogChecksum();
         sendRegisterSlave();
         sendBinlogDump(binlogfilename, binlogPosition);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         fetcher.start(connector.getChannel());
         LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
         LogContext context = new LogContext();
+        context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
         while (fetcher.fetch()) {
             accumulateReceivedBytes(fetcher.limit());
             LogEvent event = null;
@@ -173,6 +178,7 @@ public class MysqlConnection implements ErosaConnection {
     @Override
     public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException {
         updateSettings();
+        loadBinlogChecksum();
         sendBinlogDumpGTID(gtidSet);
 
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
@@ -180,6 +186,7 @@ public class MysqlConnection implements ErosaConnection {
             fetcher.start(connector.getChannel());
             LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             LogContext context = new LogContext();
+            context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
             // fix bug: #890 将gtid传输至context中,供decode使用
             context.setGtidSet(gtidSet);
             while (fetcher.fetch()) {
@@ -207,9 +214,11 @@ public class MysqlConnection implements ErosaConnection {
     @Override
     public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException {
         updateSettings();
+        loadBinlogChecksum();
         sendRegisterSlave();
         sendBinlogDump(binlogfilename, binlogPosition);
         ((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
+        ((MysqlMultiStageCoprocessor) coprocessor).setBinlogChecksum(binlogChecksum);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         try {
             fetcher.start(connector.getChannel());
@@ -234,9 +243,10 @@ public class MysqlConnection implements ErosaConnection {
     @Override
     public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOException {
         updateSettings();
+        loadBinlogChecksum();
         sendBinlogDumpGTID(gtidSet);
-
         ((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
+        ((MysqlMultiStageCoprocessor) coprocessor).setBinlogChecksum(binlogChecksum);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         try {
             fetcher.start(connector.getChannel());
@@ -482,6 +492,52 @@ public class MysqlConnection implements ErosaConnection {
         }
     }
 
+    /**
+     * 获取主库checksum信息
+     * 
+     * <pre>
+     * mariadb区别于mysql会在binlog的第一个事件Rotate_Event里也会采用checksum逻辑,而mysql是在第二个binlog事件之后才感知是否需要处理checksum
+     * 导致maraidb只要是开启checksum就会出现binlog文件名解析乱码
+     * fixed issue : https://github.com/alibaba/canal/issues/1081
+     * </pre>
+     */
+    private void loadBinlogChecksum() {
+        if (checkMariaDB()) {
+            ResultSetPacket rs = null;
+            try {
+                rs = query("select @@global.binlog_checksum");
+            } catch (IOException e) {
+                throw new CanalParseException(e);
+            }
+
+            List<String> columnValues = rs.getFieldValues();
+            if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32")) {
+                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
+            } else {
+                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
+            }
+        }
+    }
+
+    /**
+     * 获取是否为mariadb
+     */
+    private boolean checkMariaDB() {
+        ResultSetPacket rs = null;
+        try {
+            rs = query("SELECT @@version");
+        } catch (IOException e) {
+            throw new CanalParseException(e);
+        }
+
+        List<String> columnValues = rs.getFieldValues();
+        if (columnValues != null && columnValues.size() >= 1) {
+            return StringUtils.containsIgnoreCase(columnValues.get(0), "MariaDB");
+        }
+
+        return false;
+    }
+
     private void accumulateReceivedBytes(long x) {
         if (receivedBinlogBytes != null) {
             receivedBinlogBytes.addAndGet(x);

+ 27 - 9
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -69,6 +69,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     // update by yishun.chen,特殊异常处理参数
     private int                  dumpErrorCount                    = 0;        // binlogDump失败异常计数
     private int                  dumpErrorCountThreshold           = 2;        // binlogDump失败异常计数阀值
+    private boolean              rdsOssMode                        = false;
 
     protected ErosaConnection buildErosaConnection() {
         return buildMysqlConnection(this.runningInfo);
@@ -352,7 +353,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 return logPosition.getPostion();
             }
 
-            if (masterPosition!=null && StringUtils.isNotEmpty(masterPosition.getGtid())) {
+            if (masterPosition != null && StringUtils.isNotEmpty(masterPosition.getGtid())) {
                 return masterPosition;
             }
         }
@@ -493,6 +494,12 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         dumpErrorCount = 0;
                         return findPosition;
                     }
+
+                    Long timestamp = logPosition.getPostion().getTimestamp();
+                    if (isRdsOssMode() && (timestamp != null && timestamp > 0)) {
+                        // 如果binlog位点不存在,并且属于timestamp不为空,可以返回null走到oss binlog处理
+                        return null;
+                    }
                 }
                 // 其余情况
                 logger.warn("prepare to find start position just last position\n {}",
@@ -745,14 +752,13 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                             logPosition.setPostion(entryPosition);
                         }
 
-                        if (entry == null) {
-                            return true;
-                        }
-
-                        String logfilename = entry.getHeader().getLogfileName();
-                        Long logfileoffset = entry.getHeader().getLogfileOffset();
-                        Long logposTimestamp = entry.getHeader().getExecuteTime();
-                        Long serverId = entry.getHeader().getServerId();
+                        // 直接用event的位点来处理,解决一个binlog文件里没有任何事件导致死循环无法退出的问题
+                        String logfilename = event.getHeader().getLogFileName();
+                        // 记录的是binlog end offest,
+                        // 因为与其对比的offest是show master status里的end offest
+                        Long logfileoffset = event.getHeader().getLogPos();
+                        Long logposTimestamp = event.getHeader().getWhen() * 1000;
+                        Long serverId = event.getHeader().getServerId();
 
                         // 如果最小的一条记录都不满足条件,可直接退出
                         if (logposTimestamp >= startTimestamp) {
@@ -764,6 +770,10 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                             return false;
                         }
 
+                        if (entry == null) {
+                            return true;
+                        }
+
                         // 记录一下上一个事务结束的位置,即下一个事务的position
                         // position = current +
                         // data.length,代表该事务的下一条offest,避免多余的事务重复
@@ -905,4 +915,12 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         this.dumpErrorCountThreshold = dumpErrorCountThreshold;
     }
 
+    public boolean isRdsOssMode() {
+        return rdsOssMode;
+    }
+
+    public void setRdsOssMode(boolean rdsOssMode) {
+        this.rdsOssMode = rdsOssMode;
+    }
+
 }

+ 13 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -33,6 +33,7 @@ import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
@@ -69,6 +70,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
     private WorkerPool<MessageEvent>          workerPool;
     private BatchEventProcessor<MessageEvent> simpleParserStage;
     private BatchEventProcessor<MessageEvent> sinkStoreStage;
+    private LogContext                        logContext;
 
     public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
                                       EventTransactionBuffer transactionBuffer, String destination){
@@ -95,9 +97,10 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         SequenceBarrier sequenceBarrier = disruptorMsgBuffer.newBarrier();
         ExceptionHandler exceptionHandler = new SimpleFatalExceptionHandler();
         // stage 2
+        this.logContext = new LogContext();
         simpleParserStage = new BatchEventProcessor<MessageEvent>(disruptorMsgBuffer,
             sequenceBarrier,
-            new SimpleParserStage());
+            new SimpleParserStage(logContext));
         simpleParserStage.setExceptionHandler(exceptionHandler);
         disruptorMsgBuffer.addGatingSequences(simpleParserStage.getSequence());
 
@@ -128,6 +131,12 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         workerPool.start(parserExecutor);
     }
 
+    public void setBinlogChecksum(int binlogChecksum) {
+        if (binlogChecksum != LogEvent.BINLOG_CHECKSUM_ALG_OFF) {
+            logContext.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
+        }
+    }
+
     @Override
     public void stop() {
         // fix bug #968,对于pool与
@@ -239,9 +248,9 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         private LogDecoder decoder;
         private LogContext context;
 
-        public SimpleParserStage(){
+        public SimpleParserStage(LogContext context){
             decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
-            context = new LogContext();
+            this.context = context;
             if (gtidSet != null) {
                 context.setGtidSet(gtidSet);
             }
@@ -468,4 +477,5 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
     public void setGtidSet(GTIDSet gtidSet) {
         this.gtidSet = gtidSet;
     }
+
 }

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -567,8 +567,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     }
 
     private EntryPosition createPosition(LogHeader logHeader) {
-        return new EntryPosition(logHeader.getLogFileName(),
-            logHeader.getLogPos(),
+        return new EntryPosition(logHeader.getLogFileName(), logHeader.getLogPos() - logHeader.getEventLen(), // startPos
             logHeader.getWhen() * 1000L,
             logHeader.getServerId()); // 记录到秒
     }
@@ -818,6 +817,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         Header.Builder headerBuilder = Header.newBuilder();
         headerBuilder.setVersion(version);
         headerBuilder.setLogfileName(logHeader.getLogFileName());
+        // 记录的是该binlog的start offest
         headerBuilder.setLogfileOffset(logHeader.getLogPos() - logHeader.getEventLen());
         headerBuilder.setServerId(logHeader.getServerId());
         headerBuilder.setServerenCode(UTF_8);// 经过java输出后所有的编码为unicode

+ 13 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/BinlogDownloadQueue.java

@@ -41,6 +41,7 @@ import org.apache.http.ssl.TrustStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.BinlogFile;
 
 /**
@@ -103,6 +104,9 @@ public class BinlogDownloadQueue {
 
     public BinlogFile tryOne() throws Throwable {
         BinlogFile binlogFile = binlogList.poll();
+        if (binlogFile == null) {
+            throw new CanalParseException("download binlog is null");
+        }
         download(binlogFile);
         hostId = binlogFile.getHostInstanceID();
         this.currentSize++;
@@ -131,7 +135,7 @@ public class BinlogDownloadQueue {
         if (StringUtils.isNotEmpty(needCompareName) && StringUtils.endsWith(needCompareName, "tar")) {
             needCompareName = needCompareName.substring(0, needCompareName.indexOf("."));
         }
-        return fileName.equalsIgnoreCase(needCompareName) && binlogList.isEmpty();
+        return (needCompareName == null || fileName.equalsIgnoreCase(needCompareName)) && binlogList.isEmpty();
     }
 
     public void prepare() throws InterruptedException {
@@ -162,6 +166,14 @@ public class BinlogDownloadQueue {
         this.currentSize = 0;
         binlogList.clear();
         downloadQueue.clear();
+        try {
+            downloadThread.interrupt();
+            downloadThread.join();// 等待其结束
+        } catch (InterruptedException e) {
+            // ignore
+        } finally {
+            downloadThread = null;
+        }
     }
 
     private void download(BinlogFile binlogFile) throws Throwable {

+ 20 - 9
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogEventParserProxy.java

@@ -22,14 +22,14 @@ import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
  */
 public class RdsBinlogEventParserProxy extends MysqlEventParser {
 
-    private String                    rdsOpenApiUrl             = "https://rds.aliyuncs.com/";    // openapi地址
-    private String                    accesskey;                                                  // 云账号的ak
-    private String                    secretkey;                                                  // 云账号sk
-    private String                    instanceId;                                                 // rds实例id
-    private String                    directory;                                                  // binlog目录
-    private int                       batchFileSize             = 4;                              // 最多下载的binlog文件数量
-
-    private RdsLocalBinlogEventParser rdsLocalBinlogEventParser = new RdsLocalBinlogEventParser();
+    private String                    rdsOpenApiUrl             = "https://rds.aliyuncs.com/"; // openapi地址
+    private String                    accesskey;                                              // 云账号的ak
+    private String                    secretkey;                                              // 云账号sk
+    private String                    instanceId;                                             // rds实例id
+    private String                    directory;                                              // binlog目录
+    private int                       batchFileSize             = 4;                          // 最多下载的binlog文件数量
+
+    private RdsLocalBinlogEventParser rdsLocalBinlogEventParser = null;
     private ExecutorService           executorService           = Executors.newSingleThreadExecutor(new ThreadFactory() {
 
                                                                     @Override
@@ -43,8 +43,11 @@ public class RdsBinlogEventParserProxy extends MysqlEventParser {
 
     @Override
     public void start() {
-        if (StringUtils.isNotEmpty(accesskey) && StringUtils.isNotEmpty(secretkey)
+        if (rdsLocalBinlogEventParser == null && StringUtils.isNotEmpty(accesskey) && StringUtils.isNotEmpty(secretkey)
             && StringUtils.isNotEmpty(instanceId)) {
+            rdsLocalBinlogEventParser = new RdsLocalBinlogEventParser();
+            // rds oss mode
+            setRdsOssMode(true);
             final ParserExceptionHandler targetHandler = this.getParserExceptionHandler();
             if (directory == null) {
                 directory = System.getProperty("java.io.tmpdir", "/tmp") + "/" + destination;
@@ -119,10 +122,18 @@ public class RdsBinlogEventParserProxy extends MysqlEventParser {
                         long serverId = rdsBinlogEventParserProxy.getServerId();
                         rdsLocalBinlogEventParser.setServerId(serverId);
                         rdsBinlogEventParserProxy.stop();
+                    } catch (Throwable e) {
+                        logger.info("handle exception failed", e);
+                    }
+
+                    try {
                         logger.info("start rds mysql binlog parser!");
                         rdsLocalBinlogEventParser.start();
                     } catch (Throwable e) {
                         logger.info("handle exception failed", e);
+                        rdsLocalBinlogEventParser.stop();
+                        RdsBinlogEventParserProxy rdsBinlogEventParserProxy = RdsBinlogEventParserProxy.this;
+                        rdsBinlogEventParserProxy.start();// 继续重试
                     }
                 }
             });

+ 9 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsLocalBinlogEventParser.java

@@ -57,7 +57,11 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
             if (entryPosition == null) {
                 throw new PositionNotFoundException("position not found!");
             }
-            long startTimeInMill = entryPosition.getTimestamp();
+            Long startTimeInMill = entryPosition.getTimestamp();
+            if (startTimeInMill == null || startTimeInMill <= 0) {
+                throw new PositionNotFoundException("position timestamp is empty!");
+            }
+
             startTime = startTimeInMill;
             List<BinlogFile> binlogFiles = RdsBinlogOpenApi.listBinlogFiles(url,
                 accesskey,
@@ -65,6 +69,10 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
                 instanceId,
                 new Date(startTime),
                 new Date(endTime));
+            if (binlogFiles.isEmpty()) {
+                throw new CanalParseException("start timestamp : " + startTimeInMill + " binlog files is empty");
+            }
+
             binlogDownloadQueue = new BinlogDownloadQueue(binlogFiles, batchFileSize, directory);
             binlogDownloadQueue.silenceDownload();
             needWait = true;

+ 48 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.parse;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
 import org.junit.Assert;
@@ -10,21 +11,27 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector;
+import com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.RegisterSlaveCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
+import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
 
 public class DirectLogFetcherTest {
 
     protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private boolean        isMariaDB;
+    private int            binlogChecksum;
 
     @Test
     public void testSimple() {
@@ -33,6 +40,7 @@ public class DirectLogFetcherTest {
             MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "xxxx", "xxxx");
             connector.connect();
             updateSettings(connector);
+            loadBinlogChecksum(connector);
             sendRegisterSlave(connector, 3);
             sendBinlogDump(connector, "mysql-bin.000001", 4L, 3);
 
@@ -40,6 +48,8 @@ public class DirectLogFetcherTest {
 
             LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             LogContext context = new LogContext();
+            context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
+
             while (fetcher.fetch()) {
                 LogEvent event = null;
                 event = decoder.decode(fetcher, context);
@@ -188,6 +198,44 @@ public class DirectLogFetcherTest {
         }
     }
 
+    private void loadBinlogChecksum(MysqlConnector connector) {
+        checkMariaDB(connector);
+        if (isMariaDB) {
+            ResultSetPacket rs = null;
+            try {
+                rs = query("select @@global.binlog_checksum", connector);
+            } catch (IOException e) {
+                throw new CanalParseException(e);
+            }
+
+            List<String> columnValues = rs.getFieldValues();
+            if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32")) {
+                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
+            } else {
+                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
+            }
+        }
+    }
+
+    private void checkMariaDB(MysqlConnector connector) {
+        ResultSetPacket rs = null;
+        try {
+            rs = query("SELECT @@version", connector);
+        } catch (IOException e) {
+            throw new CanalParseException(e);
+        }
+
+        List<String> columnValues = rs.getFieldValues();
+        if (columnValues != null && columnValues.size() >= 1) {
+            isMariaDB = StringUtils.containsIgnoreCase(columnValues.get(0), "MariaDB");
+        }
+    }
+
+    public ResultSetPacket query(String cmd, MysqlConnector connector) throws IOException {
+        MysqlQueryExecutor exector = new MysqlQueryExecutor(connector);
+        return exector.query(cmd);
+    }
+
     public void update(String cmd, MysqlConnector connector) throws IOException {
         MysqlUpdateExecutor exector = new MysqlUpdateExecutor(connector);
         exector.update(cmd);

+ 7 - 1
server/src/main/java/com/alibaba/otter/canal/common/CanalMessageSerializer.java

@@ -16,7 +16,7 @@ import com.google.protobuf.WireFormat;
 public class CanalMessageSerializer {
 
     @SuppressWarnings("deprecation")
-    public static byte[] serializer(Message data) {
+    public static byte[] serializer(Message data, boolean filterTransactionEntry) {
         try {
             if (data != null) {
                 if (data.getId() != -1) {
@@ -53,8 +53,14 @@ public class CanalMessageSerializer {
                         output.checkNoSpaceLeft();
                         return body;
                     } else if (!CollectionUtils.isEmpty(data.getEntries())) {
+                        // mq模式只会走到非rowEntry模式
                         CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
                         for (CanalEntry.Entry entry : data.getEntries()) {
+                            if (filterTransactionEntry
+                                && (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND)) {
+                                continue;
+                            }
+
                             messageBuilder.addMessages(entry.toByteString());
                         }
 

+ 9 - 1
server/src/main/java/com/alibaba/otter/canal/kafka/MessageSerializer.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.kafka;
 
 import java.util.Map;
 
+import org.apache.commons.lang.BooleanUtils;
 import org.apache.kafka.common.serialization.Serializer;
 
 import com.alibaba.otter.canal.common.CanalMessageSerializer;
@@ -15,13 +16,20 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class MessageSerializer implements Serializer<Message> {
 
+    private boolean filterTransactionEntry = false;
+
+    public MessageSerializer(){
+        this.filterTransactionEntry = BooleanUtils.toBoolean(System.getProperty("canal.instance.filter.transaction.entry",
+            "false"));
+    }
+
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
     }
 
     @Override
     public byte[] serialize(String topic, Message data) {
-        return CanalMessageSerializer.serializer(data);
+        return CanalMessageSerializer.serializer(data, filterTransactionEntry);
     }
 
     @Override

+ 28 - 23
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -47,7 +47,8 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                      Callback callback) {
         if (!mqProperties.getFlatMessage()) {
             try {
-                Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data));
+                Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data,
+                    mqProperties.isFilterTransactionEntry()));
                 logger.debug("send message:{} to destination:{}, partition: {}",
                     message,
                     destination.getCanalDestination(),
@@ -77,8 +78,8 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                 JSON.toJSONString(flatMessage),
                                 destination.getTopic(),
                                 destination.getPartition());
-                            Message message = new Message(destination.getTopic(), JSON.toJSONString(flatMessage)
-                                .getBytes());
+                            Message message = new Message(destination.getTopic(),
+                                JSON.toJSONString(flatMessage).getBytes());
                             this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                 @Override
@@ -98,28 +99,32 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                             int length = partitionFlatMessage.length;
                             for (int i = 0; i < length; i++) {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
-                                logger.debug("flatMessagePart: {}, partition: {}",
-                                    JSON.toJSONString(flatMessagePart),
-                                    i);
-                                final int index = i;
-                                try {
-                                    Message message = new Message(destination.getTopic(),
-                                        JSON.toJSONString(flatMessagePart).getBytes());
-                                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
+                                if (flatMessagePart != null) {
+                                    logger.debug("flatMessagePart: {}, partition: {}",
+                                        JSON.toJSONString(flatMessagePart),
+                                        i);
+                                    final int index = i;
+                                    try {
+                                        Message message = new Message(destination.getTopic(),
+                                            JSON.toJSONString(flatMessagePart).getBytes());
+                                        this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
-                                        @Override
-                                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                                            if (index > mqs.size()) {
-                                                throw new CanalServerException("partition number is error,config num:"
-                                                                               + destination.getPartitionsNum()
-                                                                               + ", mq num: " + mqs.size());
+                                            @Override
+                                            public MessageQueue select(List<MessageQueue> mqs, Message msg,
+                                                                       Object arg) {
+                                                if (index > mqs.size()) {
+                                                    throw new CanalServerException(
+                                                        "partition number is error,config num:"
+                                                                                   + destination.getPartitionsNum()
+                                                                                   + ", mq num: " + mqs.size());
+                                                }
+                                                return mqs.get(index);
                                             }
-                                            return mqs.get(index);
-                                        }
-                                    }, null);
-                                } catch (Exception e) {
-                                    logger.error("send flat message to hashed partition error", e);
-                                    callback.rollback();
+                                        }, null);
+                                    } catch (Exception e) {
+                                        logger.error("send flat message to hashed partition error", e);
+                                        callback.rollback();
+                                    }
                                 }
                             }
                         }