Browse Source

Merge pull request #208 from jianan789/master

mysql5.6.29'@@global.binlog_checksum'带单引号导致master退出,优化binlogFormat判断
agapple 8 years ago
parent
commit
69a7e6d083

+ 7 - 2
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/FormatDescriptionLogEvent.java

@@ -140,10 +140,15 @@ public final class FormatDescriptionLogEvent extends StartLogEventV3 {
                 throw new IOException("Unknown binlog version: " + binlogVersion);
         }
     }
-
+    
+    public FormatDescriptionLogEvent(final int binlogVersion,int binlogChecksum){
+    	this(binlogVersion);
+    	this.header.checksumAlg = binlogChecksum;
+    }
+    
     public FormatDescriptionLogEvent(final int binlogVersion){
         this.binlogVersion = binlogVersion;
-
+        
         postHeaderLen = new short[ENUM_END_EVENT];
         /* identify binlog format */
         switch (binlogVersion) {

+ 1 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RotateLogEvent.java

@@ -90,6 +90,7 @@ public final class RotateLogEvent extends LogEvent {
         int filenameLen = buffer.limit() - filenameOffset;
         if (filenameLen > FN_REFLEN - 1) filenameLen = FN_REFLEN - 1;
         buffer.position(filenameOffset);
+        
         filename = buffer.getFixString(filenameLen);
     }
 

+ 0 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -325,7 +325,6 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
             startTs = System.currentTimeMillis();
         }
         CanalEntry.Entry event = binlogParser.parse(bod);
-
         if (enabled) {
             this.parsingInterval = System.currentTimeMillis() - startTs;
         }

+ 27 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -7,6 +7,8 @@ import java.nio.charset.Charset;
 import java.util.List;
 
 import com.taobao.tddl.dbsync.binlog.LogPosition;
+import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
+
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +37,8 @@ public class MysqlConnection implements ErosaConnection {
     private Charset             charset = Charset.forName("UTF-8");
     private BinlogFormat        binlogFormat;
     private BinlogImage         binlogImage;
-
+    private	int 				binlogChecksum;
+    
     public MysqlConnection(){
     }
 
@@ -107,12 +110,14 @@ public class MysqlConnection implements ErosaConnection {
 
     public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
         updateSettings();
+        loadBinlogChecksum();
         sendBinlogDump(binlogfilename, binlogPosition);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         fetcher.start(connector.getChannel());
         LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
         LogContext context = new LogContext();
         context.setLogPosition(new LogPosition(binlogfilename));
+        context.setFormatDescription(new FormatDescriptionLogEvent(4,binlogChecksum));
         while (fetcher.fetch()) {
             LogEvent event = null;
             event = decoder.decode(fetcher, context);
@@ -199,7 +204,8 @@ public class MysqlConnection implements ErosaConnection {
             // 如果不设置会出现错误: Slave can not handle replication events with the
             // checksum that master is configured to log
             // 但也不能乱设置,需要和mysql server的checksum配置一致,不然RotateLogEvent会出现乱码
-            update("set @master_binlog_checksum= '@@global.binlog_checksum'");
+        	// '@@global.binlog_checksum'需要去掉单引号,在mysql 5.6.29下导致master退出
+            update("set @master_binlog_checksum= @@global.binlog_checksum");
         } catch (Exception e) {
             logger.warn(ExceptionUtils.getFullStackTrace(e));
         }
@@ -258,7 +264,26 @@ public class MysqlConnection implements ErosaConnection {
             throw new IllegalStateException("unexpected binlog image query result:" + rs.getFieldValues());
         }
     }
+    /**
+     * 获取主库checksum信息
+     * https://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.html#option_mysqld_binlog-checksum
+     */
+    private void loadBinlogChecksum(){
+    	ResultSetPacket rs = null;
+        try {
+            rs = query("select @master_binlog_checksum");
+        } catch (IOException e) {
+            throw new CanalParseException(e);
+        }
 
+        List<String> columnValues = rs.getFieldValues();
+        if(columnValues.get(0).toUpperCase().equals("CRC32")){
+        	binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
+        }else{
+        	binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
+        }
+    }
+    
     public static enum BinlogFormat {
 
         STATEMENT("STATEMENT"), ROW("ROW"), MIXED("MIXED");

+ 2 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -93,6 +93,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 for (BinlogFormat supportFormat : supportBinlogFormats) {
                     if (supportFormat != null && format == supportFormat) {
                         found = true;
+                        break;
                     }
                 }
 
@@ -107,6 +108,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 for (BinlogImage supportImage : supportBinlogImages) {
                     if (supportImage != null && image == supportImage) {
                         found = true;
+                        break;
                     }
                 }