Browse Source

Merge pull request #145 from OpenZYK/master

添加rowdata过滤
agapple 9 years ago
parent
commit
fbd99acac2

+ 2 - 1
deployer/src/main/resources/canal.properties

@@ -40,6 +40,7 @@ canal.instance.filter.query.dcl = false
 canal.instance.filter.query.dml = false
 canal.instance.filter.query.ddl = false
 canal.instance.filter.table.error = false
+canal.instance.filter.rows = false
 
 # binlog format/image check
 canal.instance.binlog.format = ROW,STATEMENT,MIXED 
@@ -63,4 +64,4 @@ canal.instance.global.lazy = false
 #canal.instance.global.manager.address = 127.0.0.1:1099
 #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
 canal.instance.global.spring.xml = classpath:spring/file-instance.xml
-#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
+#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

+ 2 - 1
deployer/src/main/resources/spring/default-instance.xml

@@ -178,8 +178,9 @@
 		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
 		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
 		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
+		<property name="filterRows" value="${canal.instance.filter.rows:false}" />
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
 		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
 		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
 	</bean>
-</beans>
+</beans>

+ 2 - 1
deployer/src/main/resources/spring/file-instance.xml

@@ -163,8 +163,9 @@
 		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
 		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
 		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
+		<property name="filterRows" value="${canal.instance.filter.rows:false}" />
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
 		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
 		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
 	</bean>
-</beans>
+</beans>

+ 2 - 1
deployer/src/main/resources/spring/group-instance.xml

@@ -250,8 +250,9 @@
 		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
 		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
 		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
+		<property name="filterRows" value="${canal.instance.filter.rows:false}" />
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
 		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
 		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
 	</bean>
-</beans>
+</beans>

+ 2 - 1
deployer/src/main/resources/spring/memory-instance.xml

@@ -151,8 +151,9 @@
 		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
 		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
 		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
+		<property name="filterRows" value="${canal.instance.filter.rows:false}" />
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
 		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
 		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
 	</bean>
-</beans>
+</beans>

+ 6 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -22,6 +22,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
     protected boolean           filterQueryDcl          = false;
     protected boolean           filterQueryDml          = false;
     protected boolean           filterQueryDdl          = false;
+    protected boolean           filterRows              = false;
     protected boolean           filterTableError        = false;
 
     protected BinlogParser buildParser() {
@@ -38,6 +39,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         convert.setFilterQueryDcl(filterQueryDcl);
         convert.setFilterQueryDml(filterQueryDml);
         convert.setFilterQueryDdl(filterQueryDdl);
+        convert.setFilterRows(filterRows);
         convert.setFilterTableError(filterTableError);
         return convert;
     }
@@ -86,6 +88,10 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
     public void setFilterQueryDdl(boolean filterQueryDdl) {
         this.filterQueryDdl = filterQueryDdl;
     }
+    
+    public void setFilterRows(boolean filterRows) {
+        this.filterRows = filterRows;
+    }
 
     public void setFilterTableError(boolean filterTableError) {
         this.filterTableError = filterTableError;

+ 9 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -84,6 +84,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     private boolean                     filterQueryDdl      = false;
     // 是否跳过table相关的解析异常,比如表不存在或者列数量不匹配,issue 92
     private boolean                     filterTableError    = false;
+    // 新增rows过滤,用于仅订阅除rows以外的数据
+    private boolean                     filterRows      = false;
 
     public Entry parse(LogEvent logEvent) throws CanalParseException {
         if (logEvent == null || logEvent instanceof UnknownLogEvent) {
@@ -305,6 +307,9 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     }
 
     private Entry parseRowsEvent(RowsLogEvent event) {
+        if (filterRows) {
+            return null;
+        }
         try {
             TableMapLogEvent table = event.getTable();
             if (table == null) {
@@ -713,5 +718,9 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     public void setFilterTableError(boolean filterTableError) {
         this.filterTableError = filterTableError;
     }
+    
+    public void setFilterRows(boolean filterRows) {
+        this.filterRows = filterRows;
+    }
 
 }