Просмотр исходного кода

基于明确的I/U/D是否过滤 (#3452)

* dmls filter

* dmls filters

* 基于明确的I/U/D是否过滤

* 基于明确的I/U/D是否过滤
suman-lee 4 лет назад
Родитель
Сommit
21904ef1dc

+ 3 - 0
deployer/src/main/resources/canal.properties

@@ -60,6 +60,9 @@ canal.instance.filter.query.ddl = false
 canal.instance.filter.table.error = false
 canal.instance.filter.rows = false
 canal.instance.filter.transaction.entry = false
+canal.instance.filter.dml.insert=false
+canal.instance.filter.dml.update=false
+canal.instance.filter.dml.delete=false
 
 # binlog format/image check
 canal.instance.binlog.format = ROW,STATEMENT,MIXED 

+ 3 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -174,6 +174,9 @@
 		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
 		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
 		<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
+		<property name="filterDmlInsert" value="${canal.instance.filter.dml.insert:false}" />
+		<property name="filterDmlUpdate" value="${canal.instance.filter.dml.update:false}" />
+		<property name="filterDmlDelete" value="${canal.instance.filter.dml.delete:false}" />
 		<property name="filterRows" value="${canal.instance.filter.rows:false}" />
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
 		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />

+ 3 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -160,6 +160,9 @@
 		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
 		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
 		<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
+		<property name="filterDmlInsert" value="${canal.instance.filter.dml.insert:false}" />
+		<property name="filterDmlUpdate" value="${canal.instance.filter.dml.update:false}" />
+		<property name="filterDmlDelete" value="${canal.instance.filter.dml.delete:false}" />
 		<property name="filterRows" value="${canal.instance.filter.rows:false}" />
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
 		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />

+ 6 - 0
deployer/src/main/resources/spring/group-instance.xml

@@ -157,6 +157,9 @@
 		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
 		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
 		<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
+		<property name="filterDmlInsert" value="${canal.instance.filter.dml.insert:false}" />
+		<property name="filterDmlUpdate" value="${canal.instance.filter.dml.update:false}" />
+		<property name="filterDmlDelete" value="${canal.instance.filter.dml.delete:false}" />
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
 		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
 		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
@@ -261,6 +264,9 @@
 		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
 		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
 		<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
+		<property name="filterDmlInsert" value="${canal.instance.filter.dml.insert:false}" />
+		<property name="filterDmlUpdate" value="${canal.instance.filter.dml.update:false}" />
+		<property name="filterDmlDelete" value="${canal.instance.filter.dml.delete:false}" />
 		<property name="filterRows" value="${canal.instance.filter.rows:false}" />
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
 		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />

+ 3 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -148,6 +148,9 @@
 		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
 		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
 		<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
+		<property name="filterDmlInsert" value="${canal.instance.filter.dml.insert:false}" />
+		<property name="filterDmlUpdate" value="${canal.instance.filter.dml.update:false}" />
+		<property name="filterDmlDelete" value="${canal.instance.filter.dml.delete:false}" />
 		<property name="filterRows" value="${canal.instance.filter.rows:false}" />
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
 		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />

+ 30 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -16,6 +16,7 @@ import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DefaultTableMetaTSDBFact
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBFactory;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
+import org.apache.commons.lang.StringUtils;
 
 public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 
@@ -37,6 +38,10 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
     protected boolean              filterRows                = false;
     protected boolean              filterTableError          = false;
     protected boolean              useDruidDdlFilter         = true;
+
+    protected boolean              filterDmlInsert           = false;
+    protected boolean              filterDmlUpdate           = false;
+    protected boolean              filterDmlDelete           = false;
     // instance received binlog bytes
     protected final AtomicLong     receivedBinlogBytes       = new AtomicLong(0L);
     private final AtomicLong       eventsPublishBlockingTime = new AtomicLong(0L);
@@ -173,7 +178,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
             parallelThreadSize,
             (LogEventConvert) binlogParser,
             transactionBuffer,
-            destination);
+            destination, filterDmlInsert, filterDmlUpdate, filterDmlDelete);
         mysqlMultiStageCoprocessor.setEventsPublishBlockingTime(eventsPublishBlockingTime);
         return mysqlMultiStageCoprocessor;
     }
@@ -224,6 +229,30 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         this.useDruidDdlFilter = useDruidDdlFilter;
     }
 
+    public boolean isFilterDmlInsert() {
+        return filterDmlInsert;
+    }
+
+    public void setFilterDmlInsert(boolean filterDmlInsert) {
+        this.filterDmlInsert = filterDmlInsert;
+    }
+
+    public boolean isFilterDmlUpdate() {
+        return filterDmlUpdate;
+    }
+
+    public void setFilterDmlUpdate(boolean filterDmlUpdate) {
+        this.filterDmlUpdate = filterDmlUpdate;
+    }
+
+    public boolean isFilterDmlDelete() {
+        return filterDmlDelete;
+    }
+
+    public void setFilterDmlDelete(boolean filterDmlDelete) {
+        this.filterDmlDelete = filterDmlDelete;
+    }
+
     public void setEnableTsdb(boolean enableTsdb) {
         this.enableTsdb = enableTsdb;
         if (this.enableTsdb) {

+ 11 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -72,13 +72,20 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
     private BatchEventProcessor<MessageEvent> sinkStoreStage;
     private LogContext                        logContext;
 
+    protected boolean              filterDmlInsert           = false;
+    protected boolean              filterDmlUpdate           = false;
+    protected boolean              filterDmlDelete           = false;
+
     public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
-                                      EventTransactionBuffer transactionBuffer, String destination){
+                                      EventTransactionBuffer transactionBuffer, String destination, boolean filterDmlInsert, boolean filterDmlUpdate, boolean filterDmlDelete){
         this.ringBufferSize = ringBufferSize;
         this.parserThreadCount = parserThreadCount;
         this.logEventConvert = logEventConvert;
         this.transactionBuffer = transactionBuffer;
         this.destination = destination;
+        this.filterDmlInsert = filterDmlInsert;
+        this.filterDmlUpdate = filterDmlUpdate;
+        this.filterDmlDelete = filterDmlDelete;
     }
 
     @Override
@@ -272,18 +279,18 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
                     case LogEvent.WRITE_ROWS_EVENT_V1:
                     case LogEvent.WRITE_ROWS_EVENT:
                         tableMeta = logEventConvert.parseRowsEventForTableMeta((WriteRowsLogEvent) logEvent);
-                        needDmlParse = true;
+                        needDmlParse = !filterDmlInsert;//true;
                         break;
                     case LogEvent.UPDATE_ROWS_EVENT_V1:
                     case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
                     case LogEvent.UPDATE_ROWS_EVENT:
                         tableMeta = logEventConvert.parseRowsEventForTableMeta((UpdateRowsLogEvent) logEvent);
-                        needDmlParse = true;
+                        needDmlParse = !filterDmlUpdate;//true;
                         break;
                     case LogEvent.DELETE_ROWS_EVENT_V1:
                     case LogEvent.DELETE_ROWS_EVENT:
                         tableMeta = logEventConvert.parseRowsEventForTableMeta((DeleteRowsLogEvent) logEvent);
-                        needDmlParse = true;
+                        needDmlParse = !filterDmlDelete;//true;
                         break;
                     case LogEvent.ROWS_QUERY_LOG_EVENT:
                         needDmlParse = true;