Преглед изворни кода

修复adapter无法回退的问题,fixed 1341
修复HBase断连的问题

mcy пре 6 година
родитељ
комит
9e6adff942

+ 3 - 3
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -102,13 +102,13 @@ public class ESSyncService {
             }
         } catch (Exception e) {
             logger.error("sync error, es index: {}, DML : {}", config.getEsMapping().get_index(), dml);
-            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
         }
     }
 
     /**
      * 插入操作dml
-     * 
+     *
      * @param config es配置
      * @param dml dml数据
      */
@@ -429,7 +429,7 @@ public class ESSyncService {
 
     /**
      * 主表(单表)复杂字段insert
-     * 
+     *
      * @param config es配置
      * @param dml dml信息
      * @param data 单行dml数据

+ 7 - 17
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -13,8 +13,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -30,11 +28,7 @@ import com.alibaba.otter.canal.client.adapter.hbase.monitor.HbaseConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseEtlService;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseSyncService;
 import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
-import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.EtlResult;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
-import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.client.adapter.support.*;
 
 /**
  * HBase外部适配器
@@ -50,7 +44,6 @@ public class HbaseAdapter implements OuterAdapter {
     private Map<String, MappingConfig>              hbaseMapping       = new ConcurrentHashMap<>();                  // 文件名对应配置
     private Map<String, Map<String, MappingConfig>> mappingConfigCache = new ConcurrentHashMap<>();                  // 库名-表名对应配置
 
-    private Connection                              conn;
     private HbaseSyncService                        hbaseSyncService;
     private HbaseTemplate                           hbaseTemplate;
 
@@ -90,8 +83,7 @@ public class HbaseAdapter implements OuterAdapter {
 
             Configuration hbaseConfig = HBaseConfiguration.create();
             properties.forEach(hbaseConfig::set);
-            conn = ConnectionFactory.createConnection(hbaseConfig);
-            hbaseTemplate = new HbaseTemplate(conn);
+            hbaseTemplate = new HbaseTemplate(hbaseConfig);
             hbaseSyncService = new HbaseSyncService(hbaseTemplate);
 
             configMonitor = new HbaseConfigMonitor();
@@ -174,7 +166,7 @@ public class HbaseAdapter implements OuterAdapter {
         String hbaseTable = config.getHbaseMapping().getHbaseTable();
         long rowCount = 0L;
         try {
-            HTable table = (HTable) conn.getTable(TableName.valueOf(hbaseTable));
+            HTable table = (HTable) hbaseTemplate.getConnection().getTable(TableName.valueOf(hbaseTable));
             Scan scan = new Scan();
             scan.setFilter(new FirstKeyOnlyFilter());
             ResultScanner resultScanner = table.getScanner(scan);
@@ -195,12 +187,10 @@ public class HbaseAdapter implements OuterAdapter {
         if (configMonitor != null) {
             configMonitor.destroy();
         }
-        if (conn != null) {
-            try {
-                conn.close();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
+        try {
+            hbaseTemplate.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
     }
 

+ 15 - 19
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java

@@ -29,28 +29,24 @@ public class HbaseSyncService {
     }
 
     public void sync(MappingConfig config, Dml dml) {
-        try {
-            if (config != null) {
-                String type = dml.getType();
-                if (type != null && type.equalsIgnoreCase("INSERT")) {
-                    insert(config, dml);
-                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
-                    update(config, dml);
-                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
-                    delete(config, dml);
-                }
-                if (logger.isDebugEnabled()) {
-                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
-                }
+        if (config != null) {
+            String type = dml.getType();
+            if (type != null && type.equalsIgnoreCase("INSERT")) {
+                insert(config, dml);
+            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                update(config, dml);
+            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                delete(config, dml);
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
             }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * 插入操作
-     * 
+     *
      * @param config 配置项
      * @param dml DML数据
      */
@@ -102,7 +98,7 @@ public class HbaseSyncService {
 
     /**
      * 将Map数据转换为HRow行数据
-     * 
+     *
      * @param hbaseMapping hbase映射配置
      * @param hRow 行对象
      * @param data Map数据
@@ -160,7 +156,7 @@ public class HbaseSyncService {
 
     /**
      * 更新操作
-     * 
+     *
      * @param config 配置对象
      * @param dml dml对象
      */
@@ -380,7 +376,7 @@ public class HbaseSyncService {
 
     /**
      * 根据对应的类型进行转换
-     * 
+     *
      * @param columnItem 列项配置
      * @param hbaseMapping hbase映射配置
      * @param value 值

+ 39 - 19
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/HbaseTemplate.java

@@ -5,14 +5,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,16 +22,33 @@ import org.slf4j.LoggerFactory;
  */
 public class HbaseTemplate {
 
-    private Logger     logger = LoggerFactory.getLogger(this.getClass());
+    private Logger        logger = LoggerFactory.getLogger(this.getClass());
 
-    private Connection conn;
+    private Configuration hbaseConfig;                                      // hbase配置对象
+    private Connection    conn;                                             // hbase连接
 
-    public HbaseTemplate(Connection conn){
-        this.conn = conn;
+    public HbaseTemplate(Configuration hbaseConfig){
+        this.hbaseConfig = hbaseConfig;
+        initConn();
+    }
+
+    private void initConn() {
+        try {
+            this.conn = ConnectionFactory.createConnection(hbaseConfig);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Connection getConnection() {
+        if (conn == null || conn.isAborted() || conn.isClosed()) {
+            initConn();
+        }
+        return conn;
     }
 
     public boolean tableExists(String tableName) {
-        try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) {
+        try (HBaseAdmin admin = (HBaseAdmin) getConnection().getAdmin()) {
 
             return admin.tableExists(TableName.valueOf(tableName));
         } catch (IOException e) {
@@ -43,7 +57,7 @@ public class HbaseTemplate {
     }
 
     public void createTable(String tableName, String... familyNames) {
-        try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) {
+        try (HBaseAdmin admin = (HBaseAdmin) getConnection().getAdmin()) {
 
             HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
             // 添加列簇
@@ -60,7 +74,7 @@ public class HbaseTemplate {
     }
 
     public void disableTable(String tableName) {
-        try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) {
+        try (HBaseAdmin admin = (HBaseAdmin) getConnection().getAdmin()) {
             admin.disableTable(tableName);
         } catch (IOException e) {
             logger.error(e.getMessage(), e);
@@ -69,7 +83,7 @@ public class HbaseTemplate {
     }
 
     public void deleteTable(String tableName) {
-        try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) {
+        try (HBaseAdmin admin = (HBaseAdmin) getConnection().getAdmin()) {
             if (admin.isTableEnabled(tableName)) {
                 disableTable(tableName);
             }
@@ -82,7 +96,7 @@ public class HbaseTemplate {
 
     /**
      * 插入一行数据
-     * 
+     *
      * @param tableName 表名
      * @param hRow 行数据对象
      * @return 是否成功
@@ -90,7 +104,7 @@ public class HbaseTemplate {
     public Boolean put(String tableName, HRow hRow) {
         boolean flag = false;
         try {
-            HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
+            HTable table = (HTable) getConnection().getTable(TableName.valueOf(tableName));
             Put put = new Put(hRow.getRowKey());
             for (HRow.HCell hCell : hRow.getCells()) {
                 put.addColumn(Bytes.toBytes(hCell.getFamily()), Bytes.toBytes(hCell.getQualifier()), hCell.getValue());
@@ -106,7 +120,7 @@ public class HbaseTemplate {
 
     /**
      * 批量插入
-     * 
+     *
      * @param tableName 表名
      * @param rows 行数据对象集合
      * @return 是否成功
@@ -114,7 +128,7 @@ public class HbaseTemplate {
     public Boolean puts(String tableName, List<HRow> rows) {
         boolean flag = false;
         try {
-            HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
+            HTable table = (HTable) getConnection().getTable(TableName.valueOf(tableName));
             List<Put> puts = new ArrayList<>();
             for (HRow hRow : rows) {
                 Put put = new Put(hRow.getRowKey());
@@ -137,7 +151,7 @@ public class HbaseTemplate {
 
     /**
      * 批量删除数据
-     * 
+     *
      * @param tableName 表名
      * @param rowKeys rowKey集合
      * @return 是否成功
@@ -145,7 +159,7 @@ public class HbaseTemplate {
     public Boolean deletes(String tableName, Set<byte[]> rowKeys) {
         boolean flag = false;
         try {
-            HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
+            HTable table = (HTable) getConnection().getTable(TableName.valueOf(tableName));
             List<Delete> deletes = new ArrayList<>();
             for (byte[] rowKey : rowKeys) {
                 Delete delete = new Delete(rowKey);
@@ -160,4 +174,10 @@ public class HbaseTemplate {
         }
         return flag;
     }
+
+    public void close() throws IOException {
+        if (conn != null) {
+            conn.close();
+        }
+    }
 }

+ 107 - 120
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -7,10 +7,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 import java.util.function.Function;
 
 import javax.sql.DataSource;
@@ -88,41 +85,39 @@ public class RdbSyncService {
      * @param function 回调方法
      */
     public void sync(List<Dml> dmls, Function<Dml, Boolean> function) {
-        try {
-            boolean toExecute = false;
-            for (Dml dml : dmls) {
-                if (!toExecute) {
-                    toExecute = function.apply(dml);
-                } else {
-                    function.apply(dml);
-                }
+        boolean toExecute = false;
+        for (Dml dml : dmls) {
+            if (!toExecute) {
+                toExecute = function.apply(dml);
+            } else {
+                function.apply(dml);
             }
-            if (toExecute) {
-                List<Future> futures = new ArrayList<>();
-                for (int i = 0; i < threads; i++) {
-                    int j = i;
-                    futures.add(executorThreads[i].submit(() -> {
+        }
+        if (toExecute) {
+            List<Future> futures = new ArrayList<>();
+            for (int i = 0; i < threads; i++) {
+                int j = i;
+                futures.add(executorThreads[i].submit(() -> {
+                    try {
                         dmlsPartition[j]
                             .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
+                        dmlsPartition[j].clear();
                         batchExecutors[j].commit();
                         return true;
-                    }));
-                }
-
-                futures.forEach(future -> {
-                    try {
-                        future.get();
                     } catch (Exception e) {
-                        logger.error(e.getMessage(), e);
+                        batchExecutors[j].rollback();
+                        throw new RuntimeException(e);
                     }
-                });
+                }));
+            }
 
-                for (int i = 0; i < threads; i++) {
-                    dmlsPartition[i].clear();
+            futures.forEach(future -> {
+                try {
+                    future.get();
+                } catch (ExecutionException | InterruptedException e) {
+                    throw new RuntimeException(e);
                 }
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+            });
         }
     }
 
@@ -181,22 +176,18 @@ public class RdbSyncService {
      * @param dml DML
      */
     public void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
-        try {
-            if (config != null) {
-                String type = dml.getType();
-                if (type != null && type.equalsIgnoreCase("INSERT")) {
-                    insert(batchExecutor, config, dml);
-                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
-                    update(batchExecutor, config, dml);
-                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
-                    delete(batchExecutor, config, dml);
-                }
-                if (logger.isDebugEnabled()) {
-                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
-                }
+        if (config != null) {
+            String type = dml.getType();
+            if (type != null && type.equalsIgnoreCase("INSERT")) {
+                insert(batchExecutor, config, dml);
+            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                update(batchExecutor, config, dml);
+            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                delete(batchExecutor, config, dml);
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
             }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
         }
     }
 
@@ -214,47 +205,45 @@ public class RdbSyncService {
 
         DbMapping dbMapping = config.getDbMapping();
 
-        try {
-            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
+        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
-            StringBuilder insertSql = new StringBuilder();
-            insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
+        StringBuilder insertSql = new StringBuilder();
+        insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
 
-            columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
-            int len = insertSql.length();
-            insertSql.delete(len - 1, len).append(") VALUES (");
-            int mapLen = columnsMap.size();
-            for (int i = 0; i < mapLen; i++) {
-                insertSql.append("?,");
-            }
-            len = insertSql.length();
-            insertSql.delete(len - 1, len).append(")");
-
-            Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
-
-            List<Map<String, ?>> values = new ArrayList<>();
-            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
-                String targetColumnName = entry.getKey();
-                String srcColumnName = entry.getValue();
-                if (srcColumnName == null) {
-                    srcColumnName = Util.cleanColumn(targetColumnName);
-                }
-
-                Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+        columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
+        int len = insertSql.length();
+        insertSql.delete(len - 1, len).append(") VALUES (");
+        int mapLen = columnsMap.size();
+        for (int i = 0; i < mapLen; i++) {
+            insertSql.append("?,");
+        }
+        len = insertSql.length();
+        insertSql.delete(len - 1, len).append(")");
 
-                Object value = data.get(srcColumnName);
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
 
-                BatchExecutor.setValue(values, type, value);
+        List<Map<String, ?>> values = new ArrayList<>();
+        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = Util.cleanColumn(targetColumnName);
             }
 
-            batchExecutor.execute(insertSql.toString(), values);
-            if (logger.isTraceEnabled()) {
-                logger.trace("Insert into target table, sql: {}", insertSql);
+            Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+            if (type == null) {
+                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
             }
+            Object value = data.get(srcColumnName);
 
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+            BatchExecutor.setValue(values, type, value);
         }
+
+        batchExecutor.execute(insertSql.toString(), values);
+        if (logger.isTraceEnabled()) {
+            logger.trace("Insert into target table, sql: {}", insertSql);
+        }
+
     }
 
     /**
@@ -276,43 +265,42 @@ public class RdbSyncService {
 
         DbMapping dbMapping = config.getDbMapping();
 
-        try {
-            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
-
-            Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
-
-            StringBuilder updateSql = new StringBuilder();
-            updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping)).append(" SET ");
-            List<Map<String, ?>> values = new ArrayList<>();
-            for (String srcColumnName : old.keySet()) {
-                List<String> targetColumnNames = new ArrayList<>();
-                columnsMap.forEach((targetColumn, srcColumn) -> {
-                    if (srcColumnName.toLowerCase().equals(srcColumn)) {
-                        targetColumnNames.add(targetColumn);
-                    }
-                });
-                if (!targetColumnNames.isEmpty()) {
+        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
-                    for (String targetColumnName : targetColumnNames) {
-                        updateSql.append(targetColumnName).append("=?, ");
-                        Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
-                        BatchExecutor.setValue(values, type, data.get(srcColumnName));
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+
+        StringBuilder updateSql = new StringBuilder();
+        updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping)).append(" SET ");
+        List<Map<String, ?>> values = new ArrayList<>();
+        for (String srcColumnName : old.keySet()) {
+            List<String> targetColumnNames = new ArrayList<>();
+            columnsMap.forEach((targetColumn, srcColumn) -> {
+                if (srcColumnName.toLowerCase().equals(srcColumn)) {
+                    targetColumnNames.add(targetColumn);
+                }
+            });
+            if (!targetColumnNames.isEmpty()) {
+
+                for (String targetColumnName : targetColumnNames) {
+                    updateSql.append(targetColumnName).append("=?, ");
+                    Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+                    if (type == null) {
+                        throw new RuntimeException("Target column: " + targetColumnName + " not matched");
                     }
+                    BatchExecutor.setValue(values, type, data.get(srcColumnName));
                 }
             }
-            int len = updateSql.length();
-            updateSql.delete(len - 2, len).append(" WHERE ");
+        }
+        int len = updateSql.length();
+        updateSql.delete(len - 2, len).append(" WHERE ");
 
-            // 拼接主键
-            appendCondition(dbMapping, updateSql, ctype, values, data, old);
+        // 拼接主键
+        appendCondition(dbMapping, updateSql, ctype, values, data, old);
 
-            batchExecutor.execute(updateSql.toString(), values);
+        batchExecutor.execute(updateSql.toString(), values);
 
-            if (logger.isTraceEnabled()) {
-                logger.trace("Update target table, sql: {}", updateSql);
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+        if (logger.isTraceEnabled()) {
+            logger.trace("Update target table, sql: {}", updateSql);
         }
     }
 
@@ -330,23 +318,19 @@ public class RdbSyncService {
 
         DbMapping dbMapping = config.getDbMapping();
 
-        try {
-            Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
 
-            StringBuilder sql = new StringBuilder();
-            sql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");
+        StringBuilder sql = new StringBuilder();
+        sql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");
 
-            List<Map<String, ?>> values = new ArrayList<>();
-            // 拼接主键
-            appendCondition(dbMapping, sql, ctype, values, data);
+        List<Map<String, ?>> values = new ArrayList<>();
+        // 拼接主键
+        appendCondition(dbMapping, sql, ctype, values, data);
 
-            batchExecutor.execute(sql.toString(), values);
+        batchExecutor.execute(sql.toString(), values);
 
-            if (logger.isTraceEnabled()) {
-                logger.trace("Delete from target table, sql: {}", sql);
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+        if (logger.isTraceEnabled()) {
+            logger.trace("Delete from target table, sql: {}", sql);
         }
     }
 
@@ -405,6 +389,9 @@ public class RdbSyncService {
             }
             sql.append(targetColumnName).append("=? AND ");
             Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+            if (type == null) {
+                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
+            }
             // 如果有修改主键的情况
             if (o != null && o.containsKey(srcColumnName)) {
                 BatchExecutor.setValue(values, type, o.get(srcColumnName));

+ 13 - 11
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java

@@ -16,18 +16,11 @@ public class BatchExecutor implements Closeable {
 
     private static final Logger logger = LoggerFactory.getLogger(BatchExecutor.class);
 
-    private Integer             key;
     private Connection          conn;
     private AtomicInteger       idx    = new AtomicInteger(0);
 
     public BatchExecutor(Connection conn){
-        this(1, conn);
-    }
-
-    public BatchExecutor(Integer key, Connection conn){
-        this.key = key;
         this.conn = conn;
-
         try {
             this.conn.setAutoCommit(false);
         } catch (SQLException e) {
@@ -35,9 +28,6 @@ public class BatchExecutor implements Closeable {
         }
     }
 
-    public Integer getKey() {
-        return key;
-    }
 
     public Connection getConn() {
         return conn;
@@ -70,7 +60,19 @@ public class BatchExecutor implements Closeable {
         try {
             conn.commit();
             if (logger.isTraceEnabled()) {
-                logger.trace("Batch executor: " + key + " commit " + idx.get() + " rows");
+                logger.trace("Batch executor commit " + idx.get() + " rows");
+            }
+            idx.set(0);
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    public void rollback() {
+        try {
+            conn.rollback();
+            if (logger.isTraceEnabled()) {
+                logger.trace("Batch executor rollback " + idx.get() + " rows");
             }
             idx.set(0);
         } catch (SQLException e) {

+ 7 - 7
client-adapter/rdb/src/main/resources/rdb/mytest_user.yml

@@ -8,13 +8,13 @@ dbMapping:
   targetTable: mytest.tb_user
   targetPk:
     id: id
-  mapAll: true
-#  targetColumns:
-#    id:
-#    name:
-#    role_id:
-#    c_time:
-#    test1:
+#  mapAll: true
+  targetColumns:
+    id:
+    name:
+    role_id:
+    c_time:
+    test1:
 
 
 # Mirror schema synchronize config