
feat: add compatibility handling when the metadata is inconsistent with the target data source (#5168)

* feat: add compatibility handling when the metadata is inconsistent with the target data source

* feat: add compatibility handling when the metadata is inconsistent with the target data source; clean up the code

* Feature #FR-761: a failed DDL execution would cause the unexecuted DML behind it to be discarded; catch the exception when DDL execution fails
aiden.liu, 1 year ago, commit c1f5a5a8e4

+ 2 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

@@ -164,7 +164,8 @@ public class RdbMirrorDbSyncService {
                 logger.trace("Execute DDL sql: {} for database: {}", ddl.getSql(), ddl.getDatabase());
             }
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            logger.error("Execute DDL sql failed, sql: {} for database: {}, cause: {}",
+                    ddl.getSql(), ddl.getDatabase(), e.getMessage(), e);
         }
     }
 }
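
The rethrow is dropped because, per #FR-761 in the commit message, a single failed DDL previously aborted the whole batch, so every DML queued behind it was discarded. A minimal sketch of that effect, assuming a simple operation loop; Op, syncRethrow and syncCatch are illustrative names, not canal APIs:

    import java.util.List;

    public class DdlFailureSketch {

        static class Op {
            final boolean ddl;
            final String sql;
            Op(boolean ddl, String sql) { this.ddl = ddl; this.sql = sql; }
        }

        // Before the fix: a failing DDL threw a RuntimeException out of the loop,
        // so the DML queued behind it was never applied.
        static void syncRethrow(List<Op> ops) {
            for (Op op : ops) {
                if (op.ddl) {
                    throw new RuntimeException("DDL failed: " + op.sql); // aborts the batch
                }
                System.out.println("applied DML: " + op.sql);
            }
        }

        // After the fix: the failure is logged and the loop keeps going,
        // so the remaining DML is still applied.
        static void syncCatch(List<Op> ops) {
            for (Op op : ops) {
                try {
                    if (op.ddl) {
                        throw new RuntimeException("DDL failed: " + op.sql);
                    }
                    System.out.println("applied DML: " + op.sql);
                } catch (Exception e) {
                    System.err.println("DDL failed, continuing: " + e.getMessage());
                }
            }
        }

        public static void main(String[] args) {
            List<Op> ops = List.of(new Op(true, "ALTER TABLE t ADD COLUMN c INT"),
                                   new Op(false, "INSERT INTO t VALUES (1)"));
            syncCatch(ops);    // the INSERT still runs even though the DDL "failed"
            // syncRethrow(ops) would throw before the INSERT is ever reached
        }
    }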

+ 69 - 17
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -1,10 +1,25 @@
 package com.alibaba.otter.canal.client.adapter.rdb.service;
 
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter.Feature;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
+import com.alibaba.otter.canal.client.adapter.rdb.support.BatchExecutor;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.sql.Connection;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -15,21 +30,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.function.Function;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.druid.pool.DruidDataSource;
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONWriter.Feature;
-import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
-import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
-import com.alibaba.otter.canal.client.adapter.rdb.support.BatchExecutor;
-import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml;
-import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.Util;
+import java.util.stream.Collectors;
 
 /**
 * RDB sync operation service
@@ -272,7 +273,7 @@ public class RdbSyncService {
         len = insertSql.length();
         insertSql.delete(len - 1, len).append(")");
 
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+        Map<String, Integer> ctype = getTargetColumnTypeByColumns(batchExecutor.getConn(), config, columnsMap);
 
         List<Map<String, ?>> values = new ArrayList<>();
         for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
@@ -328,7 +329,7 @@ public class RdbSyncService {
         String backtick = SyncUtil.getBacktickByDbType(dataSource.getDbType());
         Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+        Map<String, Integer> ctype = getTargetColumnTypeByColumns(batchExecutor.getConn(), config, columnsMap);
 
         StringBuilder updateSql = new StringBuilder();
         updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType())).append(" SET ");
@@ -410,6 +411,57 @@ public class RdbSyncService {
         }
     }
 
+    /**
+     * Obtain the column metadata of the corresponding table in the target data source.
+     * Check whether the columns coming from the source are consistent with the cached column metadata
+     * (first compare the number of columns, then check that every column exists).
+     * If they are inconsistent, evict the cache entry and reload the metadata from the target database.
+     *
+     * @param conn       target database connection
+     * @param config     mapping configuration
+     * @param columnsMap columns carried by the current DML
+     * @date 2024/6/17 14:45
+     */
+    private Map<String, Integer> getTargetColumnTypeByColumns(Connection conn, MappingConfig config,
+                                                              Map<String, String> columnsMap) {
+        DbMapping dbMapping = config.getDbMapping();
+        String cacheKey = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
+        Map<String, Integer> columnType = getTargetColumnType(conn, config);
+        if (null != columnType && null != columnsMap &&
+                !checkColumnConsistent(columnType.keySet(), columnsMap.keySet())) {
+            logger.info("column metadata does not match, reload the column metadata from the target db. key: {}", cacheKey);
+            columnsTypeCache.remove(cacheKey);
+            columnType = getTargetColumnType(conn, config);
+            if (null != columnType && null != columnsMap &&
+                    !checkColumnConsistent(columnType.keySet(), columnsMap.keySet())) {
+                logger.info("the column metadata from the target db still does not match. db columns: {}", columnType.keySet());
+            }
+        }
+        return columnType;
+    }
+
+
+    /**
+     * Check whether the columns in the binlog are consistent with the cached columns.
+     *
+     * @param <T>           element type of the collections
+     * @param column        one collection of column names
+     * @param anotherColumn the other collection of column names
+     * @return {@link Boolean} true if both collections contain exactly the same elements
+     */
+    public static <T> Boolean checkColumnConsistent(Collection<T> column, Collection<T> anotherColumn) {
+        if (null == column || null == anotherColumn || column.size() != anotherColumn.size()) {
+            return false;
+        }
+        // index one side so the lookup is O(1) for each element of the other side
+        Map<T, T> anotherColumnMap = anotherColumn.stream()
+                .collect(Collectors.toMap(Function.identity(), Function.identity()));
+        for (T t : column) {
+            if (null == anotherColumnMap.get(t)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     /**
     * Get the target column types
      *
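
The reload path above hinges on checkColumnConsistent: the cached target metadata is only trusted while it carries the same column names as the row coming from the binlog. A stand-alone sketch of that rule, using a simplified equivalent of the check rather than the canal class itself, with made-up column names:

    import java.util.Set;

    public class ColumnConsistencySketch {

        // same rule as checkColumnConsistent: equal size and every name present on both sides
        static <T> boolean consistent(Set<T> a, Set<T> b) {
            return a != null && b != null && a.size() == b.size() && a.containsAll(b);
        }

        public static void main(String[] args) {
            Set<String> cachedTargetColumns = Set.of("id", "name", "price");
            Set<String> binlogColumns       = Set.of("id", "name", "price", "stock"); // DDL added "stock"

            // mismatch -> getTargetColumnTypeByColumns evicts columnsTypeCache for the table
            // and calls getTargetColumnType(conn, config) again to reload the metadata
            System.out.println(consistent(cachedTargetColumns, binlogColumns));   // false
            System.out.println(consistent(binlogColumns,
                    Set.of("stock", "price", "name", "id")));                     // true, order irrelevant
        }
    }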

+ 1 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

@@ -26,7 +26,7 @@ public class SyncUtil {
     public static Map<String, String> getColumnsMap(MappingConfig.DbMapping dbMapping, Collection<String> columns) {
         Map<String, String> columnsMap;
         if (dbMapping.getMapAll()) {
-            if (dbMapping.getAllMapColumns() != null) {
+            if (dbMapping.getAllMapColumns() != null && columns.size() == dbMapping.getAllMapColumns().size()) {
                 return dbMapping.getAllMapColumns();
             }
             columnsMap = new LinkedHashMap<>();
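
This guard matters for the mapAll case: before the change, a non-null cached allMapColumns was returned unconditionally, even when the incoming row carried a different number of columns (for example after a DDL added one). A hypothetical illustration of the new behaviour, with simplified names rather than the actual SyncUtil code:

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class MapAllGuardSketch {
        public static void main(String[] args) {
            Map<String, String> cachedAllMapColumns = new LinkedHashMap<>();
            cachedAllMapColumns.put("id", "id");
            cachedAllMapColumns.put("name", "name");

            List<String> incoming = Arrays.asList("id", "name", "stock"); // column added by DDL

            Map<String, String> columnsMap;
            if (incoming.size() == cachedAllMapColumns.size()) {
                columnsMap = cachedAllMapColumns;            // fast path, cache still usable
            } else {
                columnsMap = new LinkedHashMap<>();
                for (String c : incoming) {
                    columnsMap.put(c, c);                    // identity mapping, as mapAll does
                }
            }
            System.out.println(columnsMap.keySet());         // [id, name, stock]
        }
    }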

+ 1 - 1
client-adapter/tablestore/pom.xml

@@ -10,7 +10,7 @@
     <groupId>com.alibaba.otter</groupId>
     <artifactId>client-adapter.tablestore</artifactId>
     <packaging>jar</packaging>
-    <name>canal client adapter rdb module for otter ${project.version}</name>
+    <name>canal client adapter tablestore module for otter ${project.version}</name>
 
     <dependencies>
         <dependency>