Browse Source

fixed issue #1168, support filter with compareTableMetaDbAndMemory

agapple 6 years ago
parent
commit
cbab3fb561

+ 14 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -76,7 +76,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
     private int                             snapshotInterval = 24;
     private int                             snapshotInterval = 24;
     private int                             snapshotExpire   = 360;
     private int                             snapshotExpire   = 360;
     private ScheduledFuture<?>              scheduleSnapshotFuture;
     private ScheduledFuture<?>              scheduleSnapshotFuture;
-    
+
     public DatabaseTableMeta(){
     public DatabaseTableMeta(){
 
 
     }
     }
@@ -207,8 +207,8 @@ public class DatabaseTableMeta implements TableMetaTSDB {
                 packet = connection.query("show full tables from `" + schema + "` where Table_type = 'BASE TABLE'");
                 packet = connection.query("show full tables from `" + schema + "` where Table_type = 'BASE TABLE'");
                 List<String> tables = new ArrayList<String>();
                 List<String> tables = new ArrayList<String>();
                 for (String table : packet.getFieldValues()) {
                 for (String table : packet.getFieldValues()) {
-                    if("BASE TABLE".equalsIgnoreCase(table)){
-                       continue; 
+                    if ("BASE TABLE".equalsIgnoreCase(table)) {
+                        continue;
                     }
                     }
                     String fullName = schema + "." + table;
                     String fullName = schema + "." + table;
                     if (blackFilter == null || !blackFilter.filter(fullName)) {
                     if (blackFilter == null || !blackFilter.filter(fullName)) {
@@ -310,11 +310,19 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         boolean compareAll = true;
         boolean compareAll = true;
         for (Schema schema : tmpMemoryTableMeta.getRepository().getSchemas()) {
         for (Schema schema : tmpMemoryTableMeta.getRepository().getSchemas()) {
             for (String table : schema.showTables()) {
             for (String table : schema.showTables()) {
-                if (!compareTableMetaDbAndMemory(connection, tmpMemoryTableMeta, schema.getName(), table)) {
-                    compareAll = false;
+                String fullName = schema + "." + table;
+                if (blackFilter == null || !blackFilter.filter(fullName)) {
+                    if (filter == null || filter.filter(fullName)) {
+                        // issue : https://github.com/alibaba/canal/issues/1168
+                        // 在生成snapshot时重新过滤一遍
+                        if (!compareTableMetaDbAndMemory(connection, tmpMemoryTableMeta, schema.getName(), table)) {
+                            compareAll = false;
+                        }
+                    }
                 }
                 }
             }
             }
         }
         }
+
         if (compareAll) {
         if (compareAll) {
             Map<String, String> content = new HashMap<String, String>();
             Map<String, String> content = new HashMap<String, String>();
             content.put("destination", destination);
             content.put("destination", destination);
@@ -513,7 +521,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
             String sign = sourceField.isUnsigned() ? "unsigned" : "signed";
             String sign = sourceField.isUnsigned() ? "unsigned" : "signed";
             String sourceColumnType = StringUtils.removeEndIgnoreCase(sourceField.getColumnType(), sign).trim();
             String sourceColumnType = StringUtils.removeEndIgnoreCase(sourceField.getColumnType(), sign).trim();
             String targetColumnType = StringUtils.removeEndIgnoreCase(targetField.getColumnType(), sign).trim();
             String targetColumnType = StringUtils.removeEndIgnoreCase(targetField.getColumnType(), sign).trim();
-            
+
             boolean columnTypeCompare = false;
             boolean columnTypeCompare = false;
             columnTypeCompare |= StringUtils.containsIgnoreCase(sourceColumnType, targetColumnType);
             columnTypeCompare |= StringUtils.containsIgnoreCase(sourceColumnType, targetColumnType);
             columnTypeCompare |= StringUtils.containsIgnoreCase(targetColumnType, sourceColumnType);
             columnTypeCompare |= StringUtils.containsIgnoreCase(targetColumnType, sourceColumnType);