|
@@ -2,11 +2,9 @@ package com.alibaba.otter.canal.parse.inbound.mysql;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
-import java.nio.ByteBuffer;
|
|
|
import java.nio.charset.Charset;
|
|
|
import java.util.List;
|
|
|
|
|
|
-import org.apache.commons.lang.StringUtils;
|
|
|
import org.apache.commons.lang.exception.ExceptionUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -25,8 +23,6 @@ import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
|
|
|
import com.taobao.tddl.dbsync.binlog.LogContext;
|
|
|
import com.taobao.tddl.dbsync.binlog.LogDecoder;
|
|
|
import com.taobao.tddl.dbsync.binlog.LogEvent;
|
|
|
-import com.taobao.tddl.dbsync.binlog.LogPosition;
|
|
|
-import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
|
|
|
|
|
|
public class MysqlConnection implements ErosaConnection {
|
|
|
|
|
@@ -37,7 +33,6 @@ public class MysqlConnection implements ErosaConnection {
|
|
|
private Charset charset = Charset.forName("UTF-8");
|
|
|
private BinlogFormat binlogFormat;
|
|
|
private BinlogImage binlogImage;
|
|
|
- private int binlogChecksum;
|
|
|
|
|
|
public MysqlConnection(){
|
|
|
}
|
|
@@ -83,7 +78,7 @@ public class MysqlConnection implements ErosaConnection {
|
|
|
*/
|
|
|
public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
|
|
|
updateSettings();
|
|
|
- loadBinlogChecksum();
|
|
|
+
|
|
|
sendBinlogDump(binlogfilename, binlogPosition);
|
|
|
DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
|
|
|
fetcher.start(connector.getChannel());
|
|
@@ -93,8 +88,6 @@ public class MysqlConnection implements ErosaConnection {
|
|
|
decoder.handle(LogEvent.QUERY_EVENT);
|
|
|
decoder.handle(LogEvent.XID_EVENT);
|
|
|
LogContext context = new LogContext();
|
|
|
- context.setLogPosition(new LogPosition(binlogfilename));
|
|
|
- context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
|
|
|
while (fetcher.fetch()) {
|
|
|
LogEvent event = null;
|
|
|
event = decoder.decode(fetcher, context);
|
|
@@ -111,14 +104,11 @@ public class MysqlConnection implements ErosaConnection {
|
|
|
|
|
|
public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
|
|
|
updateSettings();
|
|
|
- loadBinlogChecksum();
|
|
|
sendBinlogDump(binlogfilename, binlogPosition);
|
|
|
DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
|
|
|
fetcher.start(connector.getChannel());
|
|
|
LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
|
|
|
LogContext context = new LogContext();
|
|
|
- context.setLogPosition(new LogPosition(binlogfilename));
|
|
|
- context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
|
|
|
while (fetcher.fetch()) {
|
|
|
LogEvent event = null;
|
|
|
event = decoder.decode(fetcher, context);
|
|
@@ -148,8 +138,7 @@ public class MysqlConnection implements ErosaConnection {
|
|
|
HeaderPacket binlogDumpHeader = new HeaderPacket();
|
|
|
binlogDumpHeader.setPacketBodyLength(cmdBody.length);
|
|
|
binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
|
|
|
- PacketManager.write(connector.getChannel(), binlogDumpHeader.toBytes(),
|
|
|
- cmdBody);
|
|
|
+ PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(),cmdBody);
|
|
|
|
|
|
connector.setDumping(true);
|
|
|
}
|
|
@@ -205,12 +194,9 @@ public class MysqlConnection implements ErosaConnection {
|
|
|
// 如果不设置会出现错误: Slave can not handle replication events with the
|
|
|
// checksum that master is configured to log
|
|
|
// 但也不能乱设置,需要和mysql server的checksum配置一致,不然RotateLogEvent会出现乱码
|
|
|
- // '@@global.binlog_checksum'需要去掉单引号,在mysql 5.6.29下导致master退出
|
|
|
- update("set @master_binlog_checksum= @@global.binlog_checksum");
|
|
|
+ update("set @master_binlog_checksum= '@@global.binlog_checksum'");
|
|
|
} catch (Exception e) {
|
|
|
- if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
|
|
|
- logger.warn(ExceptionUtils.getFullStackTrace(e));
|
|
|
- }
|
|
|
+ logger.warn(ExceptionUtils.getFullStackTrace(e));
|
|
|
}
|
|
|
|
|
|
try {
|
|
@@ -268,28 +254,6 @@ public class MysqlConnection implements ErosaConnection {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 获取主库checksum信息
|
|
|
- * https://dev.mysql.com/doc/refman/5.6/en/replication-options
|
|
|
- * -binary-log.html#option_mysqld_binlog-checksum
|
|
|
- */
|
|
|
- private void loadBinlogChecksum() {
|
|
|
- ResultSetPacket rs = null;
|
|
|
- try {
|
|
|
- rs = query("select @master_binlog_checksum");
|
|
|
- } catch (IOException e) {
|
|
|
- throw new CanalParseException(e);
|
|
|
- }
|
|
|
-
|
|
|
- List<String> columnValues = rs.getFieldValues();
|
|
|
- if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0) != null
|
|
|
- && columnValues.get(0).toUpperCase().equals("CRC32")) {
|
|
|
- binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
|
|
|
- } else {
|
|
|
- binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
public static enum BinlogFormat {
|
|
|
|
|
|
STATEMENT("STATEMENT"), ROW("ROW"), MIXED("MIXED");
|