Browse Source

fixed semi sync format & refactor

agapple 7 years ago
parent
commit
9595e6abbd

+ 1 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogBuffer.java

@@ -20,7 +20,7 @@ public class LogBuffer {
 
     protected int    origin, limit;
     protected int    position;
-    protected int 	 semival;
+    protected int    semival;
 
     protected LogBuffer(){
     }

+ 6 - 4
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -104,12 +104,14 @@ public final class LogDecoder {
                     try {
                         /* Decoding binary-log to event */
                         event = decode(buffer, header, context);
-                        if(event!=null){
-                        	event.setSemival(buffer.semival);
+                        if (event != null) {
+                            event.setSemival(buffer.semival);
                         }
                     } catch (IOException e) {
-                        if (logger.isWarnEnabled()) logger.warn("Decoding " + LogEvent.getTypeName(header.getType())
-                                                                + " failed from: " + context.getLogPosition(), e);
+                        if (logger.isWarnEnabled()) {
+                            logger.warn("Decoding " + LogEvent.getTypeName(header.getType()) + " failed from: "
+                                        + context.getLogPosition(), e);
+                        }
                         throw e;
                     } finally {
                         buffer.limit(limit); /* Restore limit */

+ 7 - 2
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogEvent.java

@@ -361,9 +361,14 @@ public abstract class LogEvent {
     protected final LogHeader  header;
     
     /**
-     * mysql半同步semi标识 0不需要semi ack 给mysql, 1需要semi ack给mysql
+     * mysql半同步semi标识
+     * 
+     * <pre>
+     * 0不需要semi ack 给mysql
+     * 1需要semi ack给mysql
+     * </pre>
      */
-    protected int semival;
+    protected int              semival;
     
     
 

+ 18 - 23
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/MariaGtidLogEvent.java

@@ -13,35 +13,30 @@ import com.taobao.tddl.dbsync.binlog.event.LogHeader;
  */
 public class MariaGtidLogEvent extends IgnorableLogEvent {
 
-	private long gtid;
-	
-	/**
-	 * mariadb gtidlog event format
-	 * 
-    uint<8> GTID sequence
-    uint<4> Replication Domain ID
-    uint<1> Flags
+    private long gtid;
 
-	if flag & FL_GROUP_COMMIT_ID
-	
-	    uint<8> commit_id
-	
-	else
-	
-	    uint<6> 0
+    /**
+     * <pre>
+     * mariadb gtidlog event format
+     *     uint<8> GTID sequence
+     *     uint<4> Replication Domain ID
+     *     uint<1> Flags
+     * 
+     * 	if flag & FL_GROUP_COMMIT_ID
+     * 	    uint<8> commit_id
+     * 	else
+     * 	    uint<6> 0
+     * </pre>
+     */
 
-	 * 
-	 */
-	
     public MariaGtidLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
         super(header, buffer, descriptionEvent);
-        gtid=buffer.getUlong64().longValue();
+        gtid = buffer.getUlong64().longValue();
         // do nothing , just ignore log event
     }
 
-	public long getGtid() {
-		return gtid;
-	}
+    public long getGtid() {
+        return gtid;
+    }
 
-    
 }

+ 27 - 30
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/SemiAckCommandPacket.java

@@ -10,26 +10,23 @@ import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
 
 /**
  * semi ack command
+ * 
  * @author amos_chen
- *
  */
 public class SemiAckCommandPacket extends CommandPacket {
 
-	public long binlogPosition;
-	
-	public String binlogFileName;
-	
-	public SemiAckCommandPacket(){
-		
-	}
-	
-	@Override
-	public void fromBytes(byte[] data) throws IOException {
-		// TODO Auto-generated method stub
-
-	}
-
-	/**
+    public long   binlogPosition;
+    public String binlogFileName;
+
+    public SemiAckCommandPacket(){
+
+    }
+
+    @Override
+    public void fromBytes(byte[] data) throws IOException {
+    }
+
+    /**
      * <pre>
      * Bytes                        Name
      *  --------------------------------------------------------
@@ -37,22 +34,22 @@ public class SemiAckCommandPacket extends CommandPacket {
      *  -----                        ----
      *  1                            semi mark
      *  8                            binlog position to start at (little endian)
-     *  n                            binlog file name 
+     *  n                            binlog file name
      * 
      * </pre>
      */
-	public byte[] toBytes() throws IOException {
-		ByteArrayOutputStream out = new ByteArrayOutputStream();
-		//0 write semi mark
-		out.write(0xef);
-		//1 write 8 bytes for position
-		ByteHelper.write8ByteUnsignedIntLittleEndian(binlogPosition, out);
-		
-		//2 write binlog filename
-		if(StringUtils.isNotEmpty(binlogFileName)){
-			out.write(binlogFileName.getBytes());
-		}
-		return out.toByteArray();
-	}
+    public byte[] toBytes() throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        // 0 write semi mark
+        out.write(0xef);
+        // 1 write 8 bytes for position
+        ByteHelper.write8ByteUnsignedIntLittleEndian(binlogPosition, out);
+
+        // 2 write binlog filename
+        if (StringUtils.isNotEmpty(binlogFileName)) {
+            out.write(binlogFileName.getBytes());
+        }
+        return out.toByteArray();
+    }
 
 }

+ 7 - 15
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -25,7 +25,6 @@ import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
 
 public class MysqlConnection implements ErosaConnection {
 
@@ -37,8 +36,6 @@ public class MysqlConnection implements ErosaConnection {
     private BinlogFormat        binlogFormat;
     private BinlogImage         binlogImage;
 
-    private String binlogfilename;
-    
     // tsdb releated
     private AuthenticationInfo  authInfo;
     protected int               connTimeout = 5 * 1000;                                      // 5秒
@@ -145,18 +142,13 @@ public class MysqlConnection implements ErosaConnection {
             if (event == null) {
                 throw new CanalParseException("parse failed");
             }
-            
-            //binlog日志文件发生变化
-            if(event.getHeader().getType()==LogEvent.ROTATE_EVENT){
-            	binlogfilename=((RotateLogEvent)event).getFilename();
-            }
 
             if (!func.sink(event)) {
                 break;
-            }else{
-            	if(event.getSemival()==1){
-            		sendSemiAck(binlogfilename, binlogPosition);
-            	}
+            }
+
+            if (event.getSemival() == 1) {
+                sendSemiAck(context.getLogPosition().getFileName(), binlogPosition);
             }
         }
     }
@@ -180,12 +172,12 @@ public class MysqlConnection implements ErosaConnection {
         PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody);
         connector.setDumping(true);
     }
-    
+
     private void sendSemiAck(String binlogfilename, Long binlogPosition) throws IOException {
         SemiAckCommandPacket semiAckCmd = new SemiAckCommandPacket();
         semiAckCmd.binlogFileName = binlogfilename;
         semiAckCmd.binlogPosition = binlogPosition;
-        
+
         byte[] cmdBody = semiAckCmd.toBytes();
 
         logger.info("SEMI ACK with position:{}", semiAckCmd);
@@ -193,7 +185,7 @@ public class MysqlConnection implements ErosaConnection {
         semiAckHeader.setPacketBodyLength(cmdBody.length);
         semiAckHeader.setPacketSequenceNumber((byte) 0x00);
         PacketManager.writePkg(connector.getChannel(), semiAckHeader.toBytes(), cmdBody);
-        
+
     }
 
     public MysqlConnection fork() {

+ 17 - 17
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java

@@ -36,8 +36,8 @@ public class DirectLogFetcher extends LogFetcher {
     public static final int       MAX_PACKET_LENGTH = (256 * 256 * 256 - 1);
 
     private SocketChannel         channel;
-    
-    private boolean issemi=false;
+
+    private boolean               issemi            = false;
 
     // private BufferedInputStream input;
 
@@ -55,9 +55,9 @@ public class DirectLogFetcher extends LogFetcher {
 
     public void start(SocketChannel channel) throws IOException {
         this.channel = channel;
-        String dbsemi= System.getProperty("db.semi");
-        if("1".equals(dbsemi)){
-        	issemi=true;
+        String dbsemi = System.getProperty("db.semi");
+        if ("1".equals(dbsemi)) {
+            issemi = true;
         }
         // 和mysql driver一样,提供buffer机制,提升读取binlog速度
         // this.input = new
@@ -111,15 +111,15 @@ public class DirectLogFetcher extends LogFetcher {
                                           + ", len = " + netlen);
                 }
             }
-            
-            //if mysql is in semi mode
-            if(issemi){
-	            //parse semi mark
-	            int semimark=getUint8(NET_HEADER_SIZE+1);
-	            int semival=getUint8(NET_HEADER_SIZE+2);
-	            this.semival=semival;
+
+            // if mysql is in semi mode
+            if (issemi) {
+                // parse semi mark
+                int semimark = getUint8(NET_HEADER_SIZE + 1);
+                int semival = getUint8(NET_HEADER_SIZE + 2);
+                this.semival = semival;
             }
-            
+
             // The first packet is a multi-packet, concatenate the packets.
             while (netlen == MAX_PACKET_LENGTH) {
                 if (!fetch0(0, NET_HEADER_SIZE)) {
@@ -136,10 +136,10 @@ public class DirectLogFetcher extends LogFetcher {
             }
 
             // Preparing buffer variables to decoding.
-            if(issemi){
-            	origin = NET_HEADER_SIZE + 3;
-            }else{
-            	origin = NET_HEADER_SIZE + 1;
+            if (issemi) {
+                origin = NET_HEADER_SIZE + 3;
+            } else {
+                origin = NET_HEADER_SIZE + 1;
             }
             position = origin;
             limit -= origin;