Преглед изворни кода

Feature/adapter postgresql 20250124 (#5390)

* adapter tcp模式mysql数据全量同步至postgresql时 报错
fix issue 2146

* fix issue 1980

* feat 增加 当dbType 为 postgresql时,返回双引号(避免表名为postgresql的关键字时,sql查询报错)

---------

Co-authored-by: guodong.zhang <zgd8988@126.com>
nakeiven пре 3 месеци
родитељ
комит
c998b3f100

+ 17 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Util.java

@@ -16,6 +16,7 @@ import java.util.function.Function;
 
 import javax.sql.DataSource;
 
+import com.alibaba.druid.pool.DruidDataSource;
 import org.apache.commons.lang.StringUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -38,7 +39,15 @@ public class Util {
     public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) {
         try (Connection conn = ds.getConnection();
                 Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
-            stmt.setFetchSize(Integer.MIN_VALUE);
+
+            DruidDataSource druidDataSource = (DruidDataSource) ds;
+            if ("postgresql".equals(druidDataSource.getDbType())) {
+                conn.setAutoCommit(false);
+                stmt.setFetchSize(1000);
+            } else {
+                stmt.setFetchSize(Integer.MIN_VALUE);
+            }
+
             try (ResultSet rs = stmt.executeQuery(sql)) {
                 return fun.apply(rs);
             }
@@ -52,7 +61,13 @@ public class Util {
         try (Connection conn = ds.getConnection()) {
             try (PreparedStatement pstmt = conn
                 .prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
-                pstmt.setFetchSize(Integer.MIN_VALUE);
+                DruidDataSource druidDataSource = (DruidDataSource) ds;
+                if ("postgresql".equals(druidDataSource.getDbType())) {
+                    conn.setAutoCommit(false);
+                    pstmt.setFetchSize(1000);
+                } else {
+                    pstmt.setFetchSize(Integer.MIN_VALUE);
+                }
                 if (values != null) {
                     for (int i = 0; i < values.size(); i++) {
                         pstmt.setObject(i + 1, values.get(i));

+ 1 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -294,7 +294,7 @@ public class RdbSyncService {
             batchExecutor.execute(insertSql.toString(), values);
         } catch (SQLException e) {
             if (skipDupException
-                && (e.getMessage().contains("Duplicate entry") || e.getMessage().startsWith("ORA-00001:"))) {
+                && (e.getMessage().contains("Duplicate entry") || e.getMessage().contains("duplicate key") || e.getMessage().startsWith("ORA-00001:"))) {
                 // ignore
                 // TODO 增加更多关系数据库的主键冲突的错误码
             } else {

+ 3 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

@@ -313,6 +313,9 @@ public class SyncUtil {
             case mariadb:
             case oceanbase:
                 return "`";
+        //  当dbType 为 postgresql时,返回双引号(避免表名为postgresql的关键字时,sql查询报错)
+            case postgresql:
+                return "\"";
             default:
                 return "";
         }