Browse Source

rdb新增同步测试完成

mcy 6 years ago
parent
commit
f7ea8df89e

+ 24 - 13
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

@@ -56,16 +56,19 @@ public class MappingConfig {
 
     public static class DbMapping {
 
-        private String              database;                            // 数据库名或schema名
-        private String              table;                               // 表面名
-        private boolean             mapAll      = false;                 // 映射所有字段
-        private String              targetTable;                         // 目标表名
-        private Map<String, String> targetColums;                        // 目标表字段映射
-        private String              etlCondition;                        // etl条件sql
+        private String                       database;                            // 数据库名或schema名
+        private String                       table;                               // 表面名
+        private boolean                      mapAll      = false;                 // 映射所有字段
+        private String                       targetTable;                         // 目标表名
+        private Map<String, String>          targetColumns;                       // 目标表字段映射
 
-        private Set<String>         families    = new LinkedHashSet<>(); // column family列表
-        private int                 readBatch   = 5000;
-        private int                 commitBatch = 5000;                  // etl等批量提交大小
+        private String                       etlCondition;                        // etl条件sql
+
+        private Set<String>                  families    = new LinkedHashSet<>(); // column family列表
+        private int                          readBatch   = 5000;
+        private int                          commitBatch = 5000;                  // etl等批量提交大小
+
+        private volatile Map<String, String> allColumns;                          // mapAll为true,自动设置改字段
 
         public String getDatabase() {
             return database;
@@ -99,12 +102,12 @@ public class MappingConfig {
             this.targetTable = targetTable;
         }
 
-        public Map<String, String> getTargetColums() {
-            return targetColums;
+        public Map<String, String> getTargetColumns() {
+            return targetColumns;
         }
 
-        public void setTargetColums(Map<String, String> targetColums) {
-            this.targetColums = targetColums;
+        public void setTargetColumns(Map<String, String> targetColumns) {
+            this.targetColumns = targetColumns;
         }
 
         public String getEtlCondition() {
@@ -138,5 +141,13 @@ public class MappingConfig {
         public void setCommitBatch(int commitBatch) {
             this.commitBatch = commitBatch;
         }
+
+        public Map<String, String> getAllColumns() {
+            return allColumns;
+        }
+
+        public void setAllColumns(Map<String, String> allColumns) {
+            this.allColumns = allColumns;
+        }
     }
 }

+ 326 - 215
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -5,7 +5,7 @@ import java.io.StringReader;
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.sql.*;
-import java.util.Date;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 
 /**
@@ -31,9 +32,12 @@ import com.alibaba.otter.canal.client.adapter.support.Dml;
  */
 public class RdbSyncService {
 
-    private static Logger logger = LoggerFactory.getLogger(RdbSyncService.class);
+    private static final Logger                            logger             = LoggerFactory
+        .getLogger(RdbSyncService.class);
 
-    private DataSource    dataSource;
+    private static final Map<String, Map<String, Integer>> COLUMNS_TYPE_CACHE = new ConcurrentHashMap<>();
+
+    private DataSource                                     dataSource;
 
     public RdbSyncService(DataSource dataSource){
         this.dataSource = dataSource;
@@ -42,6 +46,52 @@ public class RdbSyncService {
     public void sync(MappingConfig config, Dml dml) {
         try {
             if (config != null) {
+                {
+                    DbMapping dbMapping = config.getDbMapping();
+                    // 从源表加载所有字段名
+                    if (dbMapping.getAllColumns() == null) {
+                        synchronized (RdbSyncService.class) {
+                            if (dbMapping.getAllColumns() == null) {
+                                DataSource srcDS = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+                                Connection srcConn = srcDS.getConnection();
+                                String srcMetaSql = "SELECT * FROM " + dbMapping.getDatabase() + "."
+                                                    + dbMapping.getTable() + " WHERE 1=2 ";
+                                List<String> srcColumns = new ArrayList<>();
+                                sqlRS(srcConn, srcMetaSql, rs -> {
+                                    try {
+                                        ResultSetMetaData rmd = rs.getMetaData();
+                                        int cnt = rmd.getColumnCount();
+                                        for (int i = 1; i <= cnt; i++) {
+                                            srcColumns.add(rmd.getColumnName(i).toLowerCase());
+                                        }
+                                    } catch (SQLException e) {
+                                        logger.error(e.getMessage(), e);
+                                    }
+                                });
+                                Map<String, String> columnsMap = new LinkedHashMap<>();
+
+                                for (String srcColumn : srcColumns) {
+                                    String targetColumn = srcColumn;
+                                    if (dbMapping.getTargetColumns() != null) {
+                                        for (Map.Entry<String, String> entry : dbMapping.getTargetColumns()
+                                            .entrySet()) {
+                                            String targetColumnName = entry.getKey();
+                                            String srcColumnName = entry.getValue();
+
+                                            if (srcColumnName != null
+                                                && srcColumnName.toLowerCase().equals(srcColumn.toUpperCase())) {
+                                                targetColumn = targetColumnName;
+                                            }
+                                        }
+                                    }
+                                    columnsMap.put(targetColumn, srcColumn);
+                                }
+                                dbMapping.setAllColumns(columnsMap);
+                            }
+                        }
+                    }
+                }
+
                 String type = dml.getType();
                 if (type != null && type.equalsIgnoreCase("INSERT")) {
                     insert(config, dml);
@@ -59,6 +109,72 @@ public class RdbSyncService {
         }
     }
 
+    private void update(MappingConfig config, Dml dml) throws SQLException {
+        List<Map<String, Object>> data = dml.getData();
+        if (data == null || data.isEmpty()) {
+            return;
+        }
+
+        List<Map<String, Object>> old = dml.getOld();
+        if (old == null || old.isEmpty()) {
+            return;
+        }
+
+        DbMapping dbMapping = config.getDbMapping();
+
+        int idx = 1;
+        boolean complete = false;
+
+        Connection conn = dataSource.getConnection();
+        boolean oriAutoCommit = conn.getAutoCommit();
+        conn.setAutoCommit(false);
+
+        try {
+            Map<String, String> columnsMap;
+            if (dbMapping.isMapAll()) {
+                columnsMap = dbMapping.getAllColumns();
+            } else {
+                columnsMap = dbMapping.getTargetColumns();
+            }
+
+            int i = 0;
+            for (Map<String, Object> o : old) {
+                Map<String, Object> d = data.get(i);
+                StringBuilder updateSql = new StringBuilder();
+                updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
+                List<Object> values = new ArrayList<>();
+                boolean flag = false;
+                for (String srcColumnName : o.keySet()) {
+                    List<String> targetColumnNames = new ArrayList<>();
+                    columnsMap.forEach((targetColumn, srcColumn) -> {
+                        if (srcColumnName.toLowerCase().equals(srcColumn)) {
+                            targetColumnNames.add(targetColumn);
+                        }
+                    });
+                    if (!targetColumnNames.isEmpty()) {
+                        if (!flag) {
+                            flag = true;
+                        }
+                        for (String targetColumnName : targetColumnNames) {
+                            updateSql.append(targetColumnName).append("=?, ");
+                            values.add(data.get(i));
+                        }
+                    }
+                }
+                if (flag) {
+                    int len = updateSql.length();
+                    updateSql.delete(len - 2, len).append(" WHERE ");
+                }
+                i++;
+            }
+        } catch (Exception e) {
+            conn.rollback();
+        } finally {
+            conn.setAutoCommit(oriAutoCommit);
+            conn.close();
+        }
+    }
+
     /**
      * 插入操作
      *
@@ -74,79 +190,68 @@ public class RdbSyncService {
         DbMapping dbMapping = config.getDbMapping();
 
         int idx = 1;
-        boolean complete = false;
+        boolean completed = false;
 
         Connection conn = dataSource.getConnection();
-
+        boolean oriAutoCommit = conn.getAutoCommit();
         conn.setAutoCommit(false);
+        try {
+            Map<String, String> columnsMap;
+            if (dbMapping.isMapAll()) {
+                columnsMap = dbMapping.getAllColumns();
+            } else {
+                columnsMap = dbMapping.getTargetColumns();
+            }
 
-        StringBuilder insertSql = new StringBuilder();
-        insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
-        if (!dbMapping.isMapAll()) {
-            dbMapping.getTargetColums().forEach((targetColumnName, srcColumnName) -> {
-                insertSql.append(targetColumnName).append(",");
-            });
+            StringBuilder insertSql = new StringBuilder();
+            insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
+
+            columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
             int len = insertSql.length();
             insertSql.delete(len - 1, len).append(") VALUES (");
-            int mapLen = dbMapping.getTargetColums().size();
+            int mapLen = columnsMap.size();
             for (int i = 0; i < mapLen; i++) {
                 insertSql.append("?,");
             }
             len = insertSql.length();
             insertSql.delete(len - 1, len).append(")");
-        }
 
-        PreparedStatement pstmt = conn.prepareStatement(insertSql.toString());
+            PreparedStatement pstmt = conn.prepareStatement(insertSql.toString());
 
-        for (Map<String, Object> r : data) {
-            pstmt.clearParameters();
-            convertData2DbRow(conn, config, r, pstmt);
+            for (Map<String, Object> r : data) {
+                pstmt.clearParameters();
+                convertData2DbRow(conn, config, r, pstmt);
 
-            pstmt.execute();
+                pstmt.execute();
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Insert into target db, sql: {}", insertSql);
+                }
 
-            if (idx % config.getDbMapping().getCommitBatch() == 0) {
+                if (idx % config.getDbMapping().getCommitBatch() == 0) {
+                    conn.commit();
+                    completed = true;
+                }
+                idx++;
+            }
+            if (!completed) {
                 conn.commit();
-                complete = true;
             }
-            idx++;
-        }
-        if (!complete) {
-            conn.commit();
+        } catch (Exception e) {
+            conn.rollback();
+        } finally {
+            conn.setAutoCommit(oriAutoCommit);
+            conn.close();
         }
-        conn.close();
     }
 
     private static void sqlRS(Connection conn, String sql, Consumer<ResultSet> consumer) {
-        Statement stmt = null;
-        ResultSet rs = null;
-        try {
-            stmt = conn.createStatement();
-            rs = stmt.executeQuery(sql);
+        try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
             consumer.accept(rs);
         } catch (SQLException e) {
             logger.error(e.getMessage(), e);
-        } finally {
-            if (rs != null) {
-                try {
-                    rs.close();
-                    rs = null;
-                } catch (SQLException e) {
-                    // ignore
-                }
-            }
-            if (stmt != null) {
-                try {
-                    stmt.close();
-                    stmt = null;
-                } catch (SQLException e) {
-                    // ignore
-                }
-            }
         }
     }
 
-    private static Map<String, Map<String, Integer>> COLUMNS_TYPE_CACHE = new ConcurrentHashMap<>();
-
     private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig config) {
         DbMapping dbMapping = config.getDbMapping();
         String cacheKey = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
@@ -187,182 +292,188 @@ public class RdbSyncService {
     private void convertData2DbRow(Connection conn, MappingConfig config, Map<String, Object> data,
                                    PreparedStatement pstmt) throws SQLException {
         DbMapping dbMapping = config.getDbMapping();
+        Map<String, String> columnsMap;
         if (dbMapping.isMapAll()) {
-
+            columnsMap = dbMapping.getAllColumns();
         } else {
-            Map<String, Integer> ctype = getTargetColumnType(conn, config);
-
-            int i = 1;
-            for (Map.Entry<String, String> entry : dbMapping.getTargetColums().entrySet()) {
-                String targetClassName = entry.getKey();
-                String srcColumnName = entry.getValue();
-                if (srcColumnName == null) {
-                    srcColumnName = targetClassName;
+            columnsMap = dbMapping.getTargetColumns();
+        }
+        Map<String, Integer> ctype = getTargetColumnType(conn, config);
+
+        int i = 1;
+        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+            String targetClassName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = targetClassName;
+            }
+
+            Integer type = ctype.get(targetClassName.toLowerCase());
+
+            Object value = data.get(srcColumnName);
+            if (value != null) {
+                if (type == null) {
+                    throw new RuntimeException("No column: " + targetClassName + " found in target db");
                 }
 
-                Integer type = ctype.get(targetClassName.toLowerCase());
+                setPStmt(type, pstmt, value, i);
+            } else {
+                pstmt.setNull(i, type);
+            }
+            i++;
+        }
+    }
+
+    private void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException {
+        switch (type) {
+            case Types.BIT:
+            case Types.BOOLEAN:
+                if (value instanceof Boolean) {
+                    pstmt.setBoolean(i, (Boolean) value);
+                } else if (value instanceof String) {
+                    boolean v = !value.equals("0");
+                    pstmt.setBoolean(i, v);
+                } else if (value instanceof Number) {
+                    boolean v = ((Number) value).intValue() != 0;
+                    pstmt.setBoolean(i, v);
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.CHAR:
+            case Types.NCHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+                pstmt.setString(i, value.toString());
+                break;
+            case Types.TINYINT:
+                if (value instanceof Number) {
+                    pstmt.setByte(i, ((Number) value).byteValue());
+                } else if (value instanceof String) {
+                    pstmt.setByte(i, Byte.parseByte((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.SMALLINT:
+                if (value instanceof Number) {
+                    pstmt.setShort(i, ((Number) value).shortValue());
+                } else if (value instanceof String) {
+                    pstmt.setShort(i, Short.parseShort((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.INTEGER:
+                if (value instanceof Number) {
+                    pstmt.setInt(i, ((Number) value).intValue());
+                } else if (value instanceof String) {
+                    pstmt.setInt(i, Integer.parseInt((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.BIGINT:
+                if (value instanceof Number) {
+                    pstmt.setLong(i, ((Number) value).longValue());
+                } else if (value instanceof String) {
+                    pstmt.setLong(i, Long.parseLong((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.DECIMAL:
+            case Types.NUMERIC:
+                pstmt.setBigDecimal(i, new BigDecimal(value.toString()));
+                break;
+            case Types.REAL:
+                if (value instanceof Number) {
+                    pstmt.setFloat(i, ((Number) value).floatValue());
+                } else if (value instanceof String) {
+                    pstmt.setFloat(i, Float.parseFloat((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.FLOAT:
+            case Types.DOUBLE:
+                if (value instanceof Number) {
+                    pstmt.setDouble(i, ((Number) value).doubleValue());
+                } else if (value instanceof String) {
+                    pstmt.setDouble(i, Double.parseDouble((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+            case Types.BLOB:
 
-                Object value = data.get(srcColumnName);
-                if (value != null) {
-                    if (type == null) {
-                        throw new RuntimeException("No column: " + targetClassName + " found in target db");
+                if (value instanceof byte[]) {
+                    pstmt.setBytes(i, (byte[]) value);
+                } else if (value instanceof String) {
+                    pstmt.setBytes(i, ((String) value).getBytes(StandardCharsets.ISO_8859_1));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.CLOB:
+                if (value instanceof byte[]) {
+                    pstmt.setBytes(i, (byte[]) value);
+                } else if (value instanceof String) {
+                    Reader clobReader = new StringReader((String) value);
+                    pstmt.setCharacterStream(i, clobReader);
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.DATE:
+                if (value instanceof java.util.Date) {
+                    pstmt.setDate(i, new java.sql.Date(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    if (!v.startsWith("0000-00-00")) {
+                        v = v.trim().replace(" ", "T");
+                        DateTime dt = new DateTime(v);
+                        pstmt.setDate(i, new Date(dt.toDate().getTime()));
+                    } else {
+                        pstmt.setNull(i, type);
                     }
-                    switch (type) {
-                        case Types.BIT:
-                        case Types.BOOLEAN:
-                            if (value instanceof Boolean) {
-                                pstmt.setBoolean(i, (Boolean) value);
-                            } else if (value instanceof String) {
-                                boolean v = !value.equals("0");
-                                pstmt.setBoolean(i, v);
-                            } else if (value instanceof Number) {
-                                boolean v = ((Number) value).intValue() != 0;
-                                pstmt.setBoolean(i, v);
-                            } else {
-                                pstmt.setNull(i, type);
-                            }
-                            break;
-                        case Types.CHAR:
-                        case Types.NCHAR:
-                        case Types.VARCHAR:
-                        case Types.LONGVARCHAR:
-                            pstmt.setString(i, value.toString());
-                            break;
-                        case Types.TINYINT:
-                            if (value instanceof Number) {
-                                pstmt.setByte(i, ((Number) value).byteValue());
-                            } else if (value instanceof String) {
-                                pstmt.setByte(i, Byte.parseByte((String) value));
-                            } else {
-                                pstmt.setNull(i, type);
-                            }
-                            break;
-                        case Types.SMALLINT:
-                            if (value instanceof Number) {
-                                pstmt.setShort(i, ((Number) value).shortValue());
-                            } else if (value instanceof String) {
-                                pstmt.setShort(i, Short.parseShort((String) value));
-                            } else {
-                                pstmt.setNull(i, type);
-                            }
-                            break;
-                        case Types.INTEGER:
-                            if (value instanceof Number) {
-                                pstmt.setInt(i, ((Number) value).intValue());
-                            } else if (value instanceof String) {
-                                pstmt.setInt(i, Integer.parseInt((String) value));
-                            } else {
-                                pstmt.setNull(i, type);
-                            }
-                            break;
-                        case Types.BIGINT:
-                            if (value instanceof Number) {
-                                pstmt.setLong(i, ((Number) value).longValue());
-                            } else if (value instanceof String) {
-                                pstmt.setLong(i, Long.parseLong((String) value));
-                            } else {
-                                pstmt.setNull(i, type);
-                            }
-                            break;
-                        case Types.DECIMAL:
-                        case Types.NUMERIC:
-                            pstmt.setBigDecimal(i, new BigDecimal(value.toString()));
-                            break;
-                        case Types.REAL:
-                            if (value instanceof Number) {
-                                pstmt.setFloat(i, ((Number) value).floatValue());
-                            } else if (value instanceof String) {
-                                pstmt.setFloat(i, Float.parseFloat((String) value));
-                            } else {
-                                pstmt.setNull(i, type);
-                            }
-                            break;
-                        case Types.FLOAT:
-                        case Types.DOUBLE:
-                            if (value instanceof Number) {
-                                pstmt.setDouble(i, ((Number) value).doubleValue());
-                            } else if (value instanceof String) {
-                                pstmt.setDouble(i, Double.parseDouble((String) value));
-                            } else {
-                                pstmt.setNull(i, type);
-                            }
-                            break;
-                        case Types.BINARY:
-                        case Types.VARBINARY:
-                        case Types.LONGVARBINARY:
-                        case Types.BLOB:
-
-                            if (value instanceof byte[]) {
-                                pstmt.setBytes(i, (byte[]) value);
-                            } else if (value instanceof String) {
-                                pstmt.setBytes(i, ((String) value).getBytes(StandardCharsets.ISO_8859_1));
-                            } else {
-                                pstmt.setNull(i, type);
-                            }
-                            break;
-                        case Types.CLOB:
-                            if (value instanceof byte[]) {
-                                pstmt.setBytes(i, (byte[]) value);
-                            } else if (value instanceof String) {
-                                Reader clobReader = new StringReader((String) value);
-                                pstmt.setCharacterStream(i, clobReader);
-                            } else {
-                                pstmt.setNull(i, type);
-                            }
-                            break;
-                        case Types.DATE:
-                            if (value instanceof Date) {
-                                pstmt.setDate(i, new java.sql.Date(((Date) value).getTime()));
-                            } else if (value instanceof String) {
-                                String v = (String) value;
-                                if (!v.startsWith("0000-00-00")) {
-                                    v = v.trim().replace(" ", "T");
-                                    DateTime dt = new DateTime(v);
-                                    pstmt.setDate(i, new java.sql.Date(dt.toDate().getTime()));
-                                } else {
-                                    pstmt.setNull(i, type);
-                                }
-                            } else {
-                                pstmt.setNull(i, type);
-                            }
-                            break;
-                        case Types.TIME:
-                            if (value instanceof Date) {
-                                pstmt.setTime(i, new java.sql.Time(((Date) value).getTime()));
-                            } else if (value instanceof String) {
-                                String v = (String) value;
-                                v = "T" + v;
-                                DateTime dt = new DateTime(v);
-                                pstmt.setTime(i, new Time(dt.toDate().getTime()));
-                            } else {
-                                pstmt.setNull(i, type);
-                            }
-                            break;
-                        case Types.TIMESTAMP:
-                            if (value instanceof Date) {
-                                pstmt.setTimestamp(i, new java.sql.Timestamp(((Date) value).getTime()));
-                            } else if (value instanceof String) {
-                                String v = (String) value;
-                                if (!v.startsWith("0000-00-00")) {
-                                    v = v.trim().replace(" ", "T");
-                                    DateTime dt = new DateTime(v);
-                                    pstmt.setTimestamp(i, new Timestamp(dt.toDate().getTime()));
-                                } else {
-                                    pstmt.setNull(i, type);
-                                }
-                            } else {
-                                pstmt.setNull(i, type);
-                            }
-                            break;
-                        default:
-                            pstmt.setObject(i, value, type);
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.TIME:
+                if (value instanceof java.util.Date) {
+                    pstmt.setTime(i, new java.sql.Time(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    v = "T" + v;
+                    DateTime dt = new DateTime(v);
+                    pstmt.setTime(i, new Time(dt.toDate().getTime()));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.TIMESTAMP:
+                if (value instanceof java.util.Date) {
+                    pstmt.setTimestamp(i, new java.sql.Timestamp(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    if (!v.startsWith("0000-00-00")) {
+                        v = v.trim().replace(" ", "T");
+                        DateTime dt = new DateTime(v);
+                        pstmt.setTimestamp(i, new Timestamp(dt.toDate().getTime()));
+                    } else {
+                        pstmt.setNull(i, type);
                     }
-
                 } else {
                     pstmt.setNull(i, type);
                 }
-                i++;
-            }
+                break;
+            default:
+                pstmt.setObject(i, value, type);
         }
     }
 }

+ 1 - 1
client-adapter/rdb/src/main/resources/rdb/mytest_user.yml

@@ -6,7 +6,7 @@ dbMapping:
   targetTable: mytest.tb_user
   commitBatch: 3000
   mapAll: false
-  columns:
+  targetColumns:
     id:
     name:
     role_id:

+ 7 - 7
client-adapter/rdb/src/test/resources/rdb/mytest_user.yml

@@ -5,10 +5,10 @@ dbMapping:
   table: user
   targetTable: mytest.tb_user
   commitBatch: 3000
-  mapAll: false
-  targetColums:
-    id:
-    name:
-    role_id:
-    c_time:
-    test1:
+  mapAll: true
+#  targetColumns:
+#    id:
+#    name:
+#    role_id:
+#    c_time:
+#    test1:

+ 2 - 2
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -104,7 +104,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                 canalDestination.getPartition(),
                                 null,
-                                JSON.toJSONString(flatMessage));
+                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
                             producer2.send(record).get();
                         } catch (Exception e) {
                             logger.error(e.getMessage(), e);
@@ -126,7 +126,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                         ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                             i,
                                             null,
-                                            JSON.toJSONString(flatMessagePart));
+                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
                                         producer2.send(record).get();
                                     } catch (Exception e) {
                                         logger.error(e.getMessage(), e);

+ 5 - 4
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -1,6 +1,7 @@
 package com.alibaba.otter.canal.rocketmq;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.common.CanalMessageSerializer;
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -75,11 +76,11 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                     if (destination.getPartition() != null) {
                         try {
                             logger.info("send flat message: {} to topic: {} fixed partition: {}",
-                                JSON.toJSONString(flatMessage),
+                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue),
                                 destination.getTopic(),
                                 destination.getPartition());
                             Message message = new Message(destination.getTopic(),
-                                JSON.toJSONString(flatMessage).getBytes());
+                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue).getBytes());
                             this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                 @Override
@@ -102,12 +103,12 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
                                 if (flatMessagePart != null) {
                                     logger.debug("flatMessagePart: {}, partition: {}",
-                                        JSON.toJSONString(flatMessagePart),
+                                        JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
                                         i);
                                     final int index = i;
                                     try {
                                         Message message = new Message(destination.getTopic(),
-                                            JSON.toJSONString(flatMessagePart).getBytes());
+                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue).getBytes());
                                         this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                             @Override