Author: 七锋 · committed 7 years ago
Commit: 9c24b26d02 (parent commit not shown)

+ 3 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -189,6 +189,7 @@ public final class LogDecoder {
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 event.fillTable(context);
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.UPDATE_ROWS_EVENT_V1: {
@@ -196,6 +197,7 @@ public final class LogDecoder {
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 event.fillTable(context);
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.DELETE_ROWS_EVENT_V1: {
@@ -203,6 +205,7 @@ public final class LogDecoder {
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 event.fillTable(context);
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.ROTATE_EVENT: {

+ 45 - 41
dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java

@@ -8,6 +8,15 @@ import java.sql.Statement;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RowsQueryLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
+
 public class DirectLogFetcherTest extends BaseLogFetcherTest {
 
     @Test
@@ -15,54 +24,49 @@ public class DirectLogFetcherTest extends BaseLogFetcherTest {
         DirectLogFetcher fecther = new DirectLogFetcher();
         try {
             Class.forName("com.mysql.jdbc.Driver");
-            Connection connection = DriverManager.getConnection("jdbc:mysql://100.81.154.142:3306", "root", "hello");
+            Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306", "root", "hello");
             Statement statement = connection.createStatement();
             statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'");
             statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");
 
-            fecther.open(connection, "mysql-bin.000006", 120L, 2);
+            fecther.open(connection, "mysql-bin.000007", 89797036L, 2);
 
-            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.UNKNOWN_EVENT);
+            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             LogContext context = new LogContext();
             while (fecther.fetch()) {
-                decoder.decode(fecther, context);
-                continue;
-                // if (event == null) {
-                // continue;
-                // }
-                //
-                // int eventType = event.getHeader().getType();
-                // switch (eventType) {
-                // case LogEvent.ROTATE_EVENT:
-                // binlogFileName = ((RotateLogEvent) event).getFilename();
-                // break;
-                // case LogEvent.WRITE_ROWS_EVENT_V1:
-                // case LogEvent.WRITE_ROWS_EVENT:
-                // parseRowsEvent((WriteRowsLogEvent) event);
-                // break;
-                // case LogEvent.UPDATE_ROWS_EVENT_V1:
-                // case LogEvent.UPDATE_ROWS_EVENT:
-                // parseRowsEvent((UpdateRowsLogEvent) event);
-                // break;
-                // case LogEvent.DELETE_ROWS_EVENT_V1:
-                // case LogEvent.DELETE_ROWS_EVENT:
-                // parseRowsEvent((DeleteRowsLogEvent) event);
-                // break;
-                // case LogEvent.QUERY_EVENT:
-                // parseQueryEvent((QueryLogEvent) event);
-                // break;
-                // case LogEvent.ROWS_QUERY_LOG_EVENT:
-                // parseRowsQueryEvent((RowsQueryLogEvent) event);
-                // break;
-                // case LogEvent.ANNOTATE_ROWS_EVENT:
-                // parseAnnotateRowsEvent((AnnotateRowsEvent) event);
-                // break;
-                // case LogEvent.XID_EVENT:
-                // parseXidEvent((XidLogEvent) event);
-                // break;
-                // default:
-                // break;
-                // }
+                LogEvent event = decoder.decode(fecther, context);
+                int eventType = event.getHeader().getType();
+                switch (eventType) {
+                    case LogEvent.ROTATE_EVENT:
+                        binlogFileName = ((RotateLogEvent) event).getFilename();
+                        break;
+                    case LogEvent.WRITE_ROWS_EVENT_V1:
+                    case LogEvent.WRITE_ROWS_EVENT:
+                        parseRowsEvent((WriteRowsLogEvent) event);
+                        break;
+                    case LogEvent.UPDATE_ROWS_EVENT_V1:
+                    case LogEvent.UPDATE_ROWS_EVENT:
+                        parseRowsEvent((UpdateRowsLogEvent) event);
+                        break;
+                    case LogEvent.DELETE_ROWS_EVENT_V1:
+                    case LogEvent.DELETE_ROWS_EVENT:
+                        parseRowsEvent((DeleteRowsLogEvent) event);
+                        break;
+                    case LogEvent.QUERY_EVENT:
+                        parseQueryEvent((QueryLogEvent) event);
+                        break;
+                    case LogEvent.ROWS_QUERY_LOG_EVENT:
+                        parseRowsQueryEvent((RowsQueryLogEvent) event);
+                        break;
+                    case LogEvent.ANNOTATE_ROWS_EVENT:
+                        parseAnnotateRowsEvent((AnnotateRowsEvent) event);
+                        break;
+                    case LogEvent.XID_EVENT:
+                        parseXidEvent((XidLogEvent) event);
+                        break;
+                    default:
+                        break;
+                }
             }
         } catch (Exception e) {
             e.printStackTrace();

+ 0 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/MultiStageCoprocessor.java

@@ -25,6 +25,4 @@ public interface MultiStageCoprocessor extends CanalLifeCycle {
     public boolean publish(LogBuffer buffer);
 
     public boolean publish(LogEvent event);
-
-    public void reset();
 }

+ 15 - 25
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -37,7 +37,6 @@ import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
 
-
 /**
  * 针对解析器提供一个多阶段协同的处理
  * 
@@ -53,21 +52,21 @@ import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
  */
 public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implements MultiStageCoprocessor {
 
-    private static final int             maxFullTimes = 10;
-    private LogEventConvert              logEventConvert;
-    private EventTransactionBuffer       transactionBuffer;
-    private ErosaConnection              connection;
-
-    private int                          parserThreadCount;
-    private int                          ringBufferSize;
-    private RingBuffer<MessageEvent>     disruptorMsgBuffer;
-    private ExecutorService              parserExecutor;
-    private ExecutorService              stageExecutor;
-    private String                       destination;
-    private volatile CanalParseException exception;
-    private AtomicLong                   eventsPublishBlockingTime;
-    private GTIDSet                      gtidSet;
-    private WorkerPool<MessageEvent>     workerPool;
+    private static final int                  maxFullTimes = 10;
+    private LogEventConvert                   logEventConvert;
+    private EventTransactionBuffer            transactionBuffer;
+    private ErosaConnection                   connection;
+
+    private int                               parserThreadCount;
+    private int                               ringBufferSize;
+    private RingBuffer<MessageEvent>          disruptorMsgBuffer;
+    private ExecutorService                   parserExecutor;
+    private ExecutorService                   stageExecutor;
+    private String                            destination;
+    private volatile CanalParseException      exception;
+    private AtomicLong                        eventsPublishBlockingTime;
+    private GTIDSet                           gtidSet;
+    private WorkerPool<MessageEvent>          workerPool;
     private BatchEventProcessor<MessageEvent> simpleParserStage;
     private BatchEventProcessor<MessageEvent> sinkStoreStage;
 
@@ -235,15 +234,6 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
     }
 
-    @Override
-    public void reset() {
-        if (isStart()) {
-            stop();
-        }
-
-        start();
-    }
-
     private class SimpleParserStage implements EventHandler<MessageEvent>, LifecycleAware {
 
         private LogDecoder decoder;

+ 9 - 6
store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

@@ -7,6 +7,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.commons.lang.StringUtils;
+
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
@@ -37,14 +39,14 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
 
     private static final long INIT_SEQUENCE = -1;
     private int               bufferSize    = 16 * 1024;
-    private int               bufferMemUnit = 1024;                         // memsize的单位,默认为1kb大小
+    private int               bufferMemUnit = 1024;                                      // memsize的单位,默认为1kb大小
     private int               indexMask;
     private Event[]           entries;
 
     // 记录下put/get/ack操作的三个下标
-    private AtomicLong        putSequence   = new AtomicLong(INIT_SEQUENCE); // 代表当前put操作最后一次写操作发生的位置
-    private AtomicLong        getSequence   = new AtomicLong(INIT_SEQUENCE); // 代表当前get操作读取的最后一条的位置
-    private AtomicLong        ackSequence   = new AtomicLong(INIT_SEQUENCE); // 代表当前ack操作的最后一条的位置
+    private AtomicLong        putSequence   = new AtomicLong(INIT_SEQUENCE);             // 代表当前put操作最后一次写操作发生的位置
+    private AtomicLong        getSequence   = new AtomicLong(INIT_SEQUENCE);             // 代表当前get操作读取的最后一条的位置
+    private AtomicLong        ackSequence   = new AtomicLong(INIT_SEQUENCE);             // 代表当前ack操作的最后一条的位置
 
     // 记录下put/get/ack操作的三个memsize大小
     private AtomicLong        putMemSize    = new AtomicLong(0);
@@ -66,7 +68,7 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
     private Condition         notFull       = lock.newCondition();
     private Condition         notEmpty      = lock.newCondition();
 
-    private BatchMode         batchMode     = BatchMode.ITEMSIZE;           // 默认为内存大小模式
+    private BatchMode         batchMode     = BatchMode.ITEMSIZE;                        // 默认为内存大小模式
     private boolean           ddlIsolation  = false;
 
     public MemoryEventStoreWithBuffer(){
@@ -335,7 +337,8 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
 
         for (int i = entrys.size() - 1; i >= 0; i--) {
             Event event = entrys.get(i);
-            if (CanalEntry.EntryType.TRANSACTIONBEGIN == event.getEntryType()
+            // GTID模式,ack的位点必须是事务结尾,因为下一次订阅的时候mysql会发送这个gtid之后的next,如果在事务头就记录了会丢这最后一个事务
+            if ((CanalEntry.EntryType.TRANSACTIONBEGIN == event.getEntryType() && StringUtils.isEmpty(event.getGtid()))
                 || CanalEntry.EntryType.TRANSACTIONEND == event.getEntryType() || isDdl(event.getEventType())) {
                 // 将事务头/尾设置可被为ack的点
                 range.setAck(CanalEventUtils.createPosition(event));