agapple 6 years ago
parent
commit
fdf150bd24

+ 8 - 2
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -12,6 +12,7 @@ import java.util.concurrent.Future;
 
 import javax.sql.DataSource;
 
+import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,12 +131,17 @@ public class RdbAdapter implements OuterAdapter {
         String threads = properties.get("threads");
         // String commitSize = properties.get("commitSize");
 
-        rdbSyncService = new RdbSyncService(dataSource, threads != null ? Integer.valueOf(threads) : null);
+        boolean skipDupException = BooleanUtils.toBoolean(configuration.getProperties()
+            .getOrDefault("skipDupException", "true"));
+        rdbSyncService = new RdbSyncService(dataSource,
+            threads != null ? Integer.valueOf(threads) : null,
+            skipDupException);
 
         rdbMirrorDbSyncService = new RdbMirrorDbSyncService(mirrorDbConfigCache,
             dataSource,
             threads != null ? Integer.valueOf(threads) : null,
-            rdbSyncService.getColumnsTypeCache());
+            rdbSyncService.getColumnsTypeCache(),
+            skipDupException);
 
         rdbConfigMonitor = new RdbConfigMonitor();
         rdbConfigMonitor.init(configuration.getKey(), this);

+ 3 - 2
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

@@ -35,10 +35,11 @@ public class RdbMirrorDbSyncService {
     private RdbSyncService              rdbSyncService;                                                // rdbSyncService代理
 
     public RdbMirrorDbSyncService(Map<String, MirrorDbConfig> mirrorDbConfigCache, DataSource dataSource,
-                                  Integer threads, Map<String, Map<String, Integer>> columnsTypeCache){
+                                  Integer threads, Map<String, Map<String, Integer>> columnsTypeCache,
+                                  boolean skipDupException){
         this.mirrorDbConfigCache = mirrorDbConfigCache;
         this.dataSource = dataSource;
-        this.rdbSyncService = new RdbSyncService(dataSource, threads, columnsTypeCache);
+        this.rdbSyncService = new RdbSyncService(dataSource, threads, columnsTypeCache, skipDupException);
     }
 
     /**

+ 34 - 22
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -44,6 +44,7 @@ public class RdbSyncService {
     private Map<String, Map<String, Integer>> columnsTypeCache;
 
     private int                               threads = 3;
+    private boolean                           skipDupException;
 
     private List<SyncItem>[]                  dmlsPartition;
     private BatchExecutor[]                   batchExecutors;
@@ -58,13 +59,15 @@ public class RdbSyncService {
     }
 
     @SuppressWarnings("unchecked")
-    public RdbSyncService(DataSource dataSource, Integer threads){
-        this(dataSource, threads, new ConcurrentHashMap<>());
+    public RdbSyncService(DataSource dataSource, Integer threads, boolean skipDupException){
+        this(dataSource, threads, new ConcurrentHashMap<>(), skipDupException);
     }
 
     @SuppressWarnings("unchecked")
-    public RdbSyncService(DataSource dataSource, Integer threads, Map<String, Map<String, Integer>> columnsTypeCache){
+    public RdbSyncService(DataSource dataSource, Integer threads, Map<String, Map<String, Integer>> columnsTypeCache,
+                          boolean skipDupException){
         this.columnsTypeCache = columnsTypeCache;
+        this.skipDupException = skipDupException;
         try {
             if (threads != null) {
                 this.threads = threads;
@@ -182,16 +185,20 @@ public class RdbSyncService {
      */
     public void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
         if (config != null) {
-            String type = dml.getType();
-            if (type != null && type.equalsIgnoreCase("INSERT")) {
-                insert(batchExecutor, config, dml);
-            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
-                update(batchExecutor, config, dml);
-            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
-                delete(batchExecutor, config, dml);
-            }
-            if (logger.isDebugEnabled()) {
-                logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+            try {
+                String type = dml.getType();
+                if (type != null && type.equalsIgnoreCase("INSERT")) {
+                    insert(batchExecutor, config, dml);
+                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                    update(batchExecutor, config, dml);
+                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                    delete(batchExecutor, config, dml);
+                }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+                }
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
             }
         }
     }
@@ -202,7 +209,7 @@ public class RdbSyncService {
      * @param config 配置项
      * @param dml DML数据
      */
-    private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+    private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
         Map<String, Object> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
@@ -240,11 +247,20 @@ public class RdbSyncService {
                 throw new RuntimeException("Target column: " + targetColumnName + " not matched");
             }
             Object value = data.get(srcColumnName);
-
             BatchExecutor.setValue(values, type, value);
         }
 
-        batchExecutor.execute(insertSql.toString(), values);
+        try {
+            batchExecutor.execute(insertSql.toString(), values);
+        } catch (SQLException e) {
+            String message = e.getMessage();
+            // MySQL reports "Duplicate entry"; Oracle's "ORA-00001" prefix is
+            // locale-independent (matching the full localized message text would
+            // only work under one locale). Guard against a null message.
+            if (skipDupException && message != null
+                && (message.contains("Duplicate entry") || message.startsWith("ORA-00001"))) {
+                // ignore duplicate-key errors when configured to skip them
+                // TODO add duplicate-key error codes for more databases
+            } else {
+                throw e;
+            }
+        }
         if (logger.isTraceEnabled()) {
             logger.trace("Insert into target table, sql: {}", insertSql);
         }
@@ -257,7 +273,7 @@ public class RdbSyncService {
      * @param config 配置项
      * @param dml DML数据
      */
-    private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+    private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
         Map<String, Object> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
@@ -301,9 +317,7 @@ public class RdbSyncService {
 
         // 拼接主键
         appendCondition(dbMapping, updateSql, ctype, values, data, old);
-
         batchExecutor.execute(updateSql.toString(), values);
-
         if (logger.isTraceEnabled()) {
             logger.trace("Update target table, sql: {}", updateSql);
         }
@@ -315,7 +329,7 @@ public class RdbSyncService {
      * @param config
      * @param dml
      */
-    private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+    private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
         Map<String, Object> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
@@ -331,9 +345,7 @@ public class RdbSyncService {
         List<Map<String, ?>> values = new ArrayList<>();
         // 拼接主键
         appendCondition(dbMapping, sql, ctype, values, data);
-
         batchExecutor.execute(sql.toString(), values);
-
         if (logger.isTraceEnabled()) {
             logger.trace("Delete from target table, sql: {}", sql);
         }

+ 26 - 41
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java

@@ -19,16 +19,11 @@ public class BatchExecutor implements Closeable {
     private Connection          conn;
     private AtomicInteger       idx    = new AtomicInteger(0);
 
-    public BatchExecutor(Connection conn){
+    public BatchExecutor(Connection conn) throws SQLException {
         this.conn = conn;
-        try {
-            this.conn.setAutoCommit(false);
-        } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
-        }
+        this.conn.setAutoCommit(false);
     }
 
-
     public Connection getConn() {
         return conn;
     }
@@ -40,53 +35,43 @@ public class BatchExecutor implements Closeable {
         values.add(valueItem);
     }
 
-    public void execute(String sql, List<Map<String, ?>> values) {
-        try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
-            int len = values.size();
-            for (int i = 0; i < len; i++) {
-                int type = (Integer) values.get(i).get("type");
-                Object value = values.get(i).get("value");
-                SyncUtil.setPStmt(type, pstmt, value, i + 1);
-            }
-
-            pstmt.execute();
-            idx.incrementAndGet();
-        } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
+    public void execute(String sql, List<Map<String, ?>> values) throws SQLException {
+        PreparedStatement pstmt = conn.prepareStatement(sql);
+        int len = values.size();
+        for (int i = 0; i < len; i++) {
+            int type = (Integer) values.get(i).get("type");
+            Object value = values.get(i).get("value");
+            SyncUtil.setPStmt(type, pstmt, value, i + 1);
         }
+
+        pstmt.execute();
+        idx.incrementAndGet();
     }
 
-    public void commit() {
-        try {
-            conn.commit();
-            if (logger.isTraceEnabled()) {
-                logger.trace("Batch executor commit " + idx.get() + " rows");
-            }
-            idx.set(0);
-        } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
+    public void commit() throws SQLException {
+        conn.commit();
+        if (logger.isTraceEnabled()) {
+            logger.trace("Batch executor commit " + idx.get() + " rows");
         }
+        idx.set(0);
     }
 
-    public void rollback() {
-        try {
-            conn.rollback();
-            if (logger.isTraceEnabled()) {
-                logger.trace("Batch executor rollback " + idx.get() + " rows");
-            }
-            idx.set(0);
-        } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
+    public void rollback() throws SQLException {
+        conn.rollback();
+        if (logger.isTraceEnabled()) {
+            logger.trace("Batch executor rollback " + idx.get() + " rows");
         }
+        idx.set(0);
     }
 
     @Override
     public void close() {
         if (conn != null) {
             try {
-                conn.close();
-            } catch (SQLException e) {
-                logger.error(e.getMessage(), e);
+                conn.close();
+            } catch (SQLException ignored) {
+                // close is best-effort during shutdown; nothing useful to do here
             }
         }
     }