|
@@ -4,6 +4,7 @@ import java.io.IOException;
|
|
import java.util.BitSet;
|
|
import java.util.BitSet;
|
|
|
|
|
|
import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
|
|
import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
|
|
|
|
+import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
|
|
@@ -47,22 +48,22 @@ import com.taobao.tddl.dbsync.binlog.event.mariadb.StartEncryptionLogEvent;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Implements a binary-log decoder.
|
|
* Implements a binary-log decoder.
|
|
- *
|
|
|
|
|
|
+ *
|
|
* <pre>
|
|
* <pre>
|
|
* LogDecoder decoder = new LogDecoder();
|
|
* LogDecoder decoder = new LogDecoder();
|
|
* decoder.handle(...);
|
|
* decoder.handle(...);
|
|
- *
|
|
|
|
|
|
+ *
|
|
* LogEvent event;
|
|
* LogEvent event;
|
|
* do
|
|
* do
|
|
* {
|
|
* {
|
|
* event = decoder.decode(buffer, context);
|
|
* event = decoder.decode(buffer, context);
|
|
- *
|
|
|
|
|
|
+ *
|
|
* // process log event.
|
|
* // process log event.
|
|
* }
|
|
* }
|
|
* while (event != null);
|
|
* while (event != null);
|
|
* // no more events in buffer.
|
|
* // no more events in buffer.
|
|
* </pre>
|
|
* </pre>
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @author <a href="mailto:changyuan.lh@taobao.com">Changyuan.lh</a>
|
|
* @author <a href="mailto:changyuan.lh@taobao.com">Changyuan.lh</a>
|
|
* @version 1.0
|
|
* @version 1.0
|
|
*/
|
|
*/
|
|
@@ -89,7 +90,7 @@ public final class LogDecoder {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Decoding an event from binary-log buffer.
|
|
* Decoding an event from binary-log buffer.
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @return <code>UknownLogEvent</code> if event type is unknown or skipped,
|
|
* @return <code>UknownLogEvent</code> if event type is unknown or skipped,
|
|
* <code>null</code> if buffer is not including a full event.
|
|
* <code>null</code> if buffer is not including a full event.
|
|
*/
|
|
*/
|
|
@@ -141,7 +142,7 @@ public final class LogDecoder {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Deserialize an event from buffer.
|
|
* Deserialize an event from buffer.
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @return <code>UknownLogEvent</code> if event type is unknown or skipped.
|
|
* @return <code>UknownLogEvent</code> if event type is unknown or skipped.
|
|
*/
|
|
*/
|
|
public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext context) throws IOException {
|
|
public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext context) throws IOException {
|
|
@@ -160,6 +161,7 @@ public final class LogDecoder {
|
|
// remove checksum bytes
|
|
// remove checksum bytes
|
|
buffer.limit(header.getEventLen() - LogEvent.BINLOG_CHECKSUM_LEN);
|
|
buffer.limit(header.getEventLen() - LogEvent.BINLOG_CHECKSUM_LEN);
|
|
}
|
|
}
|
|
|
|
+ GTIDSet gtidSet = context.getGtidSet();
|
|
switch (header.getType()) {
|
|
switch (header.getType()) {
|
|
case LogEvent.QUERY_EVENT: {
|
|
case LogEvent.QUERY_EVENT: {
|
|
QueryLogEvent event = new QueryLogEvent(header, buffer, descriptionEvent);
|
|
QueryLogEvent event = new QueryLogEvent(header, buffer, descriptionEvent);
|
|
@@ -373,8 +375,11 @@ public final class LogDecoder {
|
|
GtidLogEvent event = new GtidLogEvent(header, buffer, descriptionEvent);
|
|
GtidLogEvent event = new GtidLogEvent(header, buffer, descriptionEvent);
|
|
/* updating position in context */
|
|
/* updating position in context */
|
|
logPosition.position = header.getLogPos();
|
|
logPosition.position = header.getLogPos();
|
|
- // update latest gtid
|
|
|
|
- context.putGtid(event);
|
|
|
|
|
|
+ if (gtidSet != null) {
|
|
|
|
+ gtidSet.update(event.getGtidStr());
|
|
|
|
+ // update latest gtid
|
|
|
|
+ header.putGtidStr(gtidSet);
|
|
|
|
+ }
|
|
return event;
|
|
return event;
|
|
}
|
|
}
|
|
case LogEvent.PREVIOUS_GTIDS_LOG_EVENT: {
|
|
case LogEvent.PREVIOUS_GTIDS_LOG_EVENT: {
|