Преглед на файлове

fixed issue #119 , support binlog row image noblob/minimal

agapple преди 9 години
родител
ревизия
3d6174f857

+ 4 - 0
dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/BaseLogFetcherTest.java

@@ -92,6 +92,10 @@ public class BaseLogFetcherTest {
         final ColumnInfo[] columnInfo = map.getColumnInfo();
 
         for (int i = 0; i < columnCnt; i++) {
+            if (!cols.get(i)) {
+                continue;
+            }
+
             ColumnInfo info = columnInfo[i];
             buffer.nextValue(info.type, info.meta);
 

+ 1 - 2
dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java

@@ -6,7 +6,6 @@ import java.sql.DriverManager;
 import java.sql.Statement;
 
 import org.junit.Assert;
-
 import org.junit.Test;
 
 import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
@@ -30,7 +29,7 @@ public class DirectLogFetcherTest extends BaseLogFetcherTest {
             statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'");
             statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");
 
-            fecther.open(connection, "mysql-bin.000010", 4L, 2);
+            fecther.open(connection, "mysql-bin.000001", 4L, 2);
 
             LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             LogContext context = new LogContext();

+ 2 - 4
dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/FileLogFetcherTest.java

@@ -5,7 +5,6 @@ import java.io.IOException;
 import java.net.URL;
 
 import org.junit.Assert;
-
 import org.junit.Before;
 import org.junit.Test;
 
@@ -27,7 +26,6 @@ public class FileLogFetcherTest extends BaseLogFetcherTest {
         URL url = Thread.currentThread().getContextClassLoader().getResource("dummy.txt");
         File dummyFile = new File(url.getFile());
         directory = new File(dummyFile.getParent() + "/binlog").getPath();
-        // directory = "/home/jianghang/tmp/binlog";
     }
 
     @Test
@@ -37,8 +35,8 @@ public class FileLogFetcherTest extends BaseLogFetcherTest {
             LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             LogContext context = new LogContext();
 
-            File current = new File(directory, "mysql-bin.000006");
-            fetcher.open(current);
+            File current = new File(directory, "mysql-bin.000001");
+            fetcher.open(current, 2051L);
             context.setLogPosition(new LogPosition(current.getName()));
 
             while (fetcher.fetch()) {

+ 17 - 5
deployer/src/main/bin/stop.sh

@@ -1,10 +1,14 @@
 #!/bin/bash
 
 cygwin=false;
+linux=false;
 case "`uname`" in
     CYGWIN*)
         cygwin=true
         ;;
+    Linux*)
+    	linux=true
+    	;;
 esac
 
 get_pid() {	
@@ -15,11 +19,19 @@ get_pid() {
         JAVA_CMD=`cygpath --path --unix $JAVA_CMD`
         JAVA_PID=`ps |grep $JAVA_CMD |awk '{print $1}'`
     else
-        if [ ! -z "$PID" ]; then
-        	JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
-	    else 
-	        JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep -v grep|awk '{print $2}'`
-        fi
+    	if $linux; then
+	        if [ ! -z "$PID" ]; then
+	        	JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
+		    else 
+		        JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep -v grep|awk '{print $2}'`
+	        fi
+	    else
+	    	if [ ! -z "$PID" ]; then
+	        	JAVA_PID=`ps aux |grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
+		    else 
+		        JAVA_PID=`ps aux |grep "$STR"|grep -v grep|awk '{print $2}'`
+	        fi
+	    fi
     fi
     echo $JAVA_PID;
 }

+ 4 - 0
deployer/src/main/resources/canal.properties

@@ -41,6 +41,10 @@ canal.instance.filter.query.dml = false
 canal.instance.filter.query.ddl = false
 canal.instance.filter.table.error = false
 
+# binlog format/image check
+canal.instance.binlog.format = ROW,STATEMENT,MIXED 
+canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
+
 # binlog ddl isolation
 canal.instance.get.ddl.isolation = false
 

+ 2 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -179,5 +179,7 @@
 		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
 		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
+		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
+		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
 	</bean>
 </beans>

+ 2 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -164,5 +164,7 @@
 		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
 		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
+		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
+		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
 	</bean>
 </beans>

+ 4 - 0
deployer/src/main/resources/spring/group-instance.xml

@@ -161,6 +161,8 @@
 		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
 		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
+		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
+		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
 	</bean>
 	
 	<bean id="eventParser2" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
@@ -249,5 +251,7 @@
 		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
 		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
+		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
+		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
 	</bean>
 </beans>

+ 2 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -152,5 +152,7 @@
 		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
 		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
+		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
+		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
 	</bean>
 </beans>

+ 79 - 9
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -27,12 +27,13 @@ import com.taobao.tddl.dbsync.binlog.LogEvent;
 
 public class MysqlConnection implements ErosaConnection {
 
-    private static final Logger logger       = LoggerFactory.getLogger(MysqlConnection.class);
+    private static final Logger logger  = LoggerFactory.getLogger(MysqlConnection.class);
 
     private MysqlConnector      connector;
     private long                slaveId;
-    private Charset             charset      = Charset.forName("UTF-8");
-    private BinlogFormat        binlogFormat = BinlogFormat.ROW;
+    private Charset             charset = Charset.forName("UTF-8");
+    private BinlogFormat        binlogFormat;
+    private BinlogImage         binlogImage;
 
     public MysqlConnection(){
     }
@@ -104,11 +105,6 @@ public class MysqlConnection implements ErosaConnection {
 
     public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
         updateSettings();
-        BinlogFormat format = getBinlogFormat();
-        if (!format.isRow()) {
-            logger.warn("binlog_format : {} , it will not have rowData before/after columns", format.toString());
-        }
-
         sendBinlogDump(binlogfilename, binlogPosition);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         fetcher.start(connector.getChannel());
@@ -212,7 +208,7 @@ public class MysqlConnection implements ErosaConnection {
     }
 
     /**
-     * 判断一下是否采用ROW模
+     * 获取一下binlog format格
      */
     private void loadBinlogFormat() {
         ResultSetPacket rs = null;
@@ -234,6 +230,30 @@ public class MysqlConnection implements ErosaConnection {
         }
     }
 
+    /**
+     * 获取一下binlog image格式
+     */
+    private void loadBinlogImage() {
+        ResultSetPacket rs = null;
+        try {
+            rs = query("show variables like 'binlog_row_image'");
+        } catch (IOException e) {
+            throw new CanalParseException(e);
+        }
+
+        List<String> columnValues = rs.getFieldValues();
+        if (columnValues == null || columnValues.size() != 2) {
+            // 可能历时版本没有image特性
+            binlogImage = BinlogImage.FULL;
+        } else {
+            binlogImage = BinlogImage.valuesOf(columnValues.get(1));
+        }
+
+        if (binlogFormat == null) {
+            throw new IllegalStateException("unexpected binlog image query result:" + rs.getFieldValues());
+        }
+    }
+
     public static enum BinlogFormat {
 
         STATEMENT("STATEMENT"), ROW("ROW"), MIXED("MIXED");
@@ -267,6 +287,46 @@ public class MysqlConnection implements ErosaConnection {
         }
     }
 
+    /**
+     * http://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.
+     * html#sysvar_binlog_row_image
+     * 
+     * @author agapple 2015年6月29日 下午10:39:03
+     * @since 1.0.20
+     */
+    public static enum BinlogImage {
+
+        FULL("FULL"), MINIMAL("MINIMAL"), NOBLOB("NOBLOB");
+
+        public boolean isFull() {
+            return this == FULL;
+        }
+
+        public boolean isMinimal() {
+            return this == MINIMAL;
+        }
+
+        public boolean isNoBlob() {
+            return this == NOBLOB;
+        }
+
+        private String value;
+
+        private BinlogImage(String value){
+            this.value = value;
+        }
+
+        public static BinlogImage valuesOf(String value) {
+            BinlogImage[] formats = values();
+            for (BinlogImage format : formats) {
+                if (format.value.equalsIgnoreCase(value)) {
+                    return format;
+                }
+            }
+            return null;
+        }
+    }
+
     // ================== setter / getter ===================
 
     public Charset getCharset() {
@@ -303,4 +363,14 @@ public class MysqlConnection implements ErosaConnection {
         return binlogFormat;
     }
 
+    public BinlogImage getBinlogImage() {
+        if (binlogImage == null) {
+            synchronized (this) {
+                loadBinlogImage();
+            }
+        }
+
+        return binlogImage;
+    }
+
 }

+ 58 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -22,6 +22,8 @@ import com.alibaba.otter.canal.parse.ha.CanalHAController;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.HeartBeatCallback;
 import com.alibaba.otter.canal.parse.inbound.SinkFunction;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.BinlogFormat;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.BinlogImage;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
@@ -62,6 +64,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private TableMetaCache     tableMetaCache;                               // 对应meta
                                                                               // cache
     private int                fallbackIntervalInSeconds         = 60;       // 切换回退时间
+    private BinlogFormat[]     supportBinlogFormats;                         // 支持的binlogFormat,如果设置会执行强校验
+    private BinlogImage[]      supportBinlogImages;                          // 支持的binlogImage,如果设置会执行强校验
 
     // 心跳检查
 
@@ -82,6 +86,34 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 throw new CanalParseException(e);
             }
 
+            if (supportBinlogFormats != null && supportBinlogFormats.length > 0) {
+                BinlogFormat format = ((MysqlConnection) metaConnection).getBinlogFormat();
+                boolean found = false;
+                for (BinlogFormat supportFormat : supportBinlogFormats) {
+                    if (supportFormat != null && format == supportFormat) {
+                        found = true;
+                    }
+                }
+
+                if (!found) {
+                    throw new CanalParseException("Unsupported BinlogFormat " + format);
+                }
+            }
+
+            if (supportBinlogImages != null && supportBinlogImages.length > 0) {
+                BinlogImage image = ((MysqlConnection) metaConnection).getBinlogImage();
+                boolean found = false;
+                for (BinlogImage supportImage : supportBinlogImages) {
+                    if (supportImage != null && image == supportImage) {
+                        found = true;
+                    }
+                }
+
+                if (!found) {
+                    throw new CanalParseException("Unsupported BinlogImage " + image);
+                }
+            }
+
             tableMetaCache = new TableMetaCache(metaConnection);
             ((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);
         }
@@ -685,6 +717,32 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         return event;
     }
 
+    public void setSupportBinlogFormats(String formatStrs) {
+        String[] formats = StringUtils.split(formatStrs, ',');
+        if (formats != null) {
+            BinlogFormat[] supportBinlogFormats = new BinlogFormat[formats.length];
+            int i = 0;
+            for (String format : formats) {
+                supportBinlogFormats[i++] = BinlogFormat.valuesOf(format);
+            }
+
+            this.supportBinlogFormats = supportBinlogFormats;
+        }
+    }
+
+    public void setSupportBinlogImages(String imageStrs) {
+        String[] images = StringUtils.split(imageStrs, ',');
+        if (images != null) {
+            BinlogImage[] supportBinlogImages = new BinlogImage[images.length];
+            int i = 0;
+            for (String image : images) {
+                supportBinlogImages[i++] = BinlogImage.valuesOf(image);
+            }
+
+            this.supportBinlogImages = supportBinlogImages;
+        }
+    }
+
     // ===================== setter / getter ========================
 
     public void setDefaultConnectionTimeoutInSeconds(int defaultConnectionTimeoutInSeconds) {

+ 16 - 15
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -422,6 +422,11 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
         for (int i = 0; i < columnCnt; i++) {
             ColumnInfo info = columnInfo[i];
+            // mysql 5.6开始支持nolob/mininal类型,并不一定记录所有的列,需要进行判断
+            if (!cols.get(i)) {
+                continue;
+            }
+
             Column.Builder columnBuilder = Column.newBuilder();
 
             FieldMeta fieldMeta = null;
@@ -604,25 +609,21 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         if (index < 0) {
             return false;
         }
-        if ((bfColumns.size() - 1) < index) {
-            return false;
-        }
-        Column column = bfColumns.get(index);
 
-        if (column.getIsNull()) {
-            if (newValue != null) {
-                return true;
-            }
-        } else {
-            if (newValue == null) {
-                return true;
-            } else {
-                if (!column.getValue().equals(newValue)) {
-                    return true;
+        for (Column column : bfColumns) {
+            if (column.getIndex() == index) {// 比较before / after的column index
+                if (column.getIsNull() && newValue == null) {
+                    // 如果全是null
+                    return false;
+                } else if (newValue != null && column.getValue().equals(newValue)) {
+                    // 如果不围null,并且相等
+                    return false;
                 }
             }
         }
-        return false;
+
+        // 比如nolob/minial模式下,可能找不到before记录,认为是有变化
+        return true;
     }
 
     private TableMeta getTableMeta(String dbName, String tbName, boolean useCache) {