Browse Source

Merge pull request #455 from lucifax301/master

mysql semi support and mariadb gtid parse
agapple 7 years ago
parent
commit
c6f4de9f88

+ 1 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogBuffer.java

@@ -20,6 +20,7 @@ public class LogBuffer {
 
     protected int    origin, limit;
     protected int    position;
+    protected int 	 semival;
 
     protected LogBuffer(){
     }

+ 3 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -104,6 +104,9 @@ public final class LogDecoder {
                     try {
                         /* Decoding binary-log to event */
                         event = decode(buffer, header, context);
+                        if(event!=null){
+                        	event.setSemival(buffer.semival);
+                        }
                     } catch (IOException e) {
                         if (logger.isWarnEnabled()) logger.warn("Decoding " + LogEvent.getTypeName(header.getType())
                                                                 + " failed from: " + context.getLogPosition(), e);

+ 16 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogEvent.java

@@ -359,8 +359,23 @@ public abstract class LogEvent {
     protected static final Log logger = LogFactory.getLog(LogEvent.class);
 
     protected final LogHeader  header;
+    
+    /**
+     * mysql半同步semi标识 0不需要semi ack 给mysql, 1需要semi ack给mysql
+     */
+    protected int semival;
+    
+    
+
+    public int getSemival() {
+		return semival;
+	}
+
+	public void setSemival(int semival) {
+		this.semival = semival;
+	}
 
-    protected LogEvent(LogHeader header){
+	protected LogEvent(LogHeader header){
         this.header = header;
     }
 

+ 26 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/MariaGtidLogEvent.java

@@ -13,9 +13,35 @@ import com.taobao.tddl.dbsync.binlog.event.LogHeader;
  */
 public class MariaGtidLogEvent extends IgnorableLogEvent {
 
+	private long gtid;
+	
+	/**
+	 * mariadb gtidlog event format
+	 * 
+    uint<8> GTID sequence
+    uint<4> Replication Domain ID
+    uint<1> Flags
+
+	if flag & FL_GROUP_COMMIT_ID
+	
+	    uint<8> commit_id
+	
+	else
+	
+	    uint<6> 0
+
+	 * 
+	 */
+	
     public MariaGtidLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
         super(header, buffer, descriptionEvent);
+        gtid=buffer.getUlong64().longValue();
         // do nothing , just ignore log event
     }
 
+	public long getGtid() {
+		return gtid;
+	}
+
+    
 }

+ 58 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/SemiAckCommandPacket.java

@@ -0,0 +1,58 @@
+package com.alibaba.otter.canal.parse.driver.mysql.packets.client;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.CommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
+
+/**
+ * semi ack command
+ * @author amos_chen
+ *
+ */
+public class SemiAckCommandPacket extends CommandPacket {
+
+	public long binlogPosition;
+	
+	public String binlogFileName;
+	
+	public SemiAckCommandPacket(){
+		
+	}
+	
+	@Override
+	public void fromBytes(byte[] data) throws IOException {
+		// TODO Auto-generated method stub
+
+	}
+
+	/**
+     * <pre>
+     * Bytes                        Name
+     *  --------------------------------------------------------
+     *  Bytes                        Name
+     *  -----                        ----
+     *  1                            semi mark
+     *  8                            binlog position to start at (little endian)
+     *  n                            binlog file name 
+     * 
+     * </pre>
+     */
+	public byte[] toBytes() throws IOException {
+		ByteArrayOutputStream out = new ByteArrayOutputStream();
+		//0 write semi mark
+		out.write(0xef);
+		//1 write 8 bytes for position
+		ByteHelper.write8ByteUnsignedIntLittleEndian(binlogPosition, out);
+		
+		//2 write binlog filename
+		if(StringUtils.isNotEmpty(binlogFileName)){
+			out.write(binlogFileName.getBytes());
+		}
+		return out.toByteArray();
+	}
+
+}

+ 12 - 1
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/ByteHelper.java

@@ -108,7 +108,18 @@ public abstract class ByteHelper {
 
         return out.toByteArray();
     }
-
+    
+    public static void write8ByteUnsignedIntLittleEndian(long data, ByteArrayOutputStream out) {
+        out.write((byte) (data & 0xFF));
+        out.write((byte) (data >>> 8));
+        out.write((byte) (data >>> 16));
+        out.write((byte) (data >>> 24));
+        out.write((byte) (data >>> 32));
+        out.write((byte) (data >>> 40));
+        out.write((byte) (data >>> 48));
+        out.write((byte) (data >>> 56));
+    }
+    
     public static void writeUnsignedIntLittleEndian(long data, ByteArrayOutputStream out) {
         out.write((byte) (data & 0xFF));
         out.write((byte) (data >>> 8));

+ 28 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -14,6 +14,7 @@ import com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.client.SemiAckCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
@@ -24,6 +25,7 @@ import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
 
 public class MysqlConnection implements ErosaConnection {
 
@@ -35,6 +37,8 @@ public class MysqlConnection implements ErosaConnection {
     private BinlogFormat        binlogFormat;
     private BinlogImage         binlogImage;
 
+    private String binlogfilename;
+    
     // tsdb releated
     private AuthenticationInfo  authInfo;
     protected int               connTimeout = 5 * 1000;                                      // 5秒
@@ -141,9 +145,18 @@ public class MysqlConnection implements ErosaConnection {
             if (event == null) {
                 throw new CanalParseException("parse failed");
             }
+            
+            //binlog日志文件发生变化
+            if(event.getHeader().getType()==LogEvent.ROTATE_EVENT){
+            	binlogfilename=((RotateLogEvent)event).getFilename();
+            }
 
             if (!func.sink(event)) {
                 break;
+            }else{
+            	if(event.getSemival()==1){
+            		sendSemiAck(binlogfilename, binlogPosition);
+            	}
             }
         }
     }
@@ -167,6 +180,21 @@ public class MysqlConnection implements ErosaConnection {
         PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody);
         connector.setDumping(true);
     }
+    
+    private void sendSemiAck(String binlogfilename, Long binlogPosition) throws IOException {
+        SemiAckCommandPacket semiAckCmd = new SemiAckCommandPacket();
+        semiAckCmd.binlogFileName = binlogfilename;
+        semiAckCmd.binlogPosition = binlogPosition;
+        
+        byte[] cmdBody = semiAckCmd.toBytes();
+
+        logger.info("SEMI ACK with position:{}", semiAckCmd);
+        HeaderPacket semiAckHeader = new HeaderPacket();
+        semiAckHeader.setPacketBodyLength(cmdBody.length);
+        semiAckHeader.setPacketSequenceNumber((byte) 0x00);
+        PacketManager.writePkg(connector.getChannel(), semiAckHeader.toBytes(), cmdBody);
+        
+    }
 
     public MysqlConnection fork() {
         MysqlConnection connection = new MysqlConnection();

+ 20 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java

@@ -36,6 +36,8 @@ public class DirectLogFetcher extends LogFetcher {
     public static final int       MAX_PACKET_LENGTH = (256 * 256 * 256 - 1);
 
     private SocketChannel         channel;
+    
+    private boolean issemi=false;
 
     // private BufferedInputStream input;
 
@@ -53,6 +55,10 @@ public class DirectLogFetcher extends LogFetcher {
 
     public void start(SocketChannel channel) throws IOException {
         this.channel = channel;
+        String dbsemi= System.getProperty("db.semi");
+        if("1".equals(dbsemi)){
+        	issemi=true;
+        }
         // 和mysql driver一样,提供buffer机制,提升读取binlog速度
         // this.input = new
         // BufferedInputStream(channel.socket().getInputStream(), 16384);
@@ -105,7 +111,15 @@ public class DirectLogFetcher extends LogFetcher {
                                           + ", len = " + netlen);
                 }
             }
-
+            
+            //if mysql is in semi mode
+            if(issemi){
+	            //parse semi mark
+	            int semimark=getUint8(NET_HEADER_SIZE+1);
+	            int semival=getUint8(NET_HEADER_SIZE+2);
+	            this.semival=semival;
+            }
+            
             // The first packet is a multi-packet, concatenate the packets.
             while (netlen == MAX_PACKET_LENGTH) {
                 if (!fetch0(0, NET_HEADER_SIZE)) {
@@ -122,7 +136,11 @@ public class DirectLogFetcher extends LogFetcher {
             }
 
             // Preparing buffer variables to decoding.
-            origin = NET_HEADER_SIZE + 1;
+            if(issemi){
+            	origin = NET_HEADER_SIZE + 3;
+            }else{
+            	origin = NET_HEADER_SIZE + 1;
+            }
             position = origin;
             limit -= origin;
             return true;