Browse Source

Merge pull request #29 from alibaba/master

merge
rewerma 6 years ago
parent
commit
5e116de264
16 changed files with 288 additions and 171 deletions
  1. 1 1
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java
  2. 9 1
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/HbaseTemplate.java
  3. 4 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
  4. 17 11
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java
  5. 85 82
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java
  6. 30 28
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java
  7. 41 36
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java
  8. 78 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/CharsetConversion.java
  9. 1 1
      deployer/src/main/resources/canal.properties
  10. 1 1
      deployer/src/main/resources/example/instance.properties
  11. 4 3
      example/src/main/bin/startup.bat
  12. 1 1
      instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java
  13. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java
  14. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java
  15. 9 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  16. 5 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

+ 1 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -100,7 +100,7 @@ public class ESSyncService {
                     dml.getDestination(),
                     config.getEsMapping().get_index());
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error("sync error, es index: {}, DML : {}", config.getEsMapping().get_index(), dml);
             throw new RuntimeException(e);
         }

+ 9 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/HbaseTemplate.java

@@ -9,7 +9,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,6 +118,7 @@ public class HbaseTemplate {
             flag = true;
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
         }
         return flag;
 
@@ -145,6 +151,7 @@ public class HbaseTemplate {
             flag = true;
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
         }
         return flag;
     }
@@ -171,6 +178,7 @@ public class HbaseTemplate {
             flag = true;
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
         }
         return flag;
     }

+ 4 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -2,7 +2,10 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+ 17 - 11
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -26,7 +26,12 @@ import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbMirrorDbSyncService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
 import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
-import com.alibaba.otter.canal.client.adapter.support.*;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 
 /**
  * RDB适配器实现类
@@ -73,8 +78,8 @@ public class RdbAdapter implements OuterAdapter {
         // 过滤不匹配的key的配置
         rdbMappingTmp.forEach((key, mappingConfig) -> {
             if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
-                || (mappingConfig.getOuterAdapterKey() != null
-                    && mappingConfig.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey()))) {
+                || (mappingConfig.getOuterAdapterKey() != null && mappingConfig.getOuterAdapterKey()
+                    .equalsIgnoreCase(configuration.getKey()))) {
                 rdbMapping.put(key, mappingConfig);
             }
         });
@@ -87,17 +92,17 @@ public class RdbAdapter implements OuterAdapter {
             String configName = entry.getKey();
             MappingConfig mappingConfig = entry.getValue();
             if (!mappingConfig.getDbMapping().getMirrorDb()) {
-                Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(
-                    StringUtils.trimToEmpty(mappingConfig.getDestination()) + "." + mappingConfig.getDbMapping()
-                        .getDatabase() + "." + mappingConfig.getDbMapping().getTable(),
+                String key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                             + mappingConfig.getDbMapping().getDatabase() + "."
+                             + mappingConfig.getDbMapping().getTable();
+                Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(key,
                     k1 -> new ConcurrentHashMap<>());
                 configMap.put(configName, mappingConfig);
             } else {
                 // mirrorDB
-
-                mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                                        + mappingConfig.getDbMapping().getDatabase(),
-                    MirrorDbConfig.create(configName, mappingConfig));
+                String key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                             + mappingConfig.getDbMapping().getDatabase();
+                mirrorDbConfigCache.put(key, MirrorDbConfig.create(configName, mappingConfig));
             }
         }
 
@@ -110,10 +115,11 @@ public class RdbAdapter implements OuterAdapter {
         dataSource.setPassword(properties.get("jdbc.password"));
         dataSource.setInitialSize(1);
         dataSource.setMinIdle(1);
-        dataSource.setMaxActive(10);
+        dataSource.setMaxActive(30);
         dataSource.setMaxWait(60000);
         dataSource.setTimeBetweenEvictionRunsMillis(60000);
         dataSource.setMinEvictableIdleTimeMillis(300000);
+        dataSource.setUseUnfairLock(true);
 
         try {
             dataSource.init();

+ 85 - 82
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -36,8 +36,7 @@ public class RdbEtlService {
     /**
      * 导入数据
      */
-    public static EtlResult importData(DataSource srcDS, DataSource targetDS, MappingConfig config,
-                                       List<String> params) {
+    public static EtlResult importData(DataSource srcDS, DataSource targetDS, MappingConfig config, List<String> params) {
         EtlResult etlResult = new EtlResult();
         AtomicLong successCount = new AtomicLong();
         List<String> errMsg = new ArrayList<>();
@@ -54,8 +53,8 @@ public class RdbEtlService {
             long start = System.currentTimeMillis();
 
             // 拼接sql
-            StringBuilder sql = new StringBuilder(
-                "SELECT * FROM " + dbMapping.getDatabase() + "." + dbMapping.getTable());
+            StringBuilder sql = new StringBuilder("SELECT * FROM " + dbMapping.getDatabase() + "."
+                                                  + dbMapping.getTable());
 
             // 拼接条件
             appendCondition(params, dbMapping, srcDS, sql);
@@ -92,8 +91,12 @@ public class RdbEtlService {
                     } else {
                         sqlFinal = sql + " LIMIT " + offset + "," + cnt;
                     }
-                    Future<Boolean> future = executor
-                        .submit(() -> executeSqlImport(srcDS, targetDS, sqlFinal, dbMapping, successCount, errMsg));
+                    Future<Boolean> future = executor.submit(() -> executeSqlImport(srcDS,
+                        targetDS,
+                        sqlFinal,
+                        dbMapping,
+                        successCount,
+                        errMsg));
                     futures.add(future);
                 }
 
@@ -106,10 +109,11 @@ public class RdbEtlService {
                 executeSqlImport(srcDS, targetDS, sql.toString(), dbMapping, successCount, errMsg);
             }
 
-            logger.info(
-                dbMapping.getTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
+            logger.info(dbMapping.getTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000
+                        + "s!");
 
-            etlResult.setResultMessage("导入目标表 " + SyncUtil.getDbTableName(dbMapping) + " 数据:" + successCount.get() + " 条");
+            etlResult.setResultMessage("导入目标表 " + SyncUtil.getDbTableName(dbMapping) + " 数据:" + successCount.get()
+                                       + " 条");
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
@@ -123,8 +127,8 @@ public class RdbEtlService {
         return etlResult;
     }
 
-    private static void appendCondition(List<String> params, DbMapping dbMapping, DataSource ds,
-                                        StringBuilder sql) throws SQLException {
+    private static void appendCondition(List<String> params, DbMapping dbMapping, DataSource ds, StringBuilder sql)
+                                                                                                                   throws SQLException {
         if (params != null && params.size() == 1 && dbMapping.getEtlCondition() == null) {
             AtomicBoolean stExists = new AtomicBoolean(false);
             // 验证是否有SYS_TIME字段
@@ -141,9 +145,9 @@ public class RdbEtlService {
                     }
                 } catch (Exception e) {
                     // ignore
-                }
-                return null;
-            });
+            }
+            return null;
+        }   );
             if (stExists.get()) {
                 sql.append(" WHERE SYS_TIME >= '").append(params.get(0)).append("' ");
             }
@@ -186,85 +190,84 @@ public class RdbEtlService {
                     // columnsMap = dbMapping.getTargetColumns();
                     // }
 
-                    StringBuilder insertSql = new StringBuilder();
-                    insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
-                    columnsMap
-                        .forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
+                StringBuilder insertSql = new StringBuilder();
+                insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
+                columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
 
-                    int len = insertSql.length();
-                    insertSql.delete(len - 1, len).append(") VALUES (");
-                    int mapLen = columnsMap.size();
-                    for (int i = 0; i < mapLen; i++) {
-                        insertSql.append("?,");
-                    }
-                    len = insertSql.length();
-                    insertSql.delete(len - 1, len).append(")");
-                    try (Connection connTarget = targetDS.getConnection();
-                            PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
-                        connTarget.setAutoCommit(false);
-
-                        while (rs.next()) {
-                            pstmt.clearParameters();
-
-                            // 删除数据
-                            Map<String, Object> values = new LinkedHashMap<>();
-                            StringBuilder deleteSql = new StringBuilder(
-                                "DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
-                            appendCondition(dbMapping, deleteSql, values, rs);
-                            try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
-                                int k = 1;
-                                for (Object val : values.values()) {
-                                    pstmt2.setObject(k++, val);
-                                }
-                                pstmt2.execute();
+                int len = insertSql.length();
+                insertSql.delete(len - 1, len).append(") VALUES (");
+                int mapLen = columnsMap.size();
+                for (int i = 0; i < mapLen; i++) {
+                    insertSql.append("?,");
+                }
+                len = insertSql.length();
+                insertSql.delete(len - 1, len).append(")");
+                try (Connection connTarget = targetDS.getConnection();
+                        PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
+                    connTarget.setAutoCommit(false);
+
+                    while (rs.next()) {
+                        pstmt.clearParameters();
+
+                        // 删除数据
+                        Map<String, Object> values = new LinkedHashMap<>();
+                        StringBuilder deleteSql = new StringBuilder("DELETE FROM " + SyncUtil.getDbTableName(dbMapping)
+                                                                    + " WHERE ");
+                        appendCondition(dbMapping, deleteSql, values, rs);
+                        try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
+                            int k = 1;
+                            for (Object val : values.values()) {
+                                pstmt2.setObject(k++, val);
                             }
+                            pstmt2.execute();
+                        }
 
-                            int i = 1;
-                            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
-                                String targetClolumnName = entry.getKey();
-                                String srcColumnName = entry.getValue();
-                                if (srcColumnName == null) {
-                                    srcColumnName = targetClolumnName;
-                                }
-
-                                Integer type = columnType.get(targetClolumnName.toLowerCase());
+                        int i = 1;
+                        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                            String targetClolumnName = entry.getKey();
+                            String srcColumnName = entry.getValue();
+                            if (srcColumnName == null) {
+                                srcColumnName = targetClolumnName;
+                            }
 
-                                Object value = rs.getObject(srcColumnName);
-                                if (value != null) {
-                                    SyncUtil.setPStmt(type, pstmt, value, i);
-                                } else {
-                                    pstmt.setNull(i, type);
-                                }
+                            Integer type = columnType.get(targetClolumnName.toLowerCase());
 
-                                i++;
+                            Object value = rs.getObject(srcColumnName);
+                            if (value != null) {
+                                SyncUtil.setPStmt(type, pstmt, value, i);
+                            } else {
+                                pstmt.setNull(i, type);
                             }
 
-                            pstmt.execute();
-                            if (logger.isTraceEnabled()) {
-                                logger.trace("Insert into target table, sql: {}", insertSql);
-                            }
+                            i++;
+                        }
 
-                            if (idx % dbMapping.getCommitBatch() == 0) {
-                                connTarget.commit();
-                                completed = true;
-                            }
-                            idx++;
-                            successCount.incrementAndGet();
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("successful import count:" + successCount.get());
-                            }
+                        pstmt.execute();
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("Insert into target table, sql: {}", insertSql);
                         }
-                        if (!completed) {
+
+                        if (idx % dbMapping.getCommitBatch() == 0) {
                             connTarget.commit();
+                            completed = true;
+                        }
+                        idx++;
+                        successCount.incrementAndGet();
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("successful import count:" + successCount.get());
                         }
                     }
-
-                } catch (Exception e) {
-                    logger.error(dbMapping.getTable() + " etl failed! ==>" + e.getMessage(), e);
-                    errMsg.add(dbMapping.getTable() + " etl failed! ==>" + e.getMessage());
+                    if (!completed) {
+                        connTarget.commit();
+                    }
                 }
-                return idx;
-            });
+
+            } catch (Exception e) {
+                logger.error(dbMapping.getTable() + " etl failed! ==>" + e.getMessage(), e);
+                errMsg.add(dbMapping.getTable() + " etl failed! ==>" + e.getMessage());
+            }
+            return idx;
+        }   );
             return true;
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
@@ -275,8 +278,8 @@ public class RdbEtlService {
     /**
      * 拼接目标表主键where条件
      */
-    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Object> values,
-                                        ResultSet rs) throws SQLException {
+    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Object> values, ResultSet rs)
+                                                                                                                         throws SQLException {
         // 拼接主键
         for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
             String targetColumnName = entry.getKey();

+ 30 - 28
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

@@ -70,35 +70,37 @@ public class RdbMirrorDbSyncService {
             }
         }
         if (!dmlList.isEmpty()) {
-            rdbSyncService.sync(dmlList, dml -> {
-                MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(dml.getDestination() + "." + dml.getDatabase());
-                if (mirrorDbConfig == null) {
-                    return false;
-                }
-                String table = dml.getTable();
-                MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
+            rdbSyncService.sync(dmlList,
+                dml -> {
+                    MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(dml.getDestination() + "."
+                                                                            + dml.getDatabase());
+                    if (mirrorDbConfig == null) {
+                        return false;
+                    }
+                    String table = dml.getTable();
+                    MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
 
-                if (config == null) {
-                    return false;
-                }
+                    if (config == null) {
+                        return false;
+                    }
 
-                if (config.getConcurrent()) {
-                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                    singleDmls.forEach(singleDml -> {
-                        int hash = rdbSyncService.pkHash(config.getDbMapping(), singleDml.getData());
-                        RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
-                        rdbSyncService.getDmlsPartition()[hash].add(syncItem);
-                    });
-                } else {
-                    int hash = 0;
-                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                    singleDmls.forEach(singleDml -> {
-                        RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
-                        rdbSyncService.getDmlsPartition()[hash].add(syncItem);
-                    });
-                }
-                return true;
-            });
+                    if (config.getConcurrent()) {
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            int hash = rdbSyncService.pkHash(config.getDbMapping(), singleDml.getData());
+                            RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
+                            rdbSyncService.getDmlsPartition()[hash].add(syncItem);
+                        });
+                    } else {
+                        int hash = 0;
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
+                            rdbSyncService.getDmlsPartition()[hash].add(syncItem);
+                        });
+                    }
+                    return true;
+                });
         }
     }
 
@@ -148,7 +150,7 @@ public class RdbMirrorDbSyncService {
                 logger.trace("Execute DDL sql: {} for database: {}", ddl.getSql(), ddl.getDatabase());
             }
         } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
         }
     }
 }

+ 41 - 36
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -7,7 +7,11 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.function.Function;
 
 import javax.sql.DataSource;
@@ -74,7 +78,7 @@ public class RdbSyncService {
                 executorThreads[i] = Executors.newSingleThreadExecutor();
             }
         } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
         }
     }
 
@@ -99,12 +103,13 @@ public class RdbSyncService {
                 int j = i;
                 futures.add(executorThreads[i].submit(() -> {
                     try {
-                        dmlsPartition[j]
-                            .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
+                        dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j],
+                            syncItem.config,
+                            syncItem.singleDml));
                         dmlsPartition[j].clear();
                         batchExecutors[j].commit();
                         return true;
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         batchExecutors[j].rollback();
                         throw new RuntimeException(e);
                     }
@@ -131,41 +136,41 @@ public class RdbSyncService {
         sync(dmls, dml -> {
             if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
                 // DDL
-                columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
+            columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
+            return false;
+        } else {
+            // DML
+            String destination = StringUtils.trimToEmpty(dml.getDestination());
+            String database = dml.getDatabase();
+            String table = dml.getTable();
+            Map<String, MappingConfig> configMap = mappingConfig.get(destination + "." + database + "." + table);
+
+            if (configMap == null) {
                 return false;
-            } else {
-                // DML
-                String destination = StringUtils.trimToEmpty(dml.getDestination());
-                String database = dml.getDatabase();
-                String table = dml.getTable();
-                Map<String, MappingConfig> configMap = mappingConfig.get(destination + "." + database + "." + table);
-
-                if (configMap == null) {
-                    return false;
-                }
+            }
 
-                boolean executed = false;
-                for (MappingConfig config : configMap.values()) {
-                    if (config.getConcurrent()) {
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            int hash = pkHash(config.getDbMapping(), singleDml.getData());
-                            SyncItem syncItem = new SyncItem(config, singleDml);
-                            dmlsPartition[hash].add(syncItem);
-                        });
-                    } else {
-                        int hash = 0;
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            SyncItem syncItem = new SyncItem(config, singleDml);
-                            dmlsPartition[hash].add(syncItem);
-                        });
-                    }
-                    executed = true;
+            boolean executed = false;
+            for (MappingConfig config : configMap.values()) {
+                if (config.getConcurrent()) {
+                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                    singleDmls.forEach(singleDml -> {
+                        int hash = pkHash(config.getDbMapping(), singleDml.getData());
+                        SyncItem syncItem = new SyncItem(config, singleDml);
+                        dmlsPartition[hash].add(syncItem);
+                    });
+                } else {
+                    int hash = 0;
+                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                    singleDmls.forEach(singleDml -> {
+                        SyncItem syncItem = new SyncItem(config, singleDml);
+                        dmlsPartition[hash].add(syncItem);
+                    });
                 }
-                return executed;
+                executed = true;
             }
-        });
+            return executed;
+        }
+    }   );
     }
 
     /**

+ 78 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/CharsetConversion.java

@@ -260,8 +260,86 @@ public final class CharsetConversion {
         putEntry(241, "utf8mb4", "utf8mb4_esperanto_ci", "UTF-8");
         putEntry(242, "utf8mb4", "utf8mb4_hungarian_ci", "UTF-8");
         putEntry(243, "utf8mb4", "utf8mb4_sinhala_ci", "UTF-8");
+        putEntry(244, "utf8mb4", "utf8mb4_german2_ci", "UTF-8");
+        putEntry(245, "utf8mb4", "utf8mb4_croatian_ci", "UTF-8");
+        putEntry(246, "utf8mb4", "utf8mb4_unicode_520_ci", "UTF-8");
+        putEntry(247, "utf8mb4", "utf8mb4_vietnamese_ci", "UTF-8");
+        putEntry(248, "gb18030", "gb18030_chinese_ci", "GB18030");
+        putEntry(249, "gb18030", "gb18030_bin", "GB18030");
+        putEntry(250, "gb18030", "gb18030_unicode_520_ci", "GB18030");
 
         putEntry(254, "utf8", "utf8_general_cs", "UTF-8");
+        putEntry(255, "utf8mb4", "utf8mb4_0900_ai_ci", "UTF-8");
+        putEntry(256, "utf8mb4", "utf8mb4_de_pb_0900_ai_ci", "UTF-8");
+        putEntry(257, "utf8mb4", "utf8mb4_is_0900_ai_ci", "UTF-8");
+        putEntry(258, "utf8mb4", "utf8mb4_lv_0900_ai_ci", "UTF-8");
+        putEntry(259, "utf8mb4", "utf8mb4_ro_0900_ai_ci", "UTF-8");
+        putEntry(260, "utf8mb4", "utf8mb4_sl_0900_ai_ci", "UTF-8");
+        putEntry(261, "utf8mb4", "utf8mb4_pl_0900_ai_ci", "UTF-8");
+        putEntry(262, "utf8mb4", "utf8mb4_et_0900_ai_ci", "UTF-8");
+        putEntry(263, "utf8mb4", "utf8mb4_es_0900_ai_ci", "UTF-8");
+        putEntry(264, "utf8mb4", "utf8mb4_sv_0900_ai_ci", "UTF-8");
+        putEntry(265, "utf8mb4", "utf8mb4_tr_0900_ai_ci", "UTF-8");
+        putEntry(266, "utf8mb4", "utf8mb4_cs_0900_ai_ci", "UTF-8");
+        putEntry(267, "utf8mb4", "utf8mb4_da_0900_ai_ci", "UTF-8");
+        putEntry(268, "utf8mb4", "utf8mb4_lt_0900_ai_ci", "UTF-8");
+        putEntry(269, "utf8mb4", "utf8mb4_sk_0900_ai_ci", "UTF-8");
+        putEntry(270, "utf8mb4", "utf8mb4_es_trad_0900_ai_ci", "UTF-8");
+        putEntry(271, "utf8mb4", "utf8mb4_la_0900_ai_ci", "UTF-8");
+
+        putEntry(273, "utf8mb4", "utf8mb4_eo_0900_ai_ci", "UTF-8");
+        putEntry(274, "utf8mb4", "utf8mb4_hu_0900_ai_ci", "UTF-8");
+        putEntry(275, "utf8mb4", "utf8mb4_hr_0900_ai_ci", "UTF-8");
+
+        putEntry(277, "utf8mb4", "utf8mb4_vi_0900_ai_ci", "UTF-8");
+        putEntry(278, "utf8mb4", "utf8mb4_0900_as_cs", "UTF-8");
+        putEntry(279, "utf8mb4", "utf8mb4_de_pb_0900_as_cs", "UTF-8");
+        putEntry(280, "utf8mb4", "utf8mb4_is_0900_as_cs", "UTF-8");
+        putEntry(281, "utf8mb4", "utf8mb4_lv_0900_as_cs", "UTF-8");
+        putEntry(282, "utf8mb4", "utf8mb4_ro_0900_as_cs", "UTF-8");
+        putEntry(283, "utf8mb4", "utf8mb4_sl_0900_as_cs", "UTF-8");
+        putEntry(284, "utf8mb4", "utf8mb4_pl_0900_as_cs", "UTF-8");
+        putEntry(285, "utf8mb4", "utf8mb4_et_0900_as_cs", "UTF-8");
+        putEntry(286, "utf8mb4", "utf8mb4_es_0900_as_cs", "UTF-8");
+        putEntry(287, "utf8mb4", "utf8mb4_sv_0900_as_cs", "UTF-8");
+        putEntry(288, "utf8mb4", "utf8mb4_tr_0900_as_cs", "UTF-8");
+        putEntry(289, "utf8mb4", "utf8mb4_cs_0900_as_cs", "UTF-8");
+        putEntry(290, "utf8mb4", "utf8mb4_da_0900_as_cs", "UTF-8");
+        putEntry(291, "utf8mb4", "utf8mb4_lt_0900_as_cs", "UTF-8");
+        putEntry(292, "utf8mb4", "utf8mb4_sk_0900_as_cs", "UTF-8");
+        putEntry(293, "utf8mb4", "utf8mb4_es_trad_0900_as_cs", "UTF-8");
+        putEntry(294, "utf8mb4", "utf8mb4_la_0900_as_cs", "UTF-8");
+
+        putEntry(296, "utf8mb4", "utf8mb4_eo_0900_as_cs", "UTF-8");
+        putEntry(297, "utf8mb4", "utf8mb4_hu_0900_as_cs", "UTF-8");
+        putEntry(298, "utf8mb4", "utf8mb4_hr_0900_as_cs", "UTF-8");
+
+        putEntry(300, "utf8mb4", "utf8mb4_vi_0900_as_cs", "UTF-8");
+        putEntry(303, "utf8mb4", "utf8mb4_ja_0900_as_cs", "UTF-8");
+        putEntry(304, "utf8mb4", "utf8mb4_ja_0900_as_cs_ks", "UTF-8");
+        putEntry(305, "utf8mb4", "utf8mb4_0900_as_ci", "UTF-8");
+        putEntry(306, "utf8mb4", "utf8mb4_ru_0900_ai_ci", "UTF-8");
+        putEntry(307, "utf8mb4", "utf8mb4_ru_0900_as_cs", "UTF-8");
+
+        putEntry(326, "utf8mb4", "utf8mb4_test_ci", "UTF-8");
+        putEntry(327, "utf16", "utf16_test_ci", "UTF-16");
+        putEntry(328, "utf8mb4", "utf8mb4_test_400_ci", "UTF-8");
+
+        putEntry(336, "utf8", "utf8_bengali_standard_ci", "UTF-8");
+        putEntry(337, "utf8", "utf8_bengali_standard_ci", "UTF-8");
+        putEntry(352, "utf8", "utf8_phone_ci", "UTF-8");
+        putEntry(353, "utf8", "utf8_test_ci", "UTF-8");
+        putEntry(354, "utf8", "utf8_5624_1", "UTF-8");
+        putEntry(355, "utf8", "utf8_5624_2", "UTF-8");
+        putEntry(356, "utf8", "utf8_5624_3", "UTF-8");
+        putEntry(357, "utf8", "utf8_5624_4", "UTF-8");
+        putEntry(358, "ucs2", "ucs2_test_ci", "UnicodeBig");
+        putEntry(359, "ucs2", "ucs2_vn_ci", "UnicodeBig");
+        putEntry(360, "ucs2", "ucs2_5624_1", "UnicodeBig");
+
+        putEntry(368, "utf8", "utf8_5624_5", "UTF-8");
+        putEntry(391, "utf32", "utf32_test_ci", "UTF-32");
+        putEntry(2047, "utf8", "utf8_maxuserid_ci", "UTF-8");
     }
 
     /**

+ 1 - 1
deployer/src/main/resources/canal.properties

@@ -111,4 +111,4 @@ canal.mq.canalBatchSize = 50
 canal.mq.canalGetTimeout = 100
 canal.mq.flatMessage = true
 canal.mq.compressionType = none
-canal.mq.acks = all
+canal.mq.acks = all

+ 1 - 1
deployer/src/main/resources/example/instance.properties

@@ -45,7 +45,7 @@ canal.instance.filter.black.regex=
 
 # mq config
 canal.mq.topic=example
-# 动态topic, 需mq支持动态创建topic
+# dynamic topic route by table regex
 #canal.mq.dynamicTopic=.*,mytest\\..*,mytest2.user
 canal.mq.partition=0
 # hash partition config

+ 4 - 3
example/src/main/bin/startup.bat

@@ -8,7 +8,7 @@ if "%OS%" == "Windows_NT" set ENV_PATH=%~dp0%
 set conf_dir=%ENV_PATH%\..\conf
 set logback_configurationFile=%conf_dir%\logback.xml
 set client_mode=Simple
-if "%1%" != "" set client_mode=%1%
+if not "%1" == "" set client_mode=%1
 
 set CLASSPATH=%conf_dir%
 set CLASSPATH=%conf_dir%\..\lib\*;%CLASSPATH%
@@ -20,7 +20,8 @@ set CANAL_OPTS= -DappName=otter-canal-example -Dlogback.configurationFile="%logb
 
 set JAVA_OPTS= %JAVA_MEM_OPTS% %JAVA_OPTS_EXT% %JAVA_DEBUG_OPT% %CANAL_OPTS%
 
-if "%client_mode%" == "Cluster"  
+if "%client_mode%" == "Cluster" (
 	java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.example.ClusterCanalClientTest
-else 
+) else (
 	java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.example.SimpleCanalClientTest
+)

+ 1 - 1
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java

@@ -6,7 +6,7 @@ public class CanalMQConfig {
     private Integer partition;
     private Integer partitionsNum;
     private String  partitionHash;
-    private String dynamicTopic;
+    private String  dynamicTopic;
 
     public String getTopic() {
         return topic;

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java

@@ -20,7 +20,7 @@ public interface ErosaConnection {
     /**
      * 用于快速数据查找,和dump的区别在于,seek会只给出部分的数据
      */
-    public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException;
+    public void seek(String binlogfilename, Long binlogPosition, String gtid, SinkFunction func) throws IOException;
 
     public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException;
 

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -75,7 +75,7 @@ public class LocalBinLogConnection implements ErosaConnection {
         return running;
     }
 
-    public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
+    public void seek(String binlogfilename, Long binlogPosition, String gtid, SinkFunction func) throws IOException {
     }
 
     public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {

+ 9 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -10,6 +10,7 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.slf4j.Logger;
@@ -118,7 +119,7 @@ public class MysqlConnection implements ErosaConnection {
     /**
      * 加速主备切换时的查找速度,做一些特殊优化,比如只解析事务头或者尾
      */
-    public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
+    public void seek(String binlogfilename, Long binlogPosition, String gtid, SinkFunction func) throws IOException {
         updateSettings();
         loadBinlogChecksum();
         sendBinlogDump(binlogfilename, binlogPosition);
@@ -130,6 +131,13 @@ public class MysqlConnection implements ErosaConnection {
         decoder.handle(LogEvent.QUERY_EVENT);
         decoder.handle(LogEvent.XID_EVENT);
         LogContext context = new LogContext();
+        // 若entry position存在gtid,则使用传入的gtid作为gtidSet 拼接的标准,否则同时开启gtid和tsdb时,会导致丢失gtid
+        // 而当源端数据库gtid 有purged时会有如下类似报错
+        // 'errno = 1236, sqlstate = HY000 errmsg = The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1 ...
+        if (StringUtils.isNotEmpty(gtid)) {
+            decoder.handle(LogEvent.GTID_LOG_EVENT);
+            context.setGtidSet(MysqlGTIDSet.parse(gtid));
+        }
         context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
         while (fetcher.fetch()) {
             accumulateReceivedBytes(fetcher.limit());

+ 5 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -527,7 +527,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         // 针对开始的第一条为非Begin记录,需要从该binlog扫描
         final java.util.concurrent.atomic.AtomicLong preTransactionStartPosition = new java.util.concurrent.atomic.AtomicLong(0L);
         mysqlConnection.reconnect();
-        mysqlConnection.seek(entryPosition.getJournalName(), 4L, new SinkFunction<LogEvent>() {
+        mysqlConnection.seek(entryPosition.getJournalName(), 4L, entryPosition.getGtid(), new SinkFunction<LogEvent>() {
 
             private LogPosition lastPosition;
 
@@ -740,7 +740,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         try {
             mysqlConnection.reconnect();
             // 开始遍历文件
-            mysqlConnection.seek(searchBinlogFile, 4L, new SinkFunction<LogEvent>() {
+            mysqlConnection.seek(searchBinlogFile, 4L, endPosition.getGtid(), new SinkFunction<LogEvent>() {
 
                 private LogPosition lastPosition;
 
@@ -754,6 +754,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                                 event.getLogPos() - event.getEventLen(),
                                 event.getWhen() * 1000,
                                 event.getServerId());
+                            entryPosition.setGtid(event.getHeader().getGtidSetStr());
                             logPosition.setPostion(entryPosition);
                         }
 
@@ -789,6 +790,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                                     entryPosition);
                             }
                             logPosition.setPostion(entryPosition);
+                            entryPosition.setGtid(entry.getHeader().getGtid());
                         } else if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {
                             // 当前事务开始位点
                             entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId);
@@ -796,6 +798,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                                 logger.debug("set {} to be pending start position before finding another proper one...",
                                     entryPosition);
                             }
+                            entryPosition.setGtid(entry.getHeader().getGtid());
                             logPosition.setPostion(entryPosition);
                         }