Browse Source

增加表字段过滤功能 (#1914)

* 增加表字段过滤功能

* 增加表字段过滤功能

* 更换fastsql版本通过编译

* 1.修改字段配置格式 2.增加字段黑名单配置(冲突时以白名单为准)
cjk87927 5 years ago
parent
commit
c81034dda3

+ 4 - 0
deployer/src/main/resources/example/instance.properties

@@ -41,6 +41,10 @@ canal.instance.enableDruid=false
 canal.instance.filter.regex=.*\\..*
 # table black regex
 canal.instance.filter.black.regex=
+# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
+#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
+# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
+#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
 
 # mq config
 canal.mq.topic=example

+ 3 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -98,6 +98,9 @@
 				<constructor-arg index="1" value="false" />
 			</bean>
 		</property>
+		
+		<property name="fieldFilter" value="${canal.instance.filter.field}" />
+		<property name="fieldBlackFilter" value="${canal.instance.filter.black.field}" />
 
 		<!-- 最大事务解析大小,超过该大小后事务将被切分为多个事务投递 -->
 		<property name="transactionSize" value="${canal.instance.transaction.size:1024}" />

+ 4 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -84,6 +84,10 @@
 				<constructor-arg index="1" value="false" />
 			</bean>
 		</property>
+		
+		<property name="fieldFilter" value="${canal.instance.filter.field}" />
+		<property name="fieldBlackFilter" value="${canal.instance.filter.black.field}" />
+		
 		<!-- 最大事务解析大小,超过该大小后事务将被切分为多个事务投递 -->
 		<property name="transactionSize" value="${canal.instance.transaction.size:1024}" />
 

+ 4 - 0
deployer/src/main/resources/spring/group-instance.xml

@@ -90,6 +90,10 @@
 				<constructor-arg index="1" value="false" />
 			</bean>
 		</property>
+		
+		<property name="fieldFilter" value="${canal.instance.filter.field}" />
+		<property name="fieldBlackFilter" value="${canal.instance.filter.black.field}" />
+		
 		<!-- 最大事务解析大小,超过该大小后事务将被切分为多个事务投递 -->
 		<property name="transactionSize" value="${canal.instance.transaction.size:1024}" />
 

+ 4 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -81,6 +81,10 @@
 				<constructor-arg index="1" value="false" />
 			</bean>
 		</property>
+		
+		<property name="fieldFilter" value="${canal.instance.filter.field}" />
+		<property name="fieldBlackFilter" value="${canal.instance.filter.black.field}" />
+		
 		<!-- 最大事务解析大小,超过该大小后事务将被切分为多个事务投递 -->
 		<property name="transactionSize" value="${canal.instance.transaction.size:1024}" />
 

+ 72 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -2,7 +2,9 @@ package com.alibaba.otter.canal.parse.inbound;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,6 +55,12 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
     protected CanalEventFilter                       eventFilter                = null;
     protected CanalEventFilter                       eventBlackFilter           = null;
 
+    // 字段过滤
+    protected String		  			  			fieldFilter;
+    protected Map<String, List<String>> 			fieldFilterMap;
+    protected String		  			  			fieldBlackFilter;
+    protected Map<String, List<String>> 			fieldBlackFilterMap;
+    
     private CanalAlarmHandler                        alarmHandler               = null;
 
     // 统计参数
@@ -528,6 +536,31 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         }
         heartBeatTimerTask = null;
     }
+    
+    /**
+     * 解析字段过滤规则
+     */
+    private Map<String, List<String>> parseFieldFilterMap(String config) {
+    	
+    	Map<String, List<String>> map = new HashMap<String, List<String>>();
+		
+		if (StringUtils.isNotBlank(config)) {
+			for (String filter : config.split(",")) {
+				if (StringUtils.isBlank(filter)) {
+					continue;
+				}
+				
+				String[] filterConfig = filter.split(":");
+				if (filterConfig.length != 2) {
+					continue;
+				}
+				
+				map.put(filterConfig[0].trim().toUpperCase(), Arrays.asList(filterConfig[1].trim().toUpperCase().split("/")));
+			}
+		}
+		
+		return map;
+    }
 
     public void setEventFilter(CanalEventFilter eventFilter) {
         this.eventFilter = eventFilter;
@@ -655,4 +688,43 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         this.serverId = serverId;
     }
 
+    public String getFieldFilter() {
+		return fieldFilter;
+	}
+
+	public void setFieldFilter(String fieldFilter) {
+		this.fieldFilter = fieldFilter.trim();
+		this.fieldFilterMap = parseFieldFilterMap(fieldFilter);
+	}
+	
+	public String getFieldBlackFilter() {
+		return fieldBlackFilter;
+	}
+
+	public void setFieldBlackFilter(String fieldBlackFilter) {
+		this.fieldBlackFilter = fieldBlackFilter;
+		this.fieldBlackFilterMap = parseFieldFilterMap(fieldBlackFilter);
+	}
+
+	/**
+	 * 获取表字段过滤规则
+	 * @return
+	 * 	key:	schema.tableName
+	 * 	value:	字段列表
+	 */
+	public Map<String, List<String>> getFieldFilterMap() {
+		return fieldFilterMap;
+	}
+
+	/**
+	 * 获取表字段过滤规则黑名单
+	 * @return
+	 * 	key:	schema.tableName
+	 * 	value:	字段列表
+	 */
+	public Map<String, List<String>> getFieldBlackFilterMap() {
+		return fieldBlackFilterMap;
+	}
+	
+	
 }

+ 31 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -50,6 +50,9 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         if (eventBlackFilter != null && eventBlackFilter instanceof AviaterRegexFilter) {
             convert.setNameBlackFilter((AviaterRegexFilter) eventBlackFilter);
         }
+        
+        convert.setFieldFilterMap(getFieldFilterMap());
+        convert.setFieldBlackFilterMap(getFieldBlackFilterMap());
 
         convert.setCharset(connectionCharset);
         convert.setFilterQueryDcl(filterQueryDcl);
@@ -90,6 +93,34 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
             }
         }
     }
+    
+    @Override
+    public void setFieldFilter(String fieldFilter) {
+    	super.setFieldFilter(fieldFilter);
+    	
+    	// 触发一下filter变更
+    	if (binlogParser instanceof LogEventConvert) {
+            ((LogEventConvert) binlogParser).setFieldFilterMap(getFieldFilterMap());
+        }
+
+        if (tableMetaTSDB != null && tableMetaTSDB instanceof DatabaseTableMeta) {
+            ((DatabaseTableMeta) tableMetaTSDB).setFieldFilterMap(getFieldFilterMap());
+        }
+    }
+    
+    @Override
+    public void setFieldBlackFilter(String fieldBlackFilter) {
+    	super.setFieldBlackFilter(fieldBlackFilter);
+    	
+    	// 触发一下filter变更
+    	if (binlogParser instanceof LogEventConvert) {
+            ((LogEventConvert) binlogParser).setFieldBlackFilterMap(getFieldBlackFilterMap());
+        }
+
+        if (tableMetaTSDB != null && tableMetaTSDB instanceof DatabaseTableMeta) {
+            ((DatabaseTableMeta) tableMetaTSDB).setFieldBlackFilterMap(getFieldBlackFilterMap());
+        }
+    }
 
     /**
      * 回滚到指定位点

+ 61 - 8
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -8,7 +8,9 @@ import java.nio.charset.Charset;
 import java.sql.Types;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
@@ -86,6 +88,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
     private volatile AviaterRegexFilter nameFilter;                                                          // 运行时引用可能会有变化,比如规则发生变化时
     private volatile AviaterRegexFilter nameBlackFilter;
+    private Map<String, List<String>> 	fieldFilterMap 		= new HashMap<String, List<String>>();
+    private Map<String, List<String>> 	fieldBlackFilterMap = new HashMap<String, List<String>>();
 
     private TableMetaCache              tableMetaCache;
     private Charset                     charset             = Charset.defaultCharset();
@@ -584,6 +588,15 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         boolean tableError = false;
         // check table fileds count,只能处理加字段
         boolean existRDSNoPrimaryKey = false;
+        //获取字段过滤条件
+        List<String> fieldList = null;
+        List<String> blackFieldList = null;
+        
+        if (tableMeta != null) {
+        	fieldList = fieldFilterMap.get(tableMeta.getFullName().toUpperCase());
+        	blackFieldList = fieldBlackFilterMap.get(tableMeta.getFullName().toUpperCase());
+        }
+        
         if (tableMeta != null && columnInfo.length > tableMeta.getFields().size()) {
             if (tableMetaCache.isOnRDS()) {
                 // 特殊处理下RDS的场景
@@ -650,10 +663,12 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 columnBuilder.setSqlType(Types.BIGINT);
                 columnBuilder.setUpdated(false);
 
-                if (isAfter) {
-                    rowDataBuilder.addAfterColumns(columnBuilder.build());
-                } else {
-                    rowDataBuilder.addBeforeColumns(columnBuilder.build());
+                if (needField(fieldList, blackFieldList, columnBuilder.getName())) {
+                	if (isAfter) {
+                        rowDataBuilder.addAfterColumns(columnBuilder.build());
+                    } else {
+                        rowDataBuilder.addBeforeColumns(columnBuilder.build());
+                    }
                 }
                 continue;
             }
@@ -820,10 +835,12 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                                      && isUpdate(rowDataBuilder.getBeforeColumnsList(),
                                          columnBuilder.getIsNull() ? null : columnBuilder.getValue(),
                                          i));
-            if (isAfter) {
-                rowDataBuilder.addAfterColumns(columnBuilder.build());
-            } else {
-                rowDataBuilder.addBeforeColumns(columnBuilder.build());
+            if (needField(fieldList, blackFieldList, columnBuilder.getName())) {
+            	if (isAfter) {
+                    rowDataBuilder.addAfterColumns(columnBuilder.build());
+                } else {
+                    rowDataBuilder.addBeforeColumns(columnBuilder.build());
+                }
             }
         }
 
@@ -958,6 +975,17 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     private boolean isRDSHeartBeat(String schema, String table) {
         return "mysql".equalsIgnoreCase(schema) && "ha_health_check".equalsIgnoreCase(table);
     }
+    
+    /**
+     * 字段过滤判断
+     */
+    private boolean needField(List<String> fieldList, List<String> blackFieldList, String columnName) {
+    	if (fieldList == null || fieldList.isEmpty()) {
+    		return blackFieldList == null || blackFieldList.isEmpty() || !blackFieldList.contains(columnName.toUpperCase());
+    	} else {
+    		return fieldList.contains(columnName.toUpperCase());
+    	}
+    }
 
     public static TransactionBegin createTransactionBegin(long threadId) {
         TransactionBegin.Builder beginBuilder = TransactionBegin.newBuilder();
@@ -999,6 +1027,31 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         this.nameBlackFilter = nameBlackFilter;
         logger.warn("--> init table black filter : " + nameBlackFilter.toString());
     }
+    
+    public void setFieldFilterMap(Map<String, List<String>> fieldFilterMap) {
+    	if (fieldFilterMap != null) {
+    		this.fieldFilterMap = fieldFilterMap;
+    	} else {
+    		this.fieldFilterMap = new HashMap<String, List<String>>();
+    	}
+		
+		
+		for (Map.Entry<String, List<String>> entry : this.fieldFilterMap.entrySet()) {
+			logger.warn("--> init field filter : " + entry.getKey() + "->" + entry.getValue());
+		}
+	}
+    
+    public void setFieldBlackFilterMap(Map<String, List<String>> fieldBlackFilterMap) {
+		if (fieldBlackFilterMap != null) {
+    		this.fieldBlackFilterMap = fieldBlackFilterMap;
+    	} else {
+    		this.fieldBlackFilterMap = new HashMap<String, List<String>>();
+    	}
+		
+		for (Map.Entry<String, List<String>> entry : this.fieldBlackFilterMap.entrySet()) {
+			logger.warn("--> init field black filter : " + entry.getKey() + "->" + entry.getValue());
+		}
+	}
 
     public void setTableMetaCache(TableMetaCache tableMetaCache) {
         this.tableMetaCache = tableMetaCache;

+ 10 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -69,6 +69,8 @@ public class DatabaseTableMeta implements TableMetaTSDB {
     private volatile MysqlConnection        connection;                                                                 // 查询meta信息的链接
     private CanalEventFilter                filter;
     private CanalEventFilter                blackFilter;
+    private Map<String, List<String>> 		fieldFilterMap = new HashMap<String, List<String>>();
+    private Map<String, List<String>> 		fieldBlackFilterMap = new HashMap<String, List<String>>();
     private EntryPosition                   lastPosition;
     private boolean                         hasNewDdl;
     private MetaHistoryDAO                  metaHistoryDAO;
@@ -580,6 +582,14 @@ public class DatabaseTableMeta implements TableMetaTSDB {
     public void setBlackFilter(CanalEventFilter blackFilter) {
         this.blackFilter = blackFilter;
     }
+    
+    public void setFieldFilterMap(Map<String, List<String>> fieldFilterMap) {
+		this.fieldFilterMap = fieldFilterMap;
+	}
+    
+	public void setFieldBlackFilterMap(Map<String, List<String>> fieldBlackFilterMap) {
+		this.fieldBlackFilterMap = fieldBlackFilterMap;
+	}
 
     public int getSnapshotInterval() {
         return snapshotInterval;

+ 1 - 1
pom.xml

@@ -253,7 +253,7 @@
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_914</version>
+                <version>2.0.0_preview_896</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>