Bläddra i källkod

1. Add MariaDB gtid support 2. fix issue:#561 增加一个外部参数控制,允许位点不存在时自动重置到当前位点 (#3025)

* fix issue:#561 增加一个外部参数控制,允许位点不存在时自动重置到当前位点

* Add MariaDB GTID support
文哥 4 år sedan
förälder
incheckning
de1dd70609
18 ändrade filer med 337 tillägg och 61 borttagningar
  1. 3 3
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogContext.java
  2. 21 28
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java
  3. 3 2
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/LogHeader.java
  4. 31 3
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/MariaGtidListLogEvent.java
  5. 11 8
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/MariaGtidLogEvent.java
  6. 3 0
      deployer/src/main/resources/canal.properties
  7. 2 0
      deployer/src/main/resources/spring/default-instance.xml
  8. 2 0
      deployer/src/main/resources/spring/file-instance.xml
  9. 4 0
      deployer/src/main/resources/spring/group-instance.xml
  10. 2 0
      deployer/src/main/resources/spring/memory-instance.xml
  11. 7 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java
  12. 62 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/MariaGTIDSet.java
  13. 77 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/MariaGtid.java
  14. 21 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/GtidUtil.java
  15. 10 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  16. 30 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  17. 23 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  18. 25 9
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

+ 3 - 3
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogContext.java

@@ -24,7 +24,7 @@ public final class LogContext {
 
     private GTIDSet                           gtidSet;
 
-    private GtidLogEvent                      gtidLogEvent; // save current gtid log event
+    private LogEvent                          gtidLogEvent; // save current gtid log event
 
     public LogContext(){
         this.formatDescription = FormatDescriptionLogEvent.FORMAT_DESCRIPTION_EVENT_5_x;
@@ -75,11 +75,11 @@ public final class LogContext {
         this.gtidSet = gtidSet;
     }
 
-    public GtidLogEvent getGtidLogEvent() {
+    public LogEvent getGtidLogEvent() {
         return gtidLogEvent;
     }
 
-    public void setGtidLogEvent(GtidLogEvent gtidLogEvent) {
+    public void setGtidLogEvent(LogEvent gtidLogEvent) {
         this.gtidLogEvent = gtidLogEvent;
     }
 }

+ 21 - 28
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -162,7 +162,7 @@ public final class LogDecoder {
             buffer.limit(header.getEventLen() - LogEvent.BINLOG_CHECKSUM_LEN);
         }
         GTIDSet gtidSet = context.getGtidSet();
-        GtidLogEvent gtidLogEvent = context.getGtidLogEvent();
+        LogEvent gtidLogEvent = context.getGtidLogEvent();
         switch (header.getType()) {
             case LogEvent.QUERY_EVENT: {
                 QueryLogEvent event = new QueryLogEvent(header, buffer, descriptionEvent);
@@ -185,7 +185,8 @@ public final class LogDecoder {
                 context.putTable(mapEvent);
                 return mapEvent;
             }
-            case LogEvent.WRITE_ROWS_EVENT_V1: {
+            case LogEvent.WRITE_ROWS_EVENT_V1:
+            case LogEvent.WRITE_ROWS_EVENT: {
                 RowsLogEvent event = new WriteRowsLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
@@ -193,7 +194,8 @@ public final class LogDecoder {
                 header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
-            case LogEvent.UPDATE_ROWS_EVENT_V1: {
+            case LogEvent.UPDATE_ROWS_EVENT_V1:
+            case LogEvent.UPDATE_ROWS_EVENT: {
                 RowsLogEvent event = new UpdateRowsLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
@@ -201,7 +203,8 @@ public final class LogDecoder {
                 header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
-            case LogEvent.DELETE_ROWS_EVENT_V1: {
+            case LogEvent.DELETE_ROWS_EVENT_V1:
+            case LogEvent.DELETE_ROWS_EVENT: {
                 RowsLogEvent event = new DeleteRowsLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
@@ -349,30 +352,6 @@ public final class LogDecoder {
                 header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
-            case LogEvent.WRITE_ROWS_EVENT: {
-                RowsLogEvent event = new WriteRowsLogEvent(header, buffer, descriptionEvent);
-                /* updating position in context */
-                logPosition.position = header.getLogPos();
-                event.fillTable(context);
-                header.putGtid(context.getGtidSet(), gtidLogEvent);
-                return event;
-            }
-            case LogEvent.UPDATE_ROWS_EVENT: {
-                RowsLogEvent event = new UpdateRowsLogEvent(header, buffer, descriptionEvent);
-                /* updating position in context */
-                logPosition.position = header.getLogPos();
-                event.fillTable(context);
-                header.putGtid(context.getGtidSet(), gtidLogEvent);
-                return event;
-            }
-            case LogEvent.DELETE_ROWS_EVENT: {
-                RowsLogEvent event = new DeleteRowsLogEvent(header, buffer, descriptionEvent);
-                /* updating position in context */
-                logPosition.position = header.getLogPos();
-                event.fillTable(context);
-                header.putGtid(context.getGtidSet(), gtidLogEvent);
-                return event;
-            }
             case LogEvent.PARTIAL_UPDATE_ROWS_EVENT: {
                 RowsLogEvent event = new UpdateRowsLogEvent(header, buffer, descriptionEvent, true);
                 /* updating position in context */
@@ -436,12 +415,26 @@ public final class LogDecoder {
                 MariaGtidLogEvent event = new MariaGtidLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
+                if (gtidSet != null) {
+                    gtidSet.update(event.getGtidStr());
+                    // update latest gtid
+                    header.putGtid(gtidSet, event);
+                }
+                // update current gtid event to context
+                context.setGtidLogEvent(event);
                 return event;
             }
             case LogEvent.GTID_LIST_EVENT: {
                 MariaGtidListLogEvent event = new MariaGtidListLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
+                if (gtidSet != null) {
+                    gtidSet.update(event.getGtidStr());
+                    // update latest gtid
+                    header.putGtid(gtidSet, event);
+                }
+                // update current gtid event to context
+                context.setGtidLogEvent(event);
                 return event;
             }
             case LogEvent.START_ENCRYPTION_EVENT: {

+ 3 - 2
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/LogHeader.java

@@ -316,10 +316,11 @@ public final class LogHeader {
         return gtidMap.get(CURRENT_GTID_LAST_COMMIT);
     }
 
-    public void putGtid(GTIDSet gtidSet, GtidLogEvent event) {
+    public void putGtid(GTIDSet gtidSet, LogEvent gtidEvent) {
         if (gtidSet != null) {
             gtidMap.put(GTID_SET_STRING, gtidSet.toString());
-            if (event != null) {
+            if (gtidEvent != null && gtidEvent instanceof GtidLogEvent) {
+                GtidLogEvent event = (GtidLogEvent)gtidEvent;
                 gtidMap.put(CURRENT_GTID_STRING, event.getGtidStr());
                 gtidMap.put(CURRENT_GTID_SN, String.valueOf(event.getSequenceNumber()));
                 gtidMap.put(CURRENT_GTID_LAST_COMMIT, String.valueOf(event.getLastCommitted()));

+ 31 - 3
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/MariaGtidListLogEvent.java

@@ -1,6 +1,9 @@
 package com.taobao.tddl.dbsync.binlog.event.mariadb;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.MariaGTIDSet;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.MariaGtid;
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
+import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.IgnorableLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.LogHeader;
@@ -11,11 +14,36 @@ import com.taobao.tddl.dbsync.binlog.event.LogHeader;
  * @author jianghang 2014-1-20 下午4:51:50
  * @since 1.0.17
  */
-public class MariaGtidListLogEvent extends IgnorableLogEvent {
+public class MariaGtidListLogEvent extends LogEvent {
 
+    private MariaGTIDSet mariaGTIDSet;
+    /**
+     * <pre>
+     * mariadb gtidListLog event format
+     *  uint<4> Number of GTIDs
+     *  GTID[0]
+     *      uint<4> Replication Domain ID
+     *      uint<4> Server_ID
+     *      uint<8> GTID sequence ...
+     * GTID[n]
+     * </pre>
+     * @param header
+     * @param buffer
+     * @param descriptionEvent
+     */
     public MariaGtidListLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
-        super(header, buffer, descriptionEvent);
-        // do nothing , just ignore log event
+        super(header);
+        long gtidLenth = buffer.getUint32();
+        mariaGTIDSet = new MariaGTIDSet();
+        for (int i = 0; i < gtidLenth; i++) {
+            long domainId = buffer.getUint32();
+            long serverId = buffer.getUint32();
+            long sequence = buffer.getUlong64().longValue();
+            mariaGTIDSet.add(new MariaGtid(domainId, serverId, sequence));
+        }
     }
 
+    public String getGtidStr() {
+        return mariaGTIDSet.toString();
+    }
 }

+ 11 - 8
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/MariaGtidLogEvent.java

@@ -1,6 +1,8 @@
 package com.taobao.tddl.dbsync.binlog.event.mariadb;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.MariaGtid;
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
+import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.IgnorableLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.LogHeader;
@@ -11,9 +13,9 @@ import com.taobao.tddl.dbsync.binlog.event.LogHeader;
  * @author jianghang 2014-1-20 下午4:49:10
  * @since 1.0.17
  */
-public class MariaGtidLogEvent extends IgnorableLogEvent {
+public class MariaGtidLogEvent extends LogEvent {
 
-    private long gtid;
+    private MariaGtid mariaGtid;
 
     /**
      * <pre>
@@ -30,13 +32,14 @@ public class MariaGtidLogEvent extends IgnorableLogEvent {
      */
 
     public MariaGtidLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
-        super(header, buffer, descriptionEvent);
-        gtid = buffer.getUlong64().longValue();
-        // do nothing , just ignore log event
+        super(header);
+        long sequence = buffer.getUlong64().longValue();
+        long domainId = buffer.getUint32();
+        long serverId = header.getServerId();
+        mariaGtid = new MariaGtid(domainId, serverId, sequence);
     }
 
-    public long getGtid() {
-        return gtid;
+    public String getGtidStr() {
+        return mariaGtid.toString();
     }
-
 }

+ 3 - 0
deployer/src/main/resources/canal.properties

@@ -95,6 +95,9 @@ canal.conf.dir = ../conf
 # auto scan instance dir add/remove and start/stop instance
 canal.auto.scan = true
 canal.auto.scan.interval = 5
+# set this value to 'true' means that when binlog pos not found, skip to latest.
+# WARN: pls keep 'false' in production env, or if you know what you want.
+canal.auto.reset.latest.pos.mode = false
 
 canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
 #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

+ 2 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -192,6 +192,8 @@
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
 		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
+
+		<property name="autoResetLatestPosMode" value="${canal.auto.reset.latest.pos.mode:false}" />
 	</bean>
 
 	<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">

+ 2 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -178,6 +178,8 @@
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
 		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
+
+		<property name="autoResetLatestPosMode" value="${canal.auto.reset.latest.pos.mode:false}" />
 	</bean>
 
 	<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">

+ 4 - 0
deployer/src/main/resources/spring/group-instance.xml

@@ -165,6 +165,8 @@
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
 		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
+
+		<property name="autoResetLatestPosMode" value="${canal.auto.reset.latest.pos.mode:false}" />
 	</bean>
 
 	<bean id="eventParser2" parent="baseEventParser">
@@ -268,6 +270,8 @@
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
 		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
+
+		<property name="autoResetLatestPosMode" value="${canal.auto.reset.latest.pos.mode:false}" />
 	</bean>
 
     <bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">

+ 2 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -166,6 +166,8 @@
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
 		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
+
+		<property name="autoResetLatestPosMode" value="${canal.auto.reset.latest.pos.mode:false}" />
 	</bean>
 
 	<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">

+ 7 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -49,6 +49,8 @@ public class MysqlConnector {
     // mysql connectinnId
     private long                connectionId      = -1;
     private AtomicBoolean       connected         = new AtomicBoolean(false);
+    // serverVersion
+    private String              serverVersion;
 
     public static final int     timeout           = 5 * 1000;                                     // 5s
 
@@ -179,6 +181,7 @@ public class MysqlConnector {
         }
 
         connectionId = handshakePacket.threadId; // 记录一下connection
+        serverVersion = handshakePacket.serverVersion; // 记录serverVersion
         logger.info("handshake initialization packet received, prepare the client authentication packet to send");
         ClientAuthenticationPacket clientAuth = new ClientAuthenticationPacket();
         clientAuth.setCharsetNumber(charsetNumber);
@@ -412,4 +415,8 @@ public class MysqlConnector {
         return password;
     }
 
+    public String getServerVersion() {
+        return serverVersion;
+    }
+
 }

+ 62 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/MariaGTIDSet.java

@@ -0,0 +1,62 @@
+package com.alibaba.otter.canal.parse.driver.mysql.packets;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 类 MariaGTIDSet.java 的实现
+ *
+ * @author winger 2020/9/24 10:31 上午
+ * @version 1.0.0
+ */
+public class MariaGTIDSet implements GTIDSet {
+    //MariaDB 10.0.2+ representation of Gtid
+    Map<Long, MariaGtid> gtidMap = new HashMap<>();
+
+    @Override
+    public byte[] encode() throws IOException {
+        return this.toString().getBytes();
+    }
+
+    @Override
+    public void update(String str) {
+        MariaGtid mariaGtid = MariaGtid.parse(str);
+        gtidMap.put(mariaGtid.getDomainId(), mariaGtid);
+    }
+
+    public void add(MariaGtid mariaGtid) {
+        gtidMap.put(mariaGtid.getDomainId(), mariaGtid);
+    }
+
+    public static MariaGTIDSet parse(String gtidData) {
+        Map<Long, MariaGtid> gtidMap = new HashMap<>();
+        if (StringUtils.isNotEmpty(gtidData)) {
+            // 存在多个GTID时会有回车符
+            String[] gtidStrs = gtidData.replaceAll("\n", "").split(",");
+            for (String gtid : gtidStrs) {
+                MariaGtid mariaGtid = MariaGtid.parse(gtid);
+                gtidMap.put(mariaGtid.getDomainId(), mariaGtid);
+            }
+        }
+        MariaGTIDSet gtidSet = new MariaGTIDSet();
+        gtidSet.gtidMap = gtidMap;
+        return gtidSet;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        for (MariaGtid gtid : gtidMap.values()) {
+            if (sb.length() > 0) {
+                sb.append(",");
+            }
+            sb.append(gtid.toString());
+        }
+        return sb.toString();
+    }
+}
+
+

+ 77 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/MariaGtid.java

@@ -0,0 +1,77 @@
+package com.alibaba.otter.canal.parse.driver.mysql.packets;
+
+import org.apache.commons.lang.math.NumberUtils;
+
+/**
+ * 类 MariaGtid.java 的实现
+ *
+ * @author winger 2020/9/24 11:30 上午
+ * @version 1.0.0
+ */
+public class MariaGtid {
+
+    // {domainId}-{serverId}-{sequence}
+    private long domainId;
+    private long serverId;
+    private long sequence;
+
+    public MariaGtid(long domainId, long serverId, long sequence) {
+        this.domainId = domainId;
+        this.serverId = serverId;
+        this.sequence = sequence;
+    }
+
+    public MariaGtid(String gtid) {
+        String[] gtidArr = gtid.split("-");
+        this.domainId = NumberUtils.toLong(gtidArr[0]);
+        this.serverId = NumberUtils.toLong(gtidArr[1]);
+        this.sequence = NumberUtils.toLong(gtidArr[2]);
+    }
+
+    public static MariaGtid parse(String gtid) {
+        return new MariaGtid(gtid);
+    }
+
+    public long getDomainId() {
+        return domainId;
+    }
+
+    public void setDomainId(long domainId) {
+        this.domainId = domainId;
+    }
+
+    public long getServerId() {
+        return serverId;
+    }
+
+    public void setServerId(long serverId) {
+        this.serverId = serverId;
+    }
+
+    public long getSequence() {
+        return sequence;
+    }
+
+    public void setSequence(long sequence) {
+        this.sequence = sequence;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        MariaGtid mariaGtid = (MariaGtid) o;
+        return domainId == mariaGtid.domainId &&
+                serverId == mariaGtid.serverId &&
+                sequence == mariaGtid.sequence;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s-%s-%s", domainId, serverId, sequence);
+    }
+}

+ 21 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/GtidUtil.java

@@ -0,0 +1,21 @@
+package com.alibaba.otter.canal.parse.driver.mysql.utils;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.MariaGTIDSet;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
+
+/**
+ * 类 GtidUtil.java 的实现
+ *
+ * @author winger 2020/9/24 1:25 下午
+ * @version 1.0.0
+ */
+public class GtidUtil {
+
+    public static GTIDSet parseGtidSet(String gtid, boolean isMariaDB) {
+        if (isMariaDB) {
+            return MariaGTIDSet.parse(gtid);
+        }
+        return MysqlGTIDSet.parse(gtid);
+    }
+}

+ 10 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -10,6 +10,7 @@ import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.math.RandomUtils;
@@ -22,7 +23,6 @@ import com.alibaba.otter.canal.common.alarm.CanalAlarmHandler;
 import com.alibaba.otter.canal.filter.CanalEventFilter;
 import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
-import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
 import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;
@@ -39,6 +39,8 @@ import com.alibaba.otter.canal.protocol.position.LogPosition;
 import com.alibaba.otter.canal.sink.CanalEventSink;
 import com.alibaba.otter.canal.sink.exception.CanalSinkException;
 
+import static com.alibaba.otter.canal.parse.driver.mysql.utils.GtidUtil.parseGtidSet;
+
 /**
  * 抽象的EventParser, 最大化共用mysql/oracle版本的实现
  * 
@@ -160,6 +162,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
             public void run() {
                 MDC.put("destination", String.valueOf(destination));
                 ErosaConnection erosaConnection = null;
+                boolean isMariaDB = false;
                 while (running) {
                     try {
                         // 开始执行replication
@@ -178,6 +181,10 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                         if (queryServerId != 0) {
                             serverId = queryServerId;
                         }
+
+                        if (erosaConnection instanceof MysqlConnection) {
+                            isMariaDB = ((MysqlConnection)erosaConnection).isMariaDB();
+                        }
                         // 4. 获取最后的位置信息
                         long start = System.currentTimeMillis();
                         logger.warn("---> begin to find start position, it will be long time for reset or first position");
@@ -242,7 +249,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                             multiStageCoprocessor = buildMultiStageCoprocessor();
                             if (isGTIDMode() && StringUtils.isNotEmpty(startPosition.getGtid())) {
                                 // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
-                                GTIDSet gtidSet = MysqlGTIDSet.parse(startPosition.getGtid());
+                                GTIDSet gtidSet = parseGtidSet(startPosition.getGtid(),isMariaDB);
                                 ((MysqlMultiStageCoprocessor) multiStageCoprocessor).setGtidSet(gtidSet);
                                 multiStageCoprocessor.start();
                                 erosaConnection.dump(gtidSet, multiStageCoprocessor);
@@ -260,7 +267,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                         } else {
                             if (isGTIDMode() && StringUtils.isNotEmpty(startPosition.getGtid())) {
                                 // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
-                                erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler);
+                                erosaConnection.dump(parseGtidSet(startPosition.getGtid(), isMariaDB), sinkHandler);
                             } else {
                                 if (StringUtils.isEmpty(startPosition.getJournalName())
                                     && startPosition.getTimestamp() != null) {
@@ -715,6 +722,4 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
 	public Map<String, List<String>> getFieldBlackFilterMap() {
 		return fieldBlackFilterMap;
 	}
-	
-	
 }

+ 30 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.parse.inbound.mysql;
 
+import static com.alibaba.otter.canal.parse.driver.mysql.utils.GtidUtil.parseGtidSet;
 import static com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.MASTER_HEARTBEAT_PERIOD_SECONDS;
 
 import java.io.IOException;
@@ -137,8 +138,14 @@ public class MysqlConnection implements ErosaConnection {
         // 'errno = 1236, sqlstate = HY000 errmsg = The slave is connecting
         // using CHANGE MASTER TO MASTER_AUTO_POSITION = 1 ...
         if (StringUtils.isNotEmpty(gtid)) {
-            decoder.handle(LogEvent.GTID_LOG_EVENT);
-            context.setGtidSet(MysqlGTIDSet.parse(gtid));
+            GTIDSet gtidSet = parseGtidSet(gtid, isMariaDB());
+            if (isMariaDB()) {
+                decoder.handle(LogEvent.GTID_EVENT);
+                decoder.handle(LogEvent.GTID_LIST_EVENT);
+            } else {
+                decoder.handle(LogEvent.GTID_LOG_EVENT);
+            }
+            context.setGtidSet(gtidSet);
         }
         context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
         while (fetcher.fetch()) {
@@ -332,6 +339,14 @@ public class MysqlConnection implements ErosaConnection {
     }
 
     private void sendBinlogDumpGTID(GTIDSet gtidSet) throws IOException {
+        if (isMariaDB()) {
+            sendMariaBinlogDumpGTID(gtidSet);
+            return;
+        }
+        sendMySQLBinlogDumpGTID(gtidSet);
+    }
+
+    private void sendMySQLBinlogDumpGTID(GTIDSet gtidSet) throws IOException {
         BinlogDumpGTIDCommandPacket binlogDumpCmd = new BinlogDumpGTIDCommandPacket();
         binlogDumpCmd.slaveServerId = this.slaveId;
         binlogDumpCmd.gtidSet = gtidSet;
@@ -345,6 +360,15 @@ public class MysqlConnection implements ErosaConnection {
         connector.setDumping(true);
     }
 
+    private void sendMariaBinlogDumpGTID(GTIDSet gtidSet) throws IOException {
+        update("SET @slave_connect_state = '" + new String(gtidSet.encode()) + "'");
+        update("SET @slave_gtid_strict_mode = 0");
+        update("SET @slave_gtid_ignore_duplicates = 0");
+        sendRegisterSlave();
+        sendBinlogDump("", 0L);
+        connector.setDumping(true);
+    }
+
     public MysqlConnection fork() {
         MysqlConnection connection = new MysqlConnection();
         connection.setCharset(getCharset());
@@ -673,4 +697,8 @@ public class MysqlConnection implements ErosaConnection {
         this.receivedBinlogBytes = receivedBinlogBytes;
     }
 
+    public boolean isMariaDB() {
+        return connector.getServerVersion() != null && connector.getServerVersion().toLowerCase().contains("mariadb");
+    }
+
 }

+ 23 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -69,6 +69,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private int                  dumpErrorCount                    = 0;        // binlogDump失败异常计数
     private int                  dumpErrorCountThreshold           = 2;        // binlogDump失败异常计数阀值
     private boolean              rdsOssMode                        = false;
+    private boolean              autoResetLatestPosMode            = false;    // true: binlog被删除之后,自动按最新的数据订阅
 
     protected ErosaConnection buildErosaConnection() {
         return buildMysqlConnection(this.runningInfo);
@@ -484,7 +485,12 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         dumpErrorCount = 0;
                         return findPosition;
                     }
-
+                    // 处理 binlog 位点被删除的情况,提供自动重置到当前位点的功能
+                    // 应用场景: 测试环境不稳定,位点经常被删。强烈不建议在正式环境中开启此控制参数,因为binlog 丢失调到最新位点也即意味着数据丢失
+                    if (isAutoResetLatestPosMode()) {
+                        dumpErrorCount = 0;
+                        return findEndPosition(mysqlConnection);
+                    }
                     Long timestamp = logPosition.getPostion().getTimestamp();
                     if (isRdsOssMode() && (timestamp != null && timestamp > 0)) {
                         // 如果binlog位点不存在,并且属于timestamp不为空,可以返回null走到oss binlog处理
@@ -663,6 +669,14 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
             if (isGTIDMode() && fields.size() > 4) {
                 endPosition.setGtid(fields.get(4));
             }
+            // MariaDB 无法通过`show master status`获取 gtid
+            if (mysqlConnection.isMariaDB() && isGTIDMode()) {
+                ResultSetPacket gtidPacket = mysqlConnection.query("SELECT @@global.gtid_binlog_pos");
+                List<String> gtidFields = gtidPacket.getFieldValues();
+                if (!CollectionUtils.isEmpty(gtidFields) && gtidFields.size() > 0) {
+                    endPosition.setGtid(gtidFields.get(0));
+                }
+            }
             return endPosition;
         } catch (IOException e) {
             throw new CanalParseException("command : 'show master status' has an error!", e);
@@ -938,4 +952,12 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     public void setDumpErrorCount(int dumpErrorCount) {
         this.dumpErrorCount = dumpErrorCount;
     }
+
+    public boolean isAutoResetLatestPosMode() {
+        return autoResetLatestPosMode;
+    }
+
+    public void setAutoResetLatestPosMode(boolean autoResetLatestPosMode) {
+        this.autoResetLatestPosMode = autoResetLatestPosMode;
+    }
 }

+ 25 - 9
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -12,6 +12,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidListLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidLogEvent;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -145,7 +147,9 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 return parseGTIDLogEvent((GtidLogEvent) logEvent);
             case LogEvent.HEARTBEAT_LOG_EVENT:
                 return parseHeartbeatLogEvent((HeartbeatLogEvent) logEvent);
-
+            case LogEvent.GTID_EVENT:
+            case LogEvent.GTID_LIST_EVENT:
+                return parseMariaGTIDLogEvent(logEvent);
             default:
                 break;
         }
@@ -186,6 +190,19 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         return createEntry(header, EntryType.GTIDLOG, builder.build().toByteString());
     }
 
+    private Entry parseMariaGTIDLogEvent(LogEvent logEvent) {
+        LogHeader logHeader = logEvent.getHeader();
+        Pair.Builder builder = Pair.newBuilder();
+        builder.setKey("gtid");
+        if (logEvent instanceof MariaGtidLogEvent) {
+            builder.setValue(((MariaGtidLogEvent)logEvent).getGtidStr());
+        } else if (logEvent instanceof MariaGtidListLogEvent) {
+            builder.setValue(((MariaGtidListLogEvent)logEvent).getGtidStr());
+        }
+        Header header = createHeader(logHeader, "", "", EventType.GTID);
+        return createEntry(header, EntryType.GTIDLOG, builder.build().toByteString());
+    }
+
     private Entry parseQueryEvent(QueryLogEvent event, boolean isSeek) {
         String queryString = event.getQuery();
         if (StringUtils.startsWithIgnoreCase(queryString, XA_START)) {
@@ -276,16 +293,15 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             }
 
             Header header = createHeader(event.getHeader(), schemaName, tableName, type);
-            RowChange.Builder rowChangeBuider = RowChange.newBuilder();
-            if (type == EventType.QUERY && !isDml) {
-                rowChangeBuider.setIsDdl(true);
-            }
-            rowChangeBuider.setSql(queryString);
+            RowChange.Builder rowChangeBuilder = RowChange.newBuilder();
+
+            rowChangeBuilder.setIsDdl(!isDml);
+            rowChangeBuilder.setSql(queryString);
             if (StringUtils.isNotEmpty(event.getDbName())) {// 可能为空
-                rowChangeBuider.setDdlSchemaName(event.getDbName());
+                rowChangeBuilder.setDdlSchemaName(event.getDbName());
             }
-            rowChangeBuider.setEventType(type);
-            return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
+            rowChangeBuilder.setEventType(type);
+            return createEntry(header, EntryType.ROWDATA, rowChangeBuilder.build().toByteString());
         }
     }