winger 6 years ago
parent
commit
7de40091a3

+ 4 - 11
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogContext.java

@@ -66,18 +66,11 @@ public final class LogContext {
         mapOfTable.clear();
     }
 
-    public final void putGtid(GtidLogEvent logEvent) {
-        if (logEvent != null) {
-            String gtid = logEvent.getSid().toString() + ":" + logEvent.getGno();
-            if (gtidSet == null) {
-                gtid = logEvent.getSid().toString() + ":1-" + logEvent.getGno();
-                gtidSet = MysqlGTIDSet.parse(gtid);
-            }
-            gtidSet.update(gtid);
-        }
-    }
-
     public GTIDSet getGtidSet() {
         return gtidSet;
     }
+
+    public void setGtidSet(GTIDSet gtidSet) {
+        this.gtidSet = gtidSet;
+    }
 }

+ 13 - 8
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -4,6 +4,7 @@ import java.io.IOException;
 import java.util.BitSet;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -47,22 +48,22 @@ import com.taobao.tddl.dbsync.binlog.event.mariadb.StartEncryptionLogEvent;
 
 /**
  * Implements a binary-log decoder.
- * 
+ *
  * <pre>
  * LogDecoder decoder = new LogDecoder();
  * decoder.handle(...);
- * 
+ *
  * LogEvent event;
  * do
  * {
  *     event = decoder.decode(buffer, context);
- * 
+ *
  *     // process log event.
  * }
  * while (event != null);
  * // no more events in buffer.
  * </pre>
- * 
+ *
  * @author <a href="mailto:changyuan.lh@taobao.com">Changyuan.lh</a>
  * @version 1.0
  */
@@ -89,7 +90,7 @@ public final class LogDecoder {
 
     /**
      * Decoding an event from binary-log buffer.
-     * 
+     *
      * @return <code>UknownLogEvent</code> if event type is unknown or skipped,
      * <code>null</code> if buffer is not including a full event.
      */
@@ -141,7 +142,7 @@ public final class LogDecoder {
 
     /**
      * Deserialize an event from buffer.
-     * 
+     *
      * @return <code>UknownLogEvent</code> if event type is unknown or skipped.
      */
     public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext context) throws IOException {
@@ -160,6 +161,7 @@ public final class LogDecoder {
             // remove checksum bytes
             buffer.limit(header.getEventLen() - LogEvent.BINLOG_CHECKSUM_LEN);
         }
+        GTIDSet gtidSet = context.getGtidSet();
         switch (header.getType()) {
             case LogEvent.QUERY_EVENT: {
                 QueryLogEvent event = new QueryLogEvent(header, buffer, descriptionEvent);
@@ -373,8 +375,11 @@ public final class LogDecoder {
                 GtidLogEvent event = new GtidLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                // update latest gtid
-                context.putGtid(event);
+                if (gtidSet != null) {
+                    gtidSet.update(event.getGtidStr());
+                    // update latest gtid
+                    header.putGtidStr(gtidSet);
+                }
                 return event;
             }
             case LogEvent.PREVIOUS_GTIDS_LOG_EVENT: {

+ 7 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/GtidLogEvent.java

@@ -82,4 +82,11 @@ public class GtidLogEvent extends LogEvent {
     public Long getSequenceNumber() {
         return sequenceNumber;
     }
+
+    public String getGtidStr() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(sid.toString()).append(":");
+        sb.append(gno);
+        return sb.toString();
+    }
 }

+ 7 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -8,6 +8,8 @@ import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.math.RandomUtils;
@@ -235,12 +237,14 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                         if (parallel) {
                             // build stage processor
                             multiStageCoprocessor = buildMultiStageCoprocessor();
-                            multiStageCoprocessor.start();
-
                             if (isGTIDMode()) {
                                 // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
-                                erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), multiStageCoprocessor);
+                                GTIDSet gtidSet = MysqlGTIDSet.parse(startPosition.getGtid());
+                                ((MysqlMultiStageCoprocessor) multiStageCoprocessor).setGtidSet(gtidSet);
+                                multiStageCoprocessor.start();
+                                erosaConnection.dump(gtidSet, multiStageCoprocessor);
                             } else {
+                                multiStageCoprocessor.start();
                                 if (StringUtils.isEmpty(startPosition.getJournalName())
                                     && startPosition.getTimestamp() != null) {
                                     erosaConnection.dump(startPosition.getTimestamp(), multiStageCoprocessor);

+ 0 - 7
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -76,13 +76,6 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
      * @return
      */
     protected boolean processTableMeta(EntryPosition position) {
-        if (isGTIDMode()) {
-            if (binlogParser instanceof LogEventConvert) {
-                // 记录gtid
-                ((LogEventConvert) binlogParser).setGtidSet(MysqlGTIDSet.parse(position.getGtid()));
-            }
-        }
-
         if (tableMetaTSDB != null) {
             if (position.getTimestamp() == null || position.getTimestamp() <= 0) {
                 throw new CanalParseException("use gtid and TableMeta TSDB should be config timestamp > 0");

+ 2 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -179,6 +179,8 @@ public class MysqlConnection implements ErosaConnection {
             fetcher.start(connector.getChannel());
             LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             LogContext context = new LogContext();
+            // fix bug: #890 将gtid传输至context中,供decode使用
+            context.setGtidSet(gtidSet);
             while (fetcher.fetch()) {
                 accumulateReceivedBytes(fetcher.limit());
                 LogEvent event = null;

+ 8 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -7,6 +7,7 @@ import java.util.concurrent.locks.LockSupport;
 
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer;
@@ -63,6 +64,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
     private String                       destination;
     private volatile CanalParseException exception;
     private AtomicLong                   eventsPublishBlockingTime;
+    private GTIDSet                      gtidSet;
 
     public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
                                       EventTransactionBuffer transactionBuffer, String destination){
@@ -227,6 +229,9 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         public SimpleParserStage(){
             decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             context = new LogContext();
+            if (gtidSet != null) {
+                context.setGtidSet(gtidSet);
+            }
         }
 
         public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throws Exception {
@@ -447,4 +452,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         this.eventsPublishBlockingTime = eventsPublishBlockingTime;
     }
 
+    public void setGtidSet(GTIDSet gtidSet) {
+        this.gtidSet = gtidSet;
+    }
 }

+ 1 - 13
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -99,13 +99,6 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     private boolean                     filterRows          = false;
     private boolean                     useDruidDdlFilter   = true;
 
-    // latest gtid
-    private GTIDSet                     gtidSet;
-
-    public LogEventConvert(GTIDSet gtidSet){
-        this.gtidSet = gtidSet;
-    }
-
     public LogEventConvert(){
 
     }
@@ -172,10 +165,9 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
     private Entry parseGTIDLogEvent(GtidLogEvent logEvent) {
         LogHeader logHeader = logEvent.getHeader();
-        String value = logEvent.getSid().toString() + ":" + logEvent.getGno();
         Pair.Builder builder = Pair.newBuilder();
         builder.setKey("gtid");
-        builder.setValue(value);
+        builder.setValue(logEvent.getGtidStr());
 
         if (logEvent.getLastCommitted() != null) {
             builder.setKey("lastCommitted");
@@ -974,8 +966,4 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     public void setFilterRows(boolean filterRows) {
         this.filterRows = filterRows;
     }
-
-    public void setGtidSet(GTIDSet gtidSet) {
-        this.gtidSet = gtidSet;
-    }
 }