Răsfoiți Sursa

Merge pull request #30 from alibaba/master

merge
rewerma 6 ani în urmă
părinte
comite
92666b41e8
16 a modificat fișierele cu 206 adăugiri și 166 ștergeri
  1. 6 5
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java
  2. 14 9
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java
  3. 11 5
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java
  4. 8 2
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java
  5. 3 2
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java
  6. 34 22
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java
  7. 26 41
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java
  8. 13 9
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java
  9. 7 2
      client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java
  10. 12 6
      client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaOffsetClientExample.java
  11. 8 3
      client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/CanalRocketMQClientExample.java
  12. 3 3
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/CharsetConversion.java
  13. 16 9
      deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/ManagerRemoteConfigMonitor.java
  14. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/AbstractRequest.java
  15. 40 44
      server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java
  16. 4 3
      sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

+ 6 - 5
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -3,11 +3,8 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
 import org.apache.kafka.common.errors.WakeupException;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
@@ -44,8 +41,12 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
 
     @Override
     protected void process() {
-        while (!running)
-            ;
+        while (!running) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+        }
         ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
         int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒

+ 14 - 9
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -3,11 +3,7 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
 import org.apache.kafka.common.errors.WakeupException;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
@@ -33,18 +29,27 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
         this.topic = topic;
         this.flatMessage = flatMessage;
         this.canalDestination = topic;
-        this.connector = new RocketMQCanalConnector(nameServers, topic, groupId, accessKey, secretKey, flatMessage);
+        this.connector = new RocketMQCanalConnector(nameServers,
+            topic,
+            groupId,
+            accessKey,
+            secretKey,
+            canalClientConfig.getBatchSize(),
+            flatMessage);
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
     }
 
     @Override
     protected void process() {
-        while (!running)
-            ;
+        while (!running) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+        }
 
         ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
-        int retry = canalClientConfig.getRetries() == null
-                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
+        int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
 
         while (running) {

+ 11 - 5
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -2,7 +2,8 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.net.SocketAddress;
 import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.CanalConnectors;
@@ -58,11 +59,16 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
 
     @Override
     protected void process() {
-        while (!running)
-            ; // waiting until running == true
+        while (!running) { // waiting until running == true
+            while (!running) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                }
+            }
+        }
 
-        int retry = canalClientConfig.getRetries() == null
-                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
+        int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         if (retry == -1) {
             // 重试次数-1代表异常时一直阻塞重试
             retry = Integer.MAX_VALUE;

+ 8 - 2
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -12,6 +12,7 @@ import java.util.concurrent.Future;
 
 import javax.sql.DataSource;
 
+import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,12 +131,17 @@ public class RdbAdapter implements OuterAdapter {
         String threads = properties.get("threads");
         // String commitSize = properties.get("commitSize");
 
-        rdbSyncService = new RdbSyncService(dataSource, threads != null ? Integer.valueOf(threads) : null);
+        boolean skipDupException = BooleanUtils.toBoolean(configuration.getProperties()
+            .getOrDefault("skipDupException", "true"));
+        rdbSyncService = new RdbSyncService(dataSource,
+            threads != null ? Integer.valueOf(threads) : null,
+            skipDupException);
 
         rdbMirrorDbSyncService = new RdbMirrorDbSyncService(mirrorDbConfigCache,
             dataSource,
             threads != null ? Integer.valueOf(threads) : null,
-            rdbSyncService.getColumnsTypeCache());
+            rdbSyncService.getColumnsTypeCache(),
+            skipDupException);
 
         rdbConfigMonitor = new RdbConfigMonitor();
         rdbConfigMonitor.init(configuration.getKey(), this);

+ 3 - 2
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

@@ -35,10 +35,11 @@ public class RdbMirrorDbSyncService {
     private RdbSyncService              rdbSyncService;                                                // rdbSyncService代理
 
     public RdbMirrorDbSyncService(Map<String, MirrorDbConfig> mirrorDbConfigCache, DataSource dataSource,
-                                  Integer threads, Map<String, Map<String, Integer>> columnsTypeCache){
+                                  Integer threads, Map<String, Map<String, Integer>> columnsTypeCache,
+                                  boolean skipDupException){
         this.mirrorDbConfigCache = mirrorDbConfigCache;
         this.dataSource = dataSource;
-        this.rdbSyncService = new RdbSyncService(dataSource, threads, columnsTypeCache);
+        this.rdbSyncService = new RdbSyncService(dataSource, threads, columnsTypeCache, skipDupException);
     }
 
     /**

+ 34 - 22
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -44,6 +44,7 @@ public class RdbSyncService {
     private Map<String, Map<String, Integer>> columnsTypeCache;
 
     private int                               threads = 3;
+    private boolean                           skipDupException;
 
     private List<SyncItem>[]                  dmlsPartition;
     private BatchExecutor[]                   batchExecutors;
@@ -58,13 +59,15 @@ public class RdbSyncService {
     }
 
     @SuppressWarnings("unchecked")
-    public RdbSyncService(DataSource dataSource, Integer threads){
-        this(dataSource, threads, new ConcurrentHashMap<>());
+    public RdbSyncService(DataSource dataSource, Integer threads, boolean skipDupException){
+        this(dataSource, threads, new ConcurrentHashMap<>(), skipDupException);
     }
 
     @SuppressWarnings("unchecked")
-    public RdbSyncService(DataSource dataSource, Integer threads, Map<String, Map<String, Integer>> columnsTypeCache){
+    public RdbSyncService(DataSource dataSource, Integer threads, Map<String, Map<String, Integer>> columnsTypeCache,
+                          boolean skipDupException){
         this.columnsTypeCache = columnsTypeCache;
+        this.skipDupException = skipDupException;
         try {
             if (threads != null) {
                 this.threads = threads;
@@ -182,16 +185,20 @@ public class RdbSyncService {
      */
     public void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
         if (config != null) {
-            String type = dml.getType();
-            if (type != null && type.equalsIgnoreCase("INSERT")) {
-                insert(batchExecutor, config, dml);
-            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
-                update(batchExecutor, config, dml);
-            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
-                delete(batchExecutor, config, dml);
-            }
-            if (logger.isDebugEnabled()) {
-                logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+            try {
+                String type = dml.getType();
+                if (type != null && type.equalsIgnoreCase("INSERT")) {
+                    insert(batchExecutor, config, dml);
+                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                    update(batchExecutor, config, dml);
+                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                    delete(batchExecutor, config, dml);
+                }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+                }
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
             }
         }
     }
@@ -202,7 +209,7 @@ public class RdbSyncService {
      * @param config 配置项
      * @param dml DML数据
      */
-    private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+    private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
         Map<String, Object> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
@@ -240,11 +247,20 @@ public class RdbSyncService {
                 throw new RuntimeException("Target column: " + targetColumnName + " not matched");
             }
             Object value = data.get(srcColumnName);
-
             BatchExecutor.setValue(values, type, value);
         }
 
-        batchExecutor.execute(insertSql.toString(), values);
+        try {
+            batchExecutor.execute(insertSql.toString(), values);
+        } catch (SQLException e) {
+            if (skipDupException
+                && (e.getMessage().contains("Duplicate entry") || e.getMessage().startsWith("ORA-00001: 违反唯一约束条件"))) {
+                // ignore
+                // TODO 增加更多关系数据库的主键冲突的错误码
+            } else {
+                throw e;
+            }
+        }
         if (logger.isTraceEnabled()) {
             logger.trace("Insert into target table, sql: {}", insertSql);
         }
@@ -257,7 +273,7 @@ public class RdbSyncService {
      * @param config 配置项
      * @param dml DML数据
      */
-    private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+    private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
         Map<String, Object> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
@@ -301,9 +317,7 @@ public class RdbSyncService {
 
         // 拼接主键
         appendCondition(dbMapping, updateSql, ctype, values, data, old);
-
         batchExecutor.execute(updateSql.toString(), values);
-
         if (logger.isTraceEnabled()) {
             logger.trace("Update target table, sql: {}", updateSql);
         }
@@ -315,7 +329,7 @@ public class RdbSyncService {
      * @param config
      * @param dml
      */
-    private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+    private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
         Map<String, Object> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
@@ -331,9 +345,7 @@ public class RdbSyncService {
         List<Map<String, ?>> values = new ArrayList<>();
         // 拼接主键
         appendCondition(dbMapping, sql, ctype, values, data);
-
         batchExecutor.execute(sql.toString(), values);
-
         if (logger.isTraceEnabled()) {
             logger.trace("Delete from target table, sql: {}", sql);
         }

+ 26 - 41
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java

@@ -19,16 +19,11 @@ public class BatchExecutor implements Closeable {
     private Connection          conn;
     private AtomicInteger       idx    = new AtomicInteger(0);
 
-    public BatchExecutor(Connection conn){
+    public BatchExecutor(Connection conn) throws SQLException{
         this.conn = conn;
-        try {
-            this.conn.setAutoCommit(false);
-        } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
-        }
+        this.conn.setAutoCommit(false);
     }
 
-
     public Connection getConn() {
         return conn;
     }
@@ -40,53 +35,43 @@ public class BatchExecutor implements Closeable {
         values.add(valueItem);
     }
 
-    public void execute(String sql, List<Map<String, ?>> values) {
-        try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
-            int len = values.size();
-            for (int i = 0; i < len; i++) {
-                int type = (Integer) values.get(i).get("type");
-                Object value = values.get(i).get("value");
-                SyncUtil.setPStmt(type, pstmt, value, i + 1);
-            }
-
-            pstmt.execute();
-            idx.incrementAndGet();
-        } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
+    public void execute(String sql, List<Map<String, ?>> values) throws SQLException {
+        PreparedStatement pstmt = conn.prepareStatement(sql);
+        int len = values.size();
+        for (int i = 0; i < len; i++) {
+            int type = (Integer) values.get(i).get("type");
+            Object value = values.get(i).get("value");
+            SyncUtil.setPStmt(type, pstmt, value, i + 1);
         }
+
+        pstmt.execute();
+        idx.incrementAndGet();
     }
 
-    public void commit() {
-        try {
-            conn.commit();
-            if (logger.isTraceEnabled()) {
-                logger.trace("Batch executor commit " + idx.get() + " rows");
-            }
-            idx.set(0);
-        } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
+    public void commit() throws SQLException {
+        conn.commit();
+        if (logger.isTraceEnabled()) {
+            logger.trace("Batch executor commit " + idx.get() + " rows");
         }
+        idx.set(0);
     }
 
-    public void rollback() {
-        try {
-            conn.rollback();
-            if (logger.isTraceEnabled()) {
-                logger.trace("Batch executor rollback " + idx.get() + " rows");
-            }
-            idx.set(0);
-        } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
+    public void rollback() throws SQLException {
+        conn.rollback();
+        if (logger.isTraceEnabled()) {
+            logger.trace("Batch executor rollback " + idx.get() + " rows");
         }
+        idx.set(0);
     }
 
     @Override
     public void close() {
         if (conn != null) {
             try {
-                conn.close();
-            } catch (SQLException e) {
-                logger.error(e.getMessage(), e);
+                if (conn != null) {
+                    conn.close();
+                }
+            } catch (SQLException ioe) {
             }
         }
     }

+ 13 - 9
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -1,7 +1,5 @@
 package com.alibaba.otter.canal.client.rocketmq;
 
-import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
-import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -26,6 +24,8 @@ import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
+import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 import com.google.common.collect.Lists;
 
 /**
@@ -48,24 +48,26 @@ public class RocketMQCanalConnector implements CanalMQConnector {
     private volatile boolean                    connected           = false;
     private DefaultMQPushConsumer               rocketMQConsumer;
     private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;
+    private int                                 batchSize           = -1;
     private long                                batchProcessTimeout = 60 * 1000;
     private boolean                             flatMessage;
     private volatile ConsumerBatchMessage       lastGetBatchMessage = null;
     private String                              accessKey;
     private String                              secretKey;
 
-
-    public RocketMQCanalConnector(String nameServer, String topic, String groupName, boolean flatMessage){
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName, Integer batchSize,
+                                  boolean flatMessage){
         this.nameServer = nameServer;
         this.topic = topic;
         this.groupName = groupName;
         this.flatMessage = flatMessage;
         this.messageBlockingQueue = new LinkedBlockingQueue<>(1024);
+        this.batchSize = batchSize;
     }
 
-    public RocketMQCanalConnector(String nameServer, String topic, String groupName,
-        String accessKey, String secretKey, boolean flatMessage){
-        this(nameServer, topic, groupName, flatMessage);
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName, String accessKey,
+                                  String secretKey, Integer batchSize, boolean flatMessage){
+        this(nameServer, topic, groupName, batchSize, flatMessage);
         this.accessKey = accessKey;
         this.secretKey = secretKey;
     }
@@ -73,8 +75,7 @@ public class RocketMQCanalConnector implements CanalMQConnector {
     public void connect() throws CanalClientException {
 
         RPCHook rpcHook = null;
-        if(null != accessKey && accessKey.length() > 0
-            && null != secretKey && secretKey.length() > 0){
+        if (null != accessKey && accessKey.length() > 0 && null != secretKey && secretKey.length() > 0) {
             SessionCredentials sessionCredentials = new SessionCredentials();
             sessionCredentials.setAccessKey(accessKey);
             sessionCredentials.setSecretKey(secretKey);
@@ -85,6 +86,9 @@ public class RocketMQCanalConnector implements CanalMQConnector {
         if (!StringUtils.isBlank(nameServer)) {
             rocketMQConsumer.setNamesrvAddr(nameServer);
         }
+        if (batchSize != -1) {
+            rocketMQConsumer.setConsumeMessageBatchMaxSize(batchSize);
+        }
     }
 
     public void disconnect() throws CanalClientException {

+ 7 - 2
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java

@@ -99,8 +99,13 @@ public class CanalKafkaClientExample {
     }
 
     private void process() {
-        while (!running)
-            ;
+        while (!running) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+        }
+
         while (running) {
             try {
                 connector.connect();

+ 12 - 6
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaOffsetClientExample.java

@@ -1,14 +1,15 @@
 package com.alibaba.otter.canal.client.running.kafka;
 
-import com.alibaba.otter.canal.client.kafka.KafkaOffsetCanalConnector;
-import com.alibaba.otter.canal.client.kafka.protocol.KafkaMessage;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import com.alibaba.otter.canal.client.kafka.KafkaOffsetCanalConnector;
+import com.alibaba.otter.canal.client.kafka.protocol.KafkaMessage;
 
 /**
  * KafkaOffsetCanalConnector 使用示例
@@ -99,8 +100,13 @@ public class CanalKafkaOffsetClientExample {
     }
 
     private void process() {
-        while (!running)
-            ;
+        while (!running) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+        }
+
         while (running) {
             try {
                 // 修改 AutoOffsetReset 的值,默认(earliest)

+ 8 - 3
client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/CanalRocketMQClientExample.java

@@ -36,7 +36,7 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
                                                     };
 
     public CanalRocketMQClientExample(String nameServers, String topic, String groupId){
-        connector = new RocketMQCanalConnector(nameServers, topic, groupId, false);
+        connector = new RocketMQCanalConnector(nameServers, topic, groupId, 500, false);
     }
 
     public static void main(String[] args) {
@@ -97,8 +97,13 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
     }
 
     private void process() {
-        while (!running)
-            ;
+        while (!running) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+        }
+
         while (running) {
             try {
                 connector.connect();

+ 3 - 3
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/CharsetConversion.java

@@ -32,7 +32,7 @@ public final class CharsetConversion {
     }
 
     // Character set data used in lookups. The array will be sparse.
-    static final Entry[] entries = new Entry[0xff];
+    static final Entry[] entries = new Entry[2048];
 
     static Entry getEntry(final int id) {
         if (id >= 0 && id < entries.length) {
@@ -95,7 +95,7 @@ public final class CharsetConversion {
         putEntry(34, "cp1250", "cp1250_czech_cs", "Cp1250");
         putEntry(35, "ucs2", "ucs2_general_ci", "UnicodeBig");
         putEntry(36, "cp866", "cp866_general_ci", "Cp866");
-        putEntry(37, "keybcs2", "keybcs2_general_ci", "Cp895");
+        putEntry(37, "keybcs2", "keybcs2_general_ci", "Cp852");
         putEntry(38, "macce", "macce_general_ci", "MacCentralEurope");
         putEntry(39, "macroman", "macroman_general_ci", "MacRoman");
         putEntry(40, "cp852", "cp852_general_ci", "Cp852");
@@ -129,7 +129,7 @@ public final class CharsetConversion {
         putEntry(70, "greek", "greek_bin", "ISO8859_7");
         putEntry(71, "hebrew", "hebrew_bin", "ISO8859_8");
         putEntry(72, "hp8", "hp8_bin", "US-ASCII");
-        putEntry(73, "keybcs2", "keybcs2_bin", "Cp895");
+        putEntry(73, "keybcs2", "keybcs2_bin", "Cp852");
         putEntry(74, "koi8r", "koi8r_bin", "KOI8_R");
         putEntry(75, "koi8u", "koi8u_bin", "KOI8_U");
         putEntry(77, "latin2", "latin2_bin", "ISO8859_2");

+ 16 - 9
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/ManagerRemoteConfigMonitor.java

@@ -4,8 +4,16 @@ import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.nio.charset.StandardCharsets;
-import java.sql.*;
-import java.util.*;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -39,7 +47,7 @@ public class ManagerRemoteConfigMonitor {
 
     private long                     scanIntervalInSecond   = 5;
     private ScheduledExecutorService executor               = Executors.newScheduledThreadPool(2,
-        new NamedThreadFactory("remote-canal-config-scan"));
+                                                                new NamedThreadFactory("remote-canal-config-scan"));
 
     public ManagerRemoteConfigMonitor(String jdbcUrl, String jdbcUsername, String jdbcPassword){
         this.jdbcUrl = jdbcUrl;
@@ -75,8 +83,8 @@ public class ManagerRemoteConfigMonitor {
                     overrideLocalCanalConfig(configItem.getContent());
                     properties = new Properties();
                     properties.load(new ByteArrayInputStream(configItem.getContent().getBytes(StandardCharsets.UTF_8)));
-                    scanIntervalInSecond = Integer
-                        .valueOf(properties.getProperty(CanalConstants.CANAL_AUTO_SCAN_INTERVAL, "5"));
+                    scanIntervalInSecond = Integer.valueOf(properties.getProperty(CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
+                        "5"));
                 }
             }
         } catch (Exception e) {
@@ -223,8 +231,7 @@ public class ManagerRemoteConfigMonitor {
         Map<String, ConfigItem> changedInstanceConfigs = modifiedInstanceConfigs[0];
         if (changedInstanceConfigs != null) {
             for (ConfigItem configItem : changedInstanceConfigs.values()) {
-                try (FileWriter writer = new FileWriter(
-                    getConfPath() + configItem.getName() + "/instance.properties")) {
+                try (FileWriter writer = new FileWriter(getConfPath() + configItem.getName() + "/instance.properties")) {
                     writer.write(configItem.getContent());
                     writer.flush();
                 } catch (Exception e) {
@@ -351,8 +358,8 @@ public class ManagerRemoteConfigMonitor {
         }
     }
 
-    public interface Listener<Properties> {
+    public interface Listener<T> {
 
-        void onChange(Properties properties);
+        void onChange(T properties);
     }
 }

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/AbstractRequest.java

@@ -160,7 +160,7 @@ public abstract class AbstractRequest<T> {
     }
 
     public final String formatUTCTZ(Date date) {
-        SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd'T'HH:mm:ss'Z'");
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
         sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
         return sdf.format(date);
     }

+ 40 - 44
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -1,6 +1,12 @@
 package com.alibaba.otter.canal.common;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -24,46 +30,36 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class MQMessageUtils {
 
     @SuppressWarnings("deprecation")
-    private static Map<String, List<PartitionData>>    partitionDatas    = MigrateMap
-        .makeComputingMap(new MapMaker().softValues(), new Function<String, List<PartitionData>>() {
+    private static Map<String, List<PartitionData>>    partitionDatas    = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+                                                                             new Function<String, List<PartitionData>>() {
 
                                                                                  public List<PartitionData> apply(String pkHashConfigs) {
-                                                                                     List<PartitionData> datas = Lists
-                                                                                         .newArrayList();
-                                                                                     String[] pkHashConfigArray = StringUtils
-                                                                                         .split(pkHashConfigs, ",");
+                                                                                     List<PartitionData> datas = Lists.newArrayList();
+                                                                                     String[] pkHashConfigArray = StringUtils.split(pkHashConfigs,
+                                                                                         ",");
                                                                                      // schema.table:id^name
                                                                                      for (String pkHashConfig : pkHashConfigArray) {
                                                                                          PartitionData data = new PartitionData();
-                                                                                         int i = pkHashConfig
-                                                                                             .lastIndexOf(":");
+                                                                                         int i = pkHashConfig.lastIndexOf(":");
                                                                                          if (i > 0) {
-                                                                                             String pkStr = pkHashConfig
-                                                                                                 .substring(i + 1);
-                                                                                             if (pkStr.equalsIgnoreCase(
-                                                                                                 "$pk$")) {
+                                                                                             String pkStr = pkHashConfig.substring(i + 1);
+                                                                                             if (pkStr.equalsIgnoreCase("$pk$")) {
                                                                                                  data.hashMode.autoPkHash = true;
                                                                                              } else {
-                                                                                                 data.hashMode.pkNames = Lists
-                                                                                                     .newArrayList(
-                                                                                                         StringUtils
-                                                                                                             .split(
-                                                                                                                 pkStr,
-                                                                                                                 '^'));
+                                                                                                 data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
+                                                                                                     '^'));
                                                                                              }
 
-                                                                                             pkHashConfig = pkHashConfig
-                                                                                                 .substring(0, i);
+                                                                                             pkHashConfig = pkHashConfig.substring(0,
+                                                                                                 i);
                                                                                          } else {
                                                                                              data.hashMode.tableHash = true;
                                                                                          }
 
-                                                                                         if (!isWildCard(
-                                                                                             pkHashConfig)) {
+                                                                                         if (!isWildCard(pkHashConfig)) {
                                                                                              data.simpleName = pkHashConfig;
                                                                                          } else {
-                                                                                             data.regexFilter = new AviaterRegexFilter(
-                                                                                                 pkHashConfig);
+                                                                                             data.regexFilter = new AviaterRegexFilter(pkHashConfig);
                                                                                          }
                                                                                          datas.add(data);
                                                                                      }
@@ -73,29 +69,24 @@ public class MQMessageUtils {
                                                                              });
 
     @SuppressWarnings("deprecation")
-    private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap
-        .makeComputingMap(new MapMaker().softValues(), new Function<String, List<DynamicTopicData>>() {
+    private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+                                                                             new Function<String, List<DynamicTopicData>>() {
 
                                                                                  public List<DynamicTopicData> apply(String pkHashConfigs) {
-                                                                                     List<DynamicTopicData> datas = Lists
-                                                                                         .newArrayList();
-                                                                                     String[] dynamicTopicArray = StringUtils
-                                                                                         .split(pkHashConfigs, ",");
+                                                                                     List<DynamicTopicData> datas = Lists.newArrayList();
+                                                                                     String[] dynamicTopicArray = StringUtils.split(pkHashConfigs,
+                                                                                         ",");
                                                                                      // schema.table
                                                                                      for (String dynamicTopic : dynamicTopicArray) {
                                                                                          DynamicTopicData data = new DynamicTopicData();
 
-                                                                                         if (!isWildCard(
-                                                                                             dynamicTopic)) {
+                                                                                         if (!isWildCard(dynamicTopic)) {
                                                                                              data.simpleName = dynamicTopic;
                                                                                          } else {
-                                                                                             if (dynamicTopic
-                                                                                                 .contains("\\.")) {
-                                                                                                 data.tableRegexFilter = new AviaterRegexFilter(
-                                                                                                     dynamicTopic);
+                                                                                             if (dynamicTopic.contains("\\.")) {
+                                                                                                 data.tableRegexFilter = new AviaterRegexFilter(dynamicTopic);
                                                                                              } else {
-                                                                                                 data.schemaRegexFilter = new AviaterRegexFilter(
-                                                                                                     dynamicTopic);
+                                                                                                 data.schemaRegexFilter = new AviaterRegexFilter(dynamicTopic);
                                                                                              }
                                                                                          }
                                                                                          datas.add(data);
@@ -232,6 +223,9 @@ public class MQMessageUtils {
                             partitionEntries[pkHash].add(entry);
                         }
                     }
+                } else {
+                    // 针对stmt/mixed binlog格式的query事件
+                    partitionEntries[0].add(entry);
                 }
             }
         }
@@ -281,9 +275,8 @@ public class MQMessageUtils {
                 try {
                     rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                 } catch (Exception e) {
-                    throw new RuntimeException(
-                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
-                        e);
+                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+                                               + entry.toString(), e);
                 }
 
                 CanalEntry.EventType eventType = rowChange.getEventType();
@@ -455,6 +448,9 @@ public class MQMessageUtils {
                         idx++;
                     }
                 }
+            } else {
+                // 针对stmt/mixed binlog格式的query事件
+                partitionMessages[0] = flatMessage;
             }
         }
         return partitionMessages;
@@ -524,8 +520,8 @@ public class MQMessageUtils {
 
     private static boolean isWildCard(String value) {
         // not contaiins '.' ?
-        return StringUtils.containsAny(value,
-            new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$', '^' });
+        return StringUtils.containsAny(value, new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
+                '^' });
     }
 
     private static void put2MapMessage(Map<String, Message> messageMap, Long messageId, String topicName,

+ 4 - 3
sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

@@ -100,11 +100,12 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
                 && (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND)) {
                 long currentTimestamp = entry.getHeader().getExecuteTime();
                 // 基于一定的策略控制,放过空的事务头和尾,便于及时更新数据库位点,表明工作正常
-                if (Math.abs(currentTimestamp - lastTransactionTimestamp) > emptyTransactionInterval
-                    || lastTransactionCount.incrementAndGet() > emptyTransctionThresold) {
+                if (lastTransactionCount.incrementAndGet() <= emptyTransctionThresold
+                    && Math.abs(currentTimestamp - lastTransactionTimestamp) <= emptyTransactionInterval) {
+                    continue;
+                } else {
                     lastTransactionCount.set(0L);
                     lastTransactionTimestamp = currentTimestamp;
-                    continue;
                 }
             }