|
@@ -42,19 +42,19 @@ import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
|
|
|
|
|
|
public class MysqlConnection implements ErosaConnection {
|
|
public class MysqlConnection implements ErosaConnection {
|
|
|
|
|
|
- private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);
|
|
|
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);
|
|
|
|
|
|
private MysqlConnector connector;
|
|
private MysqlConnector connector;
|
|
private long slaveId;
|
|
private long slaveId;
|
|
- private Charset charset = Charset.forName("UTF-8");
|
|
|
|
|
|
+ private Charset charset = Charset.forName("UTF-8");
|
|
private BinlogFormat binlogFormat;
|
|
private BinlogFormat binlogFormat;
|
|
private BinlogImage binlogImage;
|
|
private BinlogImage binlogImage;
|
|
|
|
|
|
// tsdb releated
|
|
// tsdb releated
|
|
private AuthenticationInfo authInfo;
|
|
private AuthenticationInfo authInfo;
|
|
- protected int connTimeout = 5 * 1000; // 5秒
|
|
|
|
- protected int soTimeout = 60 * 60 * 1000; // 1小时
|
|
|
|
- private int binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
|
|
|
|
|
|
+ protected int connTimeout = 5 * 1000; // 5秒
|
|
|
|
+ protected int soTimeout = 60 * 60 * 1000; // 1小时
|
|
|
|
+ private int binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
|
|
// dump binlog bytes, 暂不包括meta与TSDB
|
|
// dump binlog bytes, 暂不包括meta与TSDB
|
|
private AtomicLong receivedBinlogBytes;
|
|
private AtomicLong receivedBinlogBytes;
|
|
private boolean compatiablePercona = false;
|
|
private boolean compatiablePercona = false;
|
|
@@ -223,7 +223,7 @@ public class MysqlConnection implements ErosaConnection {
|
|
List<LogEvent> iterateEvents = decoder.processIterateDecode(event, context);
|
|
List<LogEvent> iterateEvents = decoder.processIterateDecode(event, context);
|
|
if (!iterateEvents.isEmpty()) {
|
|
if (!iterateEvents.isEmpty()) {
|
|
// 处理compress event
|
|
// 处理compress event
|
|
- for(LogEvent itEvent : iterateEvents) {
|
|
|
|
|
|
+ for (LogEvent itEvent : iterateEvents) {
|
|
if (!func.sink(event)) {
|
|
if (!func.sink(event)) {
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
@@ -480,13 +480,13 @@ public class MysqlConnection implements ErosaConnection {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * MASTER_HEARTBEAT_PERIOD sets the interval in seconds between
|
|
|
|
- * replication heartbeats. Whenever the master's binary log is updated
|
|
|
|
- * with an event, the waiting period for the next heartbeat is reset.
|
|
|
|
- * interval is a decimal value having the range 0 to 4294967 seconds and
|
|
|
|
- * a resolution in milliseconds; the smallest nonzero value is 0.001.
|
|
|
|
- * Heartbeats are sent by the master only if there are no unsent events
|
|
|
|
- * in the binary log file for a period longer than interval.
|
|
|
|
|
|
+ * MASTER_HEARTBEAT_PERIOD sets the interval in seconds between replication
|
|
|
|
+ * heartbeats. Whenever the master's binary log is updated with an event, the
|
|
|
|
+ * waiting period for the next heartbeat is reset. interval is a decimal value
|
|
|
|
+ * having the range 0 to 4294967 seconds and a resolution in milliseconds; the
|
|
|
|
+ * smallest nonzero value is 0.001. Heartbeats are sent by the master only if
|
|
|
|
+ * there are no unsent events in the binary log file for a period longer than
|
|
|
|
+ * interval.
|
|
*/
|
|
*/
|
|
try {
|
|
try {
|
|
long periodNano = TimeUnit.SECONDS.toNanos(MASTER_HEARTBEAT_PERIOD_SECONDS);
|
|
long periodNano = TimeUnit.SECONDS.toNanos(MASTER_HEARTBEAT_PERIOD_SECONDS);
|
|
@@ -509,7 +509,8 @@ public class MysqlConnection implements ErosaConnection {
|
|
|
|
|
|
List<String> columnValues = rs.getFieldValues();
|
|
List<String> columnValues = rs.getFieldValues();
|
|
if (columnValues == null || columnValues.size() != 2) {
|
|
if (columnValues == null || columnValues.size() != 2) {
|
|
- logger.warn("unexpected binlog format query result, this may cause unexpected result, so throw exception to request network to io shutdown.");
|
|
|
|
|
|
+ logger.warn(
|
|
|
|
+ "unexpected binlog format query result, this may cause unexpected result, so throw exception to request network to io shutdown.");
|
|
throw new IllegalStateException("unexpected binlog format query result:" + rs.getFieldValues());
|
|
throw new IllegalStateException("unexpected binlog format query result:" + rs.getFieldValues());
|
|
}
|
|
}
|
|
|
|
|
|
@@ -577,11 +578,10 @@ public class MysqlConnection implements ErosaConnection {
|
|
try {
|
|
try {
|
|
rs = query("select @@version_comment");
|
|
rs = query("select @@version_comment");
|
|
List<String> columnValues = rs.getFieldValues();
|
|
List<String> columnValues = rs.getFieldValues();
|
|
- if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0) != null){
|
|
|
|
- logger.warn("--> loadVersionComment(), select @@version_comment : " + columnValues.get(0));
|
|
|
|
- if(StringUtils.containsIgnoreCase(columnValues.get(0),"Percona")){
|
|
|
|
|
|
+ if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0) != null) {
|
|
|
|
+ logger.warn("load MySQL @@version_comment : " + columnValues.get(0));
|
|
|
|
+ if (StringUtils.containsIgnoreCase(columnValues.get(0), "Percona")) {
|
|
compatiablePercona = true;
|
|
compatiablePercona = true;
|
|
- logger.warn("--> loadVersionComment(), set compatiablePercona = true");
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} catch (Throwable e) {
|
|
} catch (Throwable e) {
|