Переглянути джерело

fixed adapter throw exception

agapple 6 роки тому
батько
коміт
f72ac9a3bb

+ 1 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -100,7 +100,7 @@ public class ESSyncService {
                     dml.getDestination(),
                     config.getEsMapping().get_index());
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error("sync error, es index: {}, DML : {}", config.getEsMapping().get_index(), dml);
             throw new RuntimeException(e);
         }

+ 9 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/HbaseTemplate.java

@@ -9,7 +9,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,6 +118,7 @@ public class HbaseTemplate {
             flag = true;
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
         }
         return flag;
 
@@ -145,6 +151,7 @@ public class HbaseTemplate {
             flag = true;
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
         }
         return flag;
     }
@@ -171,6 +178,7 @@ public class HbaseTemplate {
             flag = true;
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
         }
         return flag;
     }

+ 8 - 5
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -2,7 +2,10 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -127,8 +130,8 @@ public abstract class AbstractCanalAdapterWorker {
     }
 
     @SuppressWarnings("unchecked")
-    protected boolean mqWriteOutData(int retry, long timeout, int i, final boolean flatMessage, CanalMQConnector connector,
-                                  ExecutorService workerExecutor) {
+    protected boolean mqWriteOutData(int retry, long timeout, int i, final boolean flatMessage,
+                                     CanalMQConnector connector, ExecutorService workerExecutor) {
         try {
             List<?> messages;
             if (!flatMessage) {
@@ -157,7 +160,7 @@ public abstract class AbstractCanalAdapterWorker {
                 }
                 connector.ack();
             }
-           return true;
+            return true;
         } catch (Throwable e) {
             if (i == retry - 1) {
                 connector.ack();
@@ -173,7 +176,7 @@ public abstract class AbstractCanalAdapterWorker {
                 // ignore
             }
         }
-        return  false;
+        return false;
     }
 
     /**

+ 17 - 11
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -26,7 +26,12 @@ import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbMirrorDbSyncService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
 import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
-import com.alibaba.otter.canal.client.adapter.support.*;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 
 /**
  * RDB适配器实现类
@@ -73,8 +78,8 @@ public class RdbAdapter implements OuterAdapter {
         // 过滤不匹配的key的配置
         rdbMappingTmp.forEach((key, mappingConfig) -> {
             if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
-                || (mappingConfig.getOuterAdapterKey() != null
-                    && mappingConfig.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey()))) {
+                || (mappingConfig.getOuterAdapterKey() != null && mappingConfig.getOuterAdapterKey()
+                    .equalsIgnoreCase(configuration.getKey()))) {
                 rdbMapping.put(key, mappingConfig);
             }
         });
@@ -87,17 +92,17 @@ public class RdbAdapter implements OuterAdapter {
             String configName = entry.getKey();
             MappingConfig mappingConfig = entry.getValue();
             if (!mappingConfig.getDbMapping().getMirrorDb()) {
-                Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(
-                    StringUtils.trimToEmpty(mappingConfig.getDestination()) + "." + mappingConfig.getDbMapping()
-                        .getDatabase() + "." + mappingConfig.getDbMapping().getTable(),
+                String key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                             + mappingConfig.getDbMapping().getDatabase() + "."
+                             + mappingConfig.getDbMapping().getTable();
+                Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(key,
                     k1 -> new ConcurrentHashMap<>());
                 configMap.put(configName, mappingConfig);
             } else {
                 // mirrorDB
-
-                mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                                        + mappingConfig.getDbMapping().getDatabase(),
-                    MirrorDbConfig.create(configName, mappingConfig));
+                String key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                             + mappingConfig.getDbMapping().getDatabase();
+                mirrorDbConfigCache.put(key, MirrorDbConfig.create(configName, mappingConfig));
             }
         }
 
@@ -110,10 +115,11 @@ public class RdbAdapter implements OuterAdapter {
         dataSource.setPassword(properties.get("jdbc.password"));
         dataSource.setInitialSize(1);
         dataSource.setMinIdle(1);
-        dataSource.setMaxActive(10);
+        dataSource.setMaxActive(30);
         dataSource.setMaxWait(60000);
         dataSource.setTimeBetweenEvictionRunsMillis(60000);
         dataSource.setMinEvictableIdleTimeMillis(300000);
+        dataSource.setUseUnfairLock(true);
 
         try {
             dataSource.init();

+ 85 - 82
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -36,8 +36,7 @@ public class RdbEtlService {
     /**
      * 导入数据
      */
-    public static EtlResult importData(DataSource srcDS, DataSource targetDS, MappingConfig config,
-                                       List<String> params) {
+    public static EtlResult importData(DataSource srcDS, DataSource targetDS, MappingConfig config, List<String> params) {
         EtlResult etlResult = new EtlResult();
         AtomicLong successCount = new AtomicLong();
         List<String> errMsg = new ArrayList<>();
@@ -54,8 +53,8 @@ public class RdbEtlService {
             long start = System.currentTimeMillis();
 
             // 拼接sql
-            StringBuilder sql = new StringBuilder(
-                "SELECT * FROM " + dbMapping.getDatabase() + "." + dbMapping.getTable());
+            StringBuilder sql = new StringBuilder("SELECT * FROM " + dbMapping.getDatabase() + "."
+                                                  + dbMapping.getTable());
 
             // 拼接条件
             appendCondition(params, dbMapping, srcDS, sql);
@@ -92,8 +91,12 @@ public class RdbEtlService {
                     } else {
                         sqlFinal = sql + " LIMIT " + offset + "," + cnt;
                     }
-                    Future<Boolean> future = executor
-                        .submit(() -> executeSqlImport(srcDS, targetDS, sqlFinal, dbMapping, successCount, errMsg));
+                    Future<Boolean> future = executor.submit(() -> executeSqlImport(srcDS,
+                        targetDS,
+                        sqlFinal,
+                        dbMapping,
+                        successCount,
+                        errMsg));
                     futures.add(future);
                 }
 
@@ -106,10 +109,11 @@ public class RdbEtlService {
                 executeSqlImport(srcDS, targetDS, sql.toString(), dbMapping, successCount, errMsg);
             }
 
-            logger.info(
-                dbMapping.getTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
+            logger.info(dbMapping.getTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000
+                        + "s!");
 
-            etlResult.setResultMessage("导入目标表 " + SyncUtil.getDbTableName(dbMapping) + " 数据:" + successCount.get() + " 条");
+            etlResult.setResultMessage("导入目标表 " + SyncUtil.getDbTableName(dbMapping) + " 数据:" + successCount.get()
+                                       + " 条");
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
@@ -123,8 +127,8 @@ public class RdbEtlService {
         return etlResult;
     }
 
-    private static void appendCondition(List<String> params, DbMapping dbMapping, DataSource ds,
-                                        StringBuilder sql) throws SQLException {
+    private static void appendCondition(List<String> params, DbMapping dbMapping, DataSource ds, StringBuilder sql)
+                                                                                                                   throws SQLException {
         if (params != null && params.size() == 1 && dbMapping.getEtlCondition() == null) {
             AtomicBoolean stExists = new AtomicBoolean(false);
             // 验证是否有SYS_TIME字段
@@ -141,9 +145,9 @@ public class RdbEtlService {
                     }
                 } catch (Exception e) {
                     // ignore
-                }
-                return null;
-            });
+            }
+            return null;
+        }   );
             if (stExists.get()) {
                 sql.append(" WHERE SYS_TIME >= '").append(params.get(0)).append("' ");
             }
@@ -186,85 +190,84 @@ public class RdbEtlService {
                     // columnsMap = dbMapping.getTargetColumns();
                     // }
 
-                    StringBuilder insertSql = new StringBuilder();
-                    insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
-                    columnsMap
-                        .forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
+                StringBuilder insertSql = new StringBuilder();
+                insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
+                columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
 
-                    int len = insertSql.length();
-                    insertSql.delete(len - 1, len).append(") VALUES (");
-                    int mapLen = columnsMap.size();
-                    for (int i = 0; i < mapLen; i++) {
-                        insertSql.append("?,");
-                    }
-                    len = insertSql.length();
-                    insertSql.delete(len - 1, len).append(")");
-                    try (Connection connTarget = targetDS.getConnection();
-                            PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
-                        connTarget.setAutoCommit(false);
-
-                        while (rs.next()) {
-                            pstmt.clearParameters();
-
-                            // 删除数据
-                            Map<String, Object> values = new LinkedHashMap<>();
-                            StringBuilder deleteSql = new StringBuilder(
-                                "DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
-                            appendCondition(dbMapping, deleteSql, values, rs);
-                            try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
-                                int k = 1;
-                                for (Object val : values.values()) {
-                                    pstmt2.setObject(k++, val);
-                                }
-                                pstmt2.execute();
+                int len = insertSql.length();
+                insertSql.delete(len - 1, len).append(") VALUES (");
+                int mapLen = columnsMap.size();
+                for (int i = 0; i < mapLen; i++) {
+                    insertSql.append("?,");
+                }
+                len = insertSql.length();
+                insertSql.delete(len - 1, len).append(")");
+                try (Connection connTarget = targetDS.getConnection();
+                        PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
+                    connTarget.setAutoCommit(false);
+
+                    while (rs.next()) {
+                        pstmt.clearParameters();
+
+                        // 删除数据
+                        Map<String, Object> values = new LinkedHashMap<>();
+                        StringBuilder deleteSql = new StringBuilder("DELETE FROM " + SyncUtil.getDbTableName(dbMapping)
+                                                                    + " WHERE ");
+                        appendCondition(dbMapping, deleteSql, values, rs);
+                        try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
+                            int k = 1;
+                            for (Object val : values.values()) {
+                                pstmt2.setObject(k++, val);
                             }
+                            pstmt2.execute();
+                        }
 
-                            int i = 1;
-                            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
-                                String targetClolumnName = entry.getKey();
-                                String srcColumnName = entry.getValue();
-                                if (srcColumnName == null) {
-                                    srcColumnName = targetClolumnName;
-                                }
-
-                                Integer type = columnType.get(targetClolumnName.toLowerCase());
+                        int i = 1;
+                        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                            String targetClolumnName = entry.getKey();
+                            String srcColumnName = entry.getValue();
+                            if (srcColumnName == null) {
+                                srcColumnName = targetClolumnName;
+                            }
 
-                                Object value = rs.getObject(srcColumnName);
-                                if (value != null) {
-                                    SyncUtil.setPStmt(type, pstmt, value, i);
-                                } else {
-                                    pstmt.setNull(i, type);
-                                }
+                            Integer type = columnType.get(targetClolumnName.toLowerCase());
 
-                                i++;
+                            Object value = rs.getObject(srcColumnName);
+                            if (value != null) {
+                                SyncUtil.setPStmt(type, pstmt, value, i);
+                            } else {
+                                pstmt.setNull(i, type);
                             }
 
-                            pstmt.execute();
-                            if (logger.isTraceEnabled()) {
-                                logger.trace("Insert into target table, sql: {}", insertSql);
-                            }
+                            i++;
+                        }
 
-                            if (idx % dbMapping.getCommitBatch() == 0) {
-                                connTarget.commit();
-                                completed = true;
-                            }
-                            idx++;
-                            successCount.incrementAndGet();
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("successful import count:" + successCount.get());
-                            }
+                        pstmt.execute();
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("Insert into target table, sql: {}", insertSql);
                         }
-                        if (!completed) {
+
+                        if (idx % dbMapping.getCommitBatch() == 0) {
                             connTarget.commit();
+                            completed = true;
+                        }
+                        idx++;
+                        successCount.incrementAndGet();
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("successful import count:" + successCount.get());
                         }
                     }
-
-                } catch (Exception e) {
-                    logger.error(dbMapping.getTable() + " etl failed! ==>" + e.getMessage(), e);
-                    errMsg.add(dbMapping.getTable() + " etl failed! ==>" + e.getMessage());
+                    if (!completed) {
+                        connTarget.commit();
+                    }
                 }
-                return idx;
-            });
+
+            } catch (Exception e) {
+                logger.error(dbMapping.getTable() + " etl failed! ==>" + e.getMessage(), e);
+                errMsg.add(dbMapping.getTable() + " etl failed! ==>" + e.getMessage());
+            }
+            return idx;
+        }   );
             return true;
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
@@ -275,8 +278,8 @@ public class RdbEtlService {
     /**
      * 拼接目标表主键where条件
      */
-    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Object> values,
-                                        ResultSet rs) throws SQLException {
+    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Object> values, ResultSet rs)
+                                                                                                                         throws SQLException {
         // 拼接主键
         for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
             String targetColumnName = entry.getKey();

+ 30 - 28
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

@@ -70,35 +70,37 @@ public class RdbMirrorDbSyncService {
             }
         }
         if (!dmlList.isEmpty()) {
-            rdbSyncService.sync(dmlList, dml -> {
-                MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(dml.getDestination() + "." + dml.getDatabase());
-                if (mirrorDbConfig == null) {
-                    return false;
-                }
-                String table = dml.getTable();
-                MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
+            rdbSyncService.sync(dmlList,
+                dml -> {
+                    MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(dml.getDestination() + "."
+                                                                            + dml.getDatabase());
+                    if (mirrorDbConfig == null) {
+                        return false;
+                    }
+                    String table = dml.getTable();
+                    MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
 
-                if (config == null) {
-                    return false;
-                }
+                    if (config == null) {
+                        return false;
+                    }
 
-                if (config.getConcurrent()) {
-                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                    singleDmls.forEach(singleDml -> {
-                        int hash = rdbSyncService.pkHash(config.getDbMapping(), singleDml.getData());
-                        RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
-                        rdbSyncService.getDmlsPartition()[hash].add(syncItem);
-                    });
-                } else {
-                    int hash = 0;
-                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                    singleDmls.forEach(singleDml -> {
-                        RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
-                        rdbSyncService.getDmlsPartition()[hash].add(syncItem);
-                    });
-                }
-                return true;
-            });
+                    if (config.getConcurrent()) {
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            int hash = rdbSyncService.pkHash(config.getDbMapping(), singleDml.getData());
+                            RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
+                            rdbSyncService.getDmlsPartition()[hash].add(syncItem);
+                        });
+                    } else {
+                        int hash = 0;
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
+                            rdbSyncService.getDmlsPartition()[hash].add(syncItem);
+                        });
+                    }
+                    return true;
+                });
         }
     }
 
@@ -148,7 +150,7 @@ public class RdbMirrorDbSyncService {
                 logger.trace("Execute DDL sql: {} for database: {}", ddl.getSql(), ddl.getDatabase());
             }
         } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
         }
     }
 }

+ 41 - 36
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -7,7 +7,11 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.function.Function;
 
 import javax.sql.DataSource;
@@ -74,7 +78,7 @@ public class RdbSyncService {
                 executorThreads[i] = Executors.newSingleThreadExecutor();
             }
         } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
         }
     }
 
@@ -99,12 +103,13 @@ public class RdbSyncService {
                 int j = i;
                 futures.add(executorThreads[i].submit(() -> {
                     try {
-                        dmlsPartition[j]
-                            .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
+                        dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j],
+                            syncItem.config,
+                            syncItem.singleDml));
                         dmlsPartition[j].clear();
                         batchExecutors[j].commit();
                         return true;
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         batchExecutors[j].rollback();
                         throw new RuntimeException(e);
                     }
@@ -131,41 +136,41 @@ public class RdbSyncService {
         sync(dmls, dml -> {
             if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
                 // DDL
-                columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
+            columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
+            return false;
+        } else {
+            // DML
+            String destination = StringUtils.trimToEmpty(dml.getDestination());
+            String database = dml.getDatabase();
+            String table = dml.getTable();
+            Map<String, MappingConfig> configMap = mappingConfig.get(destination + "." + database + "." + table);
+
+            if (configMap == null) {
                 return false;
-            } else {
-                // DML
-                String destination = StringUtils.trimToEmpty(dml.getDestination());
-                String database = dml.getDatabase();
-                String table = dml.getTable();
-                Map<String, MappingConfig> configMap = mappingConfig.get(destination + "." + database + "." + table);
-
-                if (configMap == null) {
-                    return false;
-                }
+            }
 
-                boolean executed = false;
-                for (MappingConfig config : configMap.values()) {
-                    if (config.getConcurrent()) {
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            int hash = pkHash(config.getDbMapping(), singleDml.getData());
-                            SyncItem syncItem = new SyncItem(config, singleDml);
-                            dmlsPartition[hash].add(syncItem);
-                        });
-                    } else {
-                        int hash = 0;
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            SyncItem syncItem = new SyncItem(config, singleDml);
-                            dmlsPartition[hash].add(syncItem);
-                        });
-                    }
-                    executed = true;
+            boolean executed = false;
+            for (MappingConfig config : configMap.values()) {
+                if (config.getConcurrent()) {
+                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                    singleDmls.forEach(singleDml -> {
+                        int hash = pkHash(config.getDbMapping(), singleDml.getData());
+                        SyncItem syncItem = new SyncItem(config, singleDml);
+                        dmlsPartition[hash].add(syncItem);
+                    });
+                } else {
+                    int hash = 0;
+                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                    singleDmls.forEach(singleDml -> {
+                        SyncItem syncItem = new SyncItem(config, singleDml);
+                        dmlsPartition[hash].add(syncItem);
+                    });
                 }
-                return executed;
+                executed = true;
             }
-        });
+            return executed;
+        }
+    }   );
     }
 
     /**