Explorar o código

Merge pull request #1 from alibaba/master

update
bigbro %!s(int64=6) %!d(string=hai) anos
pai
achega
ccd39e9d62
Modificáronse 27 ficheiros con 1098 adicións e 864 borrados
  1. 2 2
      client-adapter/README.md
  2. 19 22
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/BootstrapConfiguration.java
  3. 8 15
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java
  4. 56 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/ConfigItem.java
  5. 101 156
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/DbRemoteConfigLoader.java
  6. 31 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/RemoteAdapterMonitor.java
  7. 30 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/RemoteConfigLoader.java
  8. 34 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/RemoteConfigLoaderFactory.java
  9. 1 1
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/ConfigLoader.java
  10. 34 32
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java
  11. 4 2
      client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java
  12. 4 4
      common/src/main/java/com/alibaba/otter/canal/common/zookeeper/ZkClientx.java
  13. 1 0
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java
  14. 80 65
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java
  15. 13 18
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java
  16. 4 1
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java
  17. 47 0
      deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/ConfigItem.java
  18. 95 147
      deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/DbRemoteConfigLoader.java
  19. 13 0
      deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/RemoteCanalConfigMonitor.java
  20. 36 0
      deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/RemoteConfigLoader.java
  21. 35 0
      deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/RemoteConfigLoaderFactory.java
  22. 31 0
      deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/RemoteInstanceMonitor.java
  23. 3 1
      deployer/src/main/resources/canal.properties
  24. 334 329
      meta/src/main/java/com/alibaba/otter/canal/meta/ZooKeeperMetaManager.java
  25. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  26. 9 0
      server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java
  27. 72 68
      server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

+ 2 - 2
client-adapter/README.md

@@ -55,7 +55,7 @@ canal.conf:
           hbase.zookeeper.quorum: slave1
           hbase.zookeeper.property.clientPort: 2181
           zookeeper.znode.parent: /hbase
-  mqTopics:                         # MQ topic, 如果是kafka或者rockeMQ模式可配置此项, 与canalInstances不能并存
+  mqTopics:                         # MQ topic, 如果是kafka或者rockeMQ模式可配置此项, 与canalInstances不能并存
   - mqMode: kafka                   # MQ的模式: kafak/rocketMQ
     topic: example                  # MQ topic
     groups:                         # group组
@@ -542,4 +542,4 @@ bin/startup.sh
 #### 验证
 1. 新增mysql mytest.user表的数据, 将会自动同步到es的mytest_user索引下面, 并会打出DML的log
 2. 修改mysql mytest.role表的role_name, 将会自动同步es的mytest_user索引中的role_name数据
-3. 新增或者修改mysql mytest.label表的label, 将会自动同步es的mytest_user索引中的labels数据
+3. 新增或者修改mysql mytest.label表的label, 将会自动同步es的mytest_user索引中的labels数据

+ 19 - 22
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/BootstrapConfiguration.java

@@ -1,14 +1,14 @@
 package com.alibaba.otter.canal.adapter.launcher.config;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 
-import com.alibaba.otter.canal.adapter.launcher.monitor.AdapterRemoteConfigMonitor;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.env.Environment;
 
+import com.alibaba.otter.canal.adapter.launcher.monitor.remote.RemoteConfigLoader;
+import com.alibaba.otter.canal.adapter.launcher.monitor.remote.RemoteConfigLoaderFactory;
+
 /**
  * Bootstrap级别配置加载
  *
@@ -17,28 +17,25 @@ import org.springframework.core.env.Environment;
  */
 public class BootstrapConfiguration {
 
-    private static final Logger logger = LoggerFactory.getLogger(BootstrapConfiguration.class);
-
     @Autowired
-    private Environment         env;
+    private Environment        env;
+
+    private RemoteConfigLoader remoteConfigLoader = null;
 
     @PostConstruct
     public void loadRemoteConfig() {
-        try {
-            // 加载远程配置
-            String jdbcUrl = env.getProperty("canal.manager.jdbc.url");
-            if (StringUtils.isNotEmpty(jdbcUrl)) {
-                String jdbcUsername = env.getProperty("canal.manager.jdbc.username");
-                String jdbcPassword = env.getProperty("canal.manager.jdbc.password");
-                AdapterRemoteConfigMonitor configMonitor = new AdapterRemoteConfigMonitor(jdbcUrl,
-                    jdbcUsername,
-                    jdbcPassword);
-                configMonitor.loadRemoteConfig();
-                configMonitor.loadRemoteAdapterConfigs();
-                configMonitor.start(); // 启动监听
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+        remoteConfigLoader = RemoteConfigLoaderFactory.getRemoteConfigLoader(env);
+        if (remoteConfigLoader != null) {
+            remoteConfigLoader.loadRemoteConfig();
+            remoteConfigLoader.loadRemoteAdapterConfigs();
+            remoteConfigLoader.startMonitor(); // 启动监听
+        }
+    }
+
+    @PreDestroy
+    public synchronized void destroy() {
+        if (remoteConfigLoader != null) {
+            remoteConfigLoader.destroy();
         }
     }
 }

+ 8 - 15
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java

@@ -4,8 +4,6 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
 
-import com.alibaba.otter.canal.adapter.launcher.monitor.AdapterRemoteConfigMonitor;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.cloud.context.config.annotation.RefreshScope;
@@ -29,27 +27,25 @@ import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 @RefreshScope
 public class CanalAdapterService {
 
-    private static final Logger        logger        = LoggerFactory.getLogger(CanalAdapterService.class);
+    private static final Logger logger  = LoggerFactory.getLogger(CanalAdapterService.class);
 
-    private CanalAdapterLoader         adapterLoader;
+    private CanalAdapterLoader  adapterLoader;
 
     @Resource
-    private ContextRefresher           contextRefresher;
+    private ContextRefresher    contextRefresher;
 
     @Resource
-    private AdapterCanalConfig         adapterCanalConfig;
+    private AdapterCanalConfig  adapterCanalConfig;
     @Resource
-    private Environment                env;
+    private Environment         env;
 
     // 注入bean保证优先注册
     @Resource
-    private SpringContext              springContext;
+    private SpringContext       springContext;
     @Resource
-    private SyncSwitch                 syncSwitch;
+    private SyncSwitch          syncSwitch;
 
-    private volatile boolean           running       = false;
-
-    private AdapterRemoteConfigMonitor configMonitor = null;
+    private volatile boolean    running = false;
 
     @PostConstruct
     public synchronized void init() {
@@ -75,9 +71,6 @@ public class CanalAdapterService {
         try {
             running = false;
             logger.info("## stop the canal client adapters");
-            if (configMonitor != null) {
-                configMonitor.destroy();
-            }
 
             if (adapterLoader != null) {
                 adapterLoader.destroy();

+ 56 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/ConfigItem.java

@@ -0,0 +1,56 @@
+package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
+
+/**
+ * 配置对应对象
+ *
+ * @author rewerma 2019-01-25 下午05:20:16
+ * @version 1.0.0
+ */
+public class ConfigItem {
+
+    private Long   id;
+    private String category;
+    private String name;
+    private String content;
+    private long   modifiedTime;
+
+    public Long getId() {
+        return id;
+    }
+
+    public void setId(Long id) {
+        this.id = id;
+    }
+
+    public String getCategory() {
+        return category;
+    }
+
+    public void setCategory(String category) {
+        this.category = category;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getContent() {
+        return content;
+    }
+
+    public void setContent(String content) {
+        this.content = content;
+    }
+
+    public long getModifiedTime() {
+        return modifiedTime;
+    }
+
+    public void setModifiedTime(long modifiedTime) {
+        this.modifiedTime = modifiedTime;
+    }
+}

+ 101 - 156
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/AdapterRemoteConfigMonitor.java → client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/DbRemoteConfigLoader.java

@@ -1,10 +1,11 @@
-package com.alibaba.otter.canal.adapter.launcher.monitor;
+package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileWriter;
-import java.net.URL;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -13,54 +14,61 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
 
-import com.alibaba.otter.canal.client.adapter.support.Constant;
-import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
 import com.google.common.base.Joiner;
 import com.google.common.collect.MapMaker;
 
 /**
- * 远程配置装载、监控类
+ * 基于数据库的远程配置装载器
  *
- * @author rewerma @ 2019-01-05
+ * @author rewerma 2019-01-25 下午05:20:16
  * @version 1.0.0
  */
-public class AdapterRemoteConfigMonitor {
+public class DbRemoteConfigLoader implements RemoteConfigLoader {
 
-    private static final Logger      logger                 = LoggerFactory.getLogger(AdapterRemoteConfigMonitor.class);
+    private static final Logger      logger                 = LoggerFactory.getLogger(DbRemoteConfigLoader.class);
 
-    private Connection               conn;
-    private String                   jdbcUrl;
-    private String                   jdbcUsername;
-    private String                   jdbcPassword;
+    private DruidDataSource          dataSource;
 
-    private long                     currentConfigTimestamp = 0;
+    private static volatile long     currentConfigTimestamp = 0;
     private Map<String, ConfigItem>  remoteAdapterConfigs   = new MapMaker().makeMap();
 
     private ScheduledExecutorService executor               = Executors.newScheduledThreadPool(2,
         new NamedThreadFactory("remote-adapter-config-scan"));
 
-    public AdapterRemoteConfigMonitor(String jdbcUrl, String jdbcUsername, String jdbcPassword){
-        this.jdbcUrl = jdbcUrl;
-        this.jdbcUsername = jdbcUsername;
-        this.jdbcPassword = jdbcPassword;
-    }
+    private RemoteAdapterMonitor     remoteAdapterMonitor   = new RemoteAdapterMonitorImpl();
 
-    private Connection getConn() throws Exception {
-        if (conn == null || conn.isClosed()) {
-            Class.forName("com.mysql.jdbc.Driver");
-            conn = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword);
+    public DbRemoteConfigLoader(String driverName, String jdbcUrl, String jdbcUsername, String jdbcPassword){
+        dataSource = new DruidDataSource();
+        if (StringUtils.isEmpty(driverName)) {
+            driverName = "com.mysql.jdbc.Driver";
+        }
+        dataSource.setDriverClassName(driverName);
+        dataSource.setUrl(jdbcUrl);
+        dataSource.setUsername(jdbcUsername);
+        dataSource.setPassword(jdbcPassword);
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(1);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+        try {
+            dataSource.init();
+        } catch (SQLException e) {
+            throw new RuntimeException(e.getMessage(), e);
         }
-        return conn;
     }
 
     /**
-     * 加载远程application.yml配置到本地
+     * 加载远程application.yml配置
      */
+    @Override
     public void loadRemoteConfig() {
         try {
             // 加载远程adapter配置
@@ -84,7 +92,9 @@ public class AdapterRemoteConfigMonitor {
      */
     private ConfigItem getRemoteAdapterConfig() {
         String sql = "select name, content, modified_time from canal_config where id=2";
-        try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
+        try (Connection conn = dataSource.getConnection();
+                Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
             if (rs.next()) {
                 ConfigItem configItem = new ConfigItem();
                 configItem.setId(2L);
@@ -114,54 +124,29 @@ public class AdapterRemoteConfigMonitor {
     }
 
     /**
-     * 启动监听数据库变化
-     */
-    public void start() {
-        // 监听application.yml变化
-        executor.scheduleWithFixedDelay(() -> {
-            try {
-                loadRemoteConfig();
-            } catch (Throwable e) {
-                logger.error("scan remote application.yml failed", e);
-            }
-        }, 10, 3, TimeUnit.SECONDS);
-
-        // 监听adapter变化
-        executor.scheduleWithFixedDelay(() -> {
-            try {
-                loadRemoteAdapterConfigs();
-            } catch (Throwable e) {
-                logger.error("scan remote adapter configs failed", e);
-            }
-        }, 10, 3, TimeUnit.SECONDS);
-    }
-
-    /**
-     * 加载adapter配置到本地
+     * 加载adapter配置
      */
+    @Override
     public void loadRemoteAdapterConfigs() {
         try {
             // 加载远程adapter配置
-            Map<String, ConfigItem>[] modifiedConfigs = getModifiedAdapterConfigs();
-            if (modifiedConfigs != null) {
-                overrideLocalAdapterConfigs(modifiedConfigs);
-            }
+            loadModifiedAdapterConfigs();
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
         }
     }
 
     /**
-     * 获取有变动的adapter配置
-     *
-     * @return Map[0]: 新增修改的配置, Map[1]: 删除的配置
+     * 加载有变动的adapter配置
      */
     @SuppressWarnings("unchecked")
-    private Map<String, ConfigItem>[] getModifiedAdapterConfigs() {
+    private void loadModifiedAdapterConfigs() {
         Map<String, ConfigItem>[] res = new Map[2];
         Map<String, ConfigItem> remoteConfigStatus = new HashMap<>();
         String sql = "select id, category, name, modified_time from canal_adapter_config";
-        try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
+        try (Connection conn = dataSource.getConnection();
+                Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
             while (rs.next()) {
                 ConfigItem configItem = new ConfigItem();
                 configItem.setId(rs.getLong("id"));
@@ -172,7 +157,6 @@ public class AdapterRemoteConfigMonitor {
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
-            return null;
         }
 
         if (!remoteConfigStatus.isEmpty()) {
@@ -192,10 +176,11 @@ public class AdapterRemoteConfigMonitor {
                 }
             }
             if (!changedIds.isEmpty()) {
-                Map<String, ConfigItem> changedAdapterConfig = new HashMap<>();
                 String contentsSql = "select id, category, name, content, modified_time from canal_adapter_config  where id in ("
                                      + Joiner.on(",").join(changedIds) + ")";
-                try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(contentsSql)) {
+                try (Connection conn = dataSource.getConnection();
+                        Statement stmt = conn.createStatement();
+                        ResultSet rs = stmt.executeQuery(contentsSql)) {
                     while (rs.next()) {
                         ConfigItem configItemNew = new ConfigItem();
                         configItemNew.setId(rs.getLong("id"));
@@ -206,63 +191,20 @@ public class AdapterRemoteConfigMonitor {
 
                         remoteAdapterConfigs.put(configItemNew.getCategory() + "/" + configItemNew.getName(),
                             configItemNew);
-                        changedAdapterConfig.put(configItemNew.getCategory() + "/" + configItemNew.getName(),
-                            configItemNew);
+                        remoteAdapterMonitor.onModify(configItemNew);
                     }
 
-                    res[0] = changedAdapterConfig.isEmpty() ? null : changedAdapterConfig;
                 } catch (Exception e) {
                     logger.error(e.getMessage(), e);
                 }
             }
         }
 
-        Map<String, ConfigItem> removedAdapterConfig = new HashMap<>();
         for (ConfigItem configItem : remoteAdapterConfigs.values()) {
             if (!remoteConfigStatus.containsKey(configItem.getCategory() + "/" + configItem.getName())) {
                 // 删除
                 remoteAdapterConfigs.remove(configItem.getCategory() + "/" + configItem.getName());
-                removedAdapterConfig.put(configItem.getCategory() + "/" + configItem.getName(), null);
-            }
-        }
-        res[1] = removedAdapterConfig.isEmpty() ? null : removedAdapterConfig;
-
-        if (res[0] == null && res[1] == null) {
-            return null;
-        } else {
-            return res;
-        }
-    }
-
-    /**
-     * 覆盖adapter配置到本地
-     *
-     * @param modifiedAdapterConfigs 变动的配置集合
-     */
-    private void overrideLocalAdapterConfigs(Map<String, ConfigItem>[] modifiedAdapterConfigs) {
-        Map<String, ConfigItem> changedAdapterConfigs = modifiedAdapterConfigs[0];
-        if (changedAdapterConfigs != null) {
-            for (ConfigItem configItem : changedAdapterConfigs.values()) {
-                try (FileWriter writer = new FileWriter(
-                    getConfPath() + configItem.getCategory() + "/" + configItem.getName())) {
-                    writer.write(configItem.getContent());
-                    writer.flush();
-                    logger.info("## Loaded remote adapter config: {}/{}",
-                        configItem.getCategory(),
-                        configItem.getName());
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-        }
-        Map<String, ConfigItem> removedAdapterConfigs = modifiedAdapterConfigs[1];
-        if (removedAdapterConfigs != null) {
-            for (String name : removedAdapterConfigs.keySet()) {
-                File file = new File(getConfPath() + name);
-                if (file.exists()) {
-                    deleteDir(file);
-                    logger.info("## Deleted and reloaded remote adapter config: {}", name);
-                }
+                remoteAdapterMonitor.onDelete(configItem.getCategory() + "/" + configItem.getName());
             }
         }
     }
@@ -302,66 +244,69 @@ public class AdapterRemoteConfigMonitor {
         }
     }
 
-    public void destroy() {
-        executor.shutdownNow();
-        if (conn != null) {
+    /**
+     * 启动监听数据库变化
+     */
+    @Override
+    public void startMonitor() {
+        // 监听application.yml变化
+        executor.scheduleWithFixedDelay(() -> {
             try {
-                conn.close();
-            } catch (SQLException e) {
-                logger.error(e.getMessage(), e);
+                loadRemoteConfig();
+            } catch (Throwable e) {
+                logger.error("scan remote application.yml failed", e);
             }
-        }
+        }, 10, 3, TimeUnit.SECONDS);
+
+        // 监听adapter变化
+        executor.scheduleWithFixedDelay(() -> {
+            try {
+                loadRemoteAdapterConfigs();
+            } catch (Throwable e) {
+                logger.error("scan remote adapter configs failed", e);
+            }
+        }, 10, 3, TimeUnit.SECONDS);
     }
 
     /**
-     * 配置对应对象
+     * 销毁
      */
-    public static class ConfigItem {
-
-        private Long   id;
-        private String category;
-        private String name;
-        private String content;
-        private long   modifiedTime;
-
-        public Long getId() {
-            return id;
-        }
-
-        public void setId(Long id) {
-            this.id = id;
-        }
-
-        public String getCategory() {
-            return category;
-        }
-
-        public void setCategory(String category) {
-            this.category = category;
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public void setName(String name) {
-            this.name = name;
+    @Override
+    public void destroy() {
+        executor.shutdownNow();
+        try {
+            dataSource.close();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
         }
+    }
 
-        public String getContent() {
-            return content;
-        }
+    private class RemoteAdapterMonitorImpl implements RemoteAdapterMonitor {
 
-        public void setContent(String content) {
-            this.content = content;
+        @Override
+        public void onAdd(ConfigItem configItem) {
+            this.onModify(configItem);
         }
 
-        public long getModifiedTime() {
-            return modifiedTime;
+        @Override
+        public void onModify(ConfigItem configItem) {
+            try (FileWriter writer = new FileWriter(
+                getConfPath() + configItem.getCategory() + "/" + configItem.getName())) {
+                writer.write(configItem.getContent());
+                writer.flush();
+                logger.info("## Loaded remote adapter config: {}/{}", configItem.getCategory(), configItem.getName());
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
         }
 
-        public void setModifiedTime(long modifiedTime) {
-            this.modifiedTime = modifiedTime;
+        @Override
+        public void onDelete(String name) {
+            File file = new File(getConfPath() + name);
+            if (file.exists()) {
+                deleteDir(file);
+                logger.info("## Deleted and reloaded remote adapter config: {}", name);
+            }
         }
     }
 }

+ 31 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/RemoteAdapterMonitor.java

@@ -0,0 +1,31 @@
+package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
+
+/**
+ * 远程配置监听器接口
+ *
+ * @author rewerma 2019-01-25 下午05:20:16
+ * @version 1.0.0
+ */
+public interface RemoteAdapterMonitor {
+
+    /**
+     * 新增配置事件
+     *
+     * @param configItem 配置项
+     */
+    void onAdd(ConfigItem configItem);
+
+    /**
+     * 修改配置事件
+     *
+     * @param configItem 配置项
+     */
+    void onModify(ConfigItem configItem);
+
+    /**
+     * 删除配置事件
+     *
+     * @param name 配置名
+     */
+    void onDelete(String name);
+}

+ 30 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/RemoteConfigLoader.java

@@ -0,0 +1,30 @@
+package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
+
+/**
+ * 远程配置装载器接口
+ *
+ * @author rewerma 2019-01-25 下午05:20:16
+ * @version 1.0.0
+ */
+public interface RemoteConfigLoader {
+
+    /**
+     * 加载远程application.yml配置到本地
+     */
+    void loadRemoteConfig();
+
+    /**
+     * 加载adapter配置
+     */
+    void loadRemoteAdapterConfigs();
+
+    /**
+     * 启动监听数据库变化
+     */
+    void startMonitor();
+
+    /**
+     * 销毁
+     */
+    void destroy();
+}

+ 34 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/RemoteConfigLoaderFactory.java

@@ -0,0 +1,34 @@
+package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.env.Environment;
+
+/**
+ * 远程配置装载器工厂类
+ *
+ * @author rewerma 2019-01-25 下午05:20:16
+ * @version 1.0.0
+ */
+public class RemoteConfigLoaderFactory {
+
+    private static final Logger logger = LoggerFactory.getLogger(RemoteConfigLoaderFactory.class);
+
+    public static RemoteConfigLoader getRemoteConfigLoader(Environment env) {
+        try {
+            String jdbcUrl = env.getProperty("canal.manager.jdbc.url");
+            if (!StringUtils.isEmpty(jdbcUrl)) {
+                // load remote config
+                String driverName = env.getProperty("canal.manager.jdbc.driverName");
+                String jdbcUsername = env.getProperty("canal.manager.jdbc.username");
+                String jdbcPassword = env.getProperty("canal.manager.jdbc.password");
+                return new DbRemoteConfigLoader(driverName, jdbcUrl, jdbcUsername, jdbcPassword);
+            }
+            // 可扩展其它远程配置加载器
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+        return null;
+    }
+}

+ 1 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/ConfigLoader.java

@@ -21,7 +21,7 @@ public class ConfigLoader {
     private static Logger logger = LoggerFactory.getLogger(ConfigLoader.class);
 
     /**
-     * 加载HBase表映射配置
+     * 加载RDB表映射配置
      *
      * @return 配置名/配置文件名--对象
      */

+ 34 - 32
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -88,44 +88,46 @@ public class RdbSyncService {
      * @param function 回调方法
      */
     public void sync(List<Dml> dmls, Function<Dml, Boolean> function) {
-        boolean toExecute = false;
-        for (Dml dml : dmls) {
-            if (!toExecute) {
-                toExecute = function.apply(dml);
-            } else {
-                function.apply(dml);
+        try {
+            boolean toExecute = false;
+            for (Dml dml : dmls) {
+                if (!toExecute) {
+                    toExecute = function.apply(dml);
+                } else {
+                    function.apply(dml);
+                }
             }
-        }
-        if (toExecute) {
-            List<Future> futures = new ArrayList<>();
-            for (int i = 0; i < threads; i++) {
-                int j = i;
-                futures.add(executorThreads[i].submit(() -> {
+            if (toExecute) {
+                List<Future> futures = new ArrayList<>();
+                for (int i = 0; i < threads; i++) {
+                    int j = i;
+                    futures.add(executorThreads[i].submit(() -> {
+                        try {
+                            dmlsPartition[j]
+                                .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
+                            dmlsPartition[j].clear();
+                            batchExecutors[j].commit();
+                            return true;
+                        } catch (Throwable e) {
+                            batchExecutors[j].rollback();
+                            throw new RuntimeException(e);
+                        }
+                    }));
+                }
+
+                futures.forEach(future -> {
                     try {
-                        dmlsPartition[j]
-                            .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
-                        dmlsPartition[j].clear();
-                        batchExecutors[j].commit();
-                        return true;
-                    } catch (Throwable e) {
-                        batchExecutors[j].rollback();
+                        future.get();
+                    } catch (ExecutionException | InterruptedException e) {
                         throw new RuntimeException(e);
                     }
-                }));
+                });
             }
-
-            futures.forEach(future -> {
-                try {
-                    future.get();
-                } catch (ExecutionException | InterruptedException e) {
-                    throw new RuntimeException(e);
+        } finally {
+            for (BatchExecutor batchExecutor : batchExecutors) {
+                if (batchExecutor != null) {
+                    batchExecutor.close();
                 }
-            });
-        }
-
-        for (BatchExecutor batchExecutor : batchExecutors) {
-            if (batchExecutor != null) {
-                batchExecutor.close();
             }
         }
     }

+ 4 - 2
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -1,8 +1,10 @@
 package com.alibaba.otter.canal.client.kafka;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.MapMaker;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -31,7 +33,7 @@ import com.google.common.collect.Lists;
 public class KafkaCanalConnector implements CanalMQConnector {
 
     protected KafkaConsumer<String, Message> kafkaConsumer;
-    protected KafkaConsumer<String, String>  kafkaConsumer2;                  // 用于扁平message的数据消费
+    protected KafkaConsumer<String, String>  kafkaConsumer2;                            // 用于扁平message的数据消费
     protected String                         topic;
     protected Integer                        partition;
     protected Properties                     properties;
@@ -39,7 +41,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
     protected volatile boolean               running        = false;
     protected boolean                        flatMessage;
 
-    private Map<Integer, Long>               currentOffsets = new HashMap<>();
+    private Map<Integer, Long>               currentOffsets = new ConcurrentHashMap<>();
 
     public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId, Integer batchSize,
                                boolean flatMessage){

+ 4 - 4
common/src/main/java/com/alibaba/otter/canal/common/zookeeper/ZkClientx.java

@@ -16,7 +16,7 @@ import com.google.common.collect.MigrateMap;
 
 /**
  * 使用自定义的ZooKeeperx for zk connection
- * 
+ *
  * @author jianghang 2012-7-10 下午02:31:15
  * @version 1.0.0
  */
@@ -64,7 +64,7 @@ public class ZkClientx extends ZkClient {
 
     /**
      * Create a persistent Sequential node.
-     * 
+     *
      * @param path
      * @param createParents if true all parent dirs are created as well and no
      * {@link ZkNodeExistsException} is thrown in case the path already exists
@@ -92,7 +92,7 @@ public class ZkClientx extends ZkClient {
 
     /**
      * Create a persistent Sequential node.
-     * 
+     *
      * @param path
      * @param data
      * @param createParents if true all parent dirs are created as well and no
@@ -123,7 +123,7 @@ public class ZkClientx extends ZkClient {
 
     /**
      * Create a persistent Sequential node.
-     * 
+     *
      * @param path
      * @param data
      * @param createParents if true all parent dirs are created as well and no

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -48,6 +48,7 @@ public class CanalConstants {
     public static final String CANAL_MQ_FLATMESSAGE              = ROOT + "." + "mq.flatMessage";
     public static final String CANAL_MQ_COMPRESSION_TYPE         = ROOT + "." + "mq.compressionType";
     public static final String CANAL_MQ_ACKS                     = ROOT + "." + "mq.acks";
+    public static final String CANAL_MQ_TRANSACTION              = ROOT + "." + "mq.transaction";
     public static final String CANAL_ALIYUN_ACCESSKEY            = ROOT + "." + "aliyun.accessKey";
     public static final String CANAL_ALIYUN_SECRETKEY            = ROOT + "." + "aliyun.secretKey";
 

+ 80 - 65
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -140,80 +140,81 @@ public class CanalController {
 
         final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
         ServerRunningMonitors.setServerData(serverData);
-        ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
-
-            public ServerRunningMonitor apply(final String destination) {
-                ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
-                runningMonitor.setDestination(destination);
-                runningMonitor.setListener(new ServerRunningListener() {
-
-                    public void processActiveEnter() {
-                        try {
-                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                            embededCanalServer.start(destination);
-                        } finally {
-                            MDC.remove(CanalConstants.MDC_DESTINATION);
+        ServerRunningMonitors
+            .setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
+
+                public ServerRunningMonitor apply(final String destination) {
+                    ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
+                    runningMonitor.setDestination(destination);
+                    runningMonitor.setListener(new ServerRunningListener() {
+
+                        public void processActiveEnter() {
+                            try {
+                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                                embededCanalServer.start(destination);
+                            } finally {
+                                MDC.remove(CanalConstants.MDC_DESTINATION);
+                            }
                         }
-                    }
 
-                    public void processActiveExit() {
-                        try {
-                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                            embededCanalServer.stop(destination);
-                        } finally {
-                            MDC.remove(CanalConstants.MDC_DESTINATION);
+                        public void processActiveExit() {
+                            try {
+                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                                embededCanalServer.stop(destination);
+                            } finally {
+                                MDC.remove(CanalConstants.MDC_DESTINATION);
+                            }
                         }
-                    }
-
-                    public void processStart() {
-                        try {
-                            if (zkclientx != null) {
-                                final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":"
-                                                                                                              + port);
-                                initCid(path);
-                                zkclientx.subscribeStateChanges(new IZkStateListener() {
-
-                                    public void handleStateChanged(KeeperState state) throws Exception {
-
-                                    }
 
-                                    public void handleNewSession() throws Exception {
-                                        initCid(path);
-                                    }
-
-                                    @Override
-                                    public void handleSessionEstablishmentError(Throwable error) throws Exception {
-                                        logger.error("failed to connect to zookeeper", error);
-                                    }
-                                });
+                        public void processStart() {
+                            try {
+                                if (zkclientx != null) {
+                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
+                                        ip + ":" + port);
+                                    initCid(path);
+                                    zkclientx.subscribeStateChanges(new IZkStateListener() {
+
+                                        public void handleStateChanged(KeeperState state) throws Exception {
+
+                                        }
+
+                                        public void handleNewSession() throws Exception {
+                                            initCid(path);
+                                        }
+
+                                        @Override
+                                        public void handleSessionEstablishmentError(Throwable error) throws Exception {
+                                            logger.error("failed to connect to zookeeper", error);
+                                        }
+                                    });
+                                }
+                            } finally {
+                                MDC.remove(CanalConstants.MDC_DESTINATION);
                             }
-                        } finally {
-                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
-                    }
 
-                    public void processStop() {
-                        try {
-                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                            if (zkclientx != null) {
-                                final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":"
-                                                                                                              + port);
-                                releaseCid(path);
+                        public void processStop() {
+                            try {
+                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                                if (zkclientx != null) {
+                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
+                                        ip + ":" + port);
+                                    releaseCid(path);
+                                }
+                            } finally {
+                                MDC.remove(CanalConstants.MDC_DESTINATION);
                             }
-                        } finally {
-                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
-                    }
 
-                });
-                if (zkclientx != null) {
-                    runningMonitor.setZkClient(zkclientx);
+                    });
+                    if (zkclientx != null) {
+                        runningMonitor.setZkClient(zkclientx);
+                    }
+                    // 触发创建一下cid节点
+                    runningMonitor.init();
+                    return runningMonitor;
                 }
-                // 触发创建一下cid节点
-                runningMonitor.init();
-                return runningMonitor;
-            }
-        }));
+            }));
 
         // 初始化monitor机制
         autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
@@ -265,7 +266,8 @@ public class CanalController {
             instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
 
                 public InstanceConfigMonitor apply(InstanceMode mode) {
-                    int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
+                    int scanInterval = Integer
+                        .valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
 
                     if (mode.isSpring()) {
                         SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
@@ -373,7 +375,8 @@ public class CanalController {
             InstanceConfig oldConfig = instanceConfigs.put(destination, config);
 
             if (oldConfig != null) {
-                logger.warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
+                logger
+                    .warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
             }
         }
     }
@@ -478,6 +481,7 @@ public class CanalController {
     }
 
     public void stop() throws Throwable {
+
         if (canalServer != null) {
             canalServer.stop();
         }
@@ -504,6 +508,17 @@ public class CanalController {
             zkclientx.close();
         }
 
+        //关闭时清理缓存
+        if (instanceConfigs != null) {
+            instanceConfigs.clear();
+        }
+        if (managerClients != null) {
+            managerClients.clear();
+        }
+        if (instanceConfigMonitors != null) {
+            instanceConfigMonitors.clear();
+        }
+
         ZkClientx.clearClients();
     }
 

+ 13 - 18
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -7,7 +7,9 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.deployer.monitor.ManagerRemoteConfigMonitor;
+import com.alibaba.otter.canal.deployer.monitor.remote.RemoteConfigLoader;
+import com.alibaba.otter.canal.deployer.monitor.remote.RemoteConfigLoaderFactory;
+import com.alibaba.otter.canal.deployer.monitor.remote.RemoteCanalConfigMonitor;
 
 /**
  * canal独立版本启动的入口类
@@ -30,7 +32,7 @@ public class CanalLauncher {
             logger.info("## load canal configurations");
             String conf = System.getProperty("canal.conf", "classpath:canal.properties");
             Properties properties = new Properties();
-            ManagerRemoteConfigMonitor managerDbConfigMonitor = null;
+            RemoteConfigLoader remoteConfigLoader = null;
             if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                 conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
                 properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
@@ -38,31 +40,24 @@ public class CanalLauncher {
                 properties.load(new FileInputStream(conf));
             }
 
-            String jdbcUrl = properties.getProperty("canal.manager.jdbc.url");
-            if (!StringUtils.isEmpty(jdbcUrl)) {
-                logger.info("## load remote canal configurations");
-                // load remote config
-                String jdbcUsername = properties.getProperty("canal.manager.jdbc.username");
-                String jdbcPassword = properties.getProperty("canal.manager.jdbc.password");
-                managerDbConfigMonitor = new ManagerRemoteConfigMonitor(jdbcUrl, jdbcUsername, jdbcPassword);
+            remoteConfigLoader = RemoteConfigLoaderFactory.getRemoteConfigLoader(properties);
+            if (remoteConfigLoader != null) {
                 // 加载远程canal.properties
-                Properties remoteConfig = managerDbConfigMonitor.loadRemoteConfig();
+                Properties remoteConfig = remoteConfigLoader.loadRemoteConfig();
                 // 加载remote instance配置
-                managerDbConfigMonitor.loadRemoteInstanceConfigs();
+                remoteConfigLoader.loadRemoteInstanceConfigs();
                 if (remoteConfig != null) {
                     properties = remoteConfig;
                 } else {
-                    managerDbConfigMonitor = null;
+                    remoteConfigLoader = null;
                 }
-            } else {
-                logger.info("## load canal configurations");
             }
 
             final CanalStater canalStater = new CanalStater();
             canalStater.start(properties);
 
-            if (managerDbConfigMonitor != null) {
-                managerDbConfigMonitor.start(new ManagerRemoteConfigMonitor.Listener<Properties>() {
+            if (remoteConfigLoader != null) {
+                remoteConfigLoader.startMonitor(new RemoteCanalConfigMonitor() {
 
                     @Override
                     public void onChange(Properties properties) {
@@ -81,8 +76,8 @@ public class CanalLauncher {
                 Thread.sleep(1000);
             }
 
-            if (managerDbConfigMonitor != null) {
-                managerDbConfigMonitor.destroy();
+            if (remoteConfigLoader != null) {
+                remoteConfigLoader.destroy();
             }
         } catch (Throwable e) {
             logger.error("## Something goes wrong when starting up the canal Server:", e);

+ 4 - 1
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java

@@ -150,7 +150,6 @@ public class CanalStater {
         if (!StringUtils.isEmpty(acks)) {
             mqProperties.setAcks(acks);
         }
-
         String aliyunAccessKey = CanalController.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESSKEY);
         if (!StringUtils.isEmpty(aliyunAccessKey)) {
             mqProperties.setAliyunAccessKey(aliyunAccessKey);
@@ -159,6 +158,10 @@ public class CanalStater {
         if (!StringUtils.isEmpty(aliyunSecretKey)) {
             mqProperties.setAliyunSecretKey(aliyunSecretKey);
         }
+        String transaction = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_TRANSACTION);
+        if (!StringUtils.isEmpty(transaction)) {
+            mqProperties.setTransaction(Boolean.valueOf(transaction));
+        }
         return mqProperties;
     }
 

+ 47 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/ConfigItem.java

@@ -0,0 +1,47 @@
+package com.alibaba.otter.canal.deployer.monitor.remote;
+
+/**
+ * 配置对应对象
+ *
+ * @author rewerma 2019-01-25 下午05:20:16
+ * @version 1.0.0
+ */
+public class ConfigItem {
+
+    private Long   id;
+    private String name;
+    private String content;
+    private long   modifiedTime;
+
+    public Long getId() {
+        return id;
+    }
+
+    public void setId(Long id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getContent() {
+        return content;
+    }
+
+    public void setContent(String content) {
+        this.content = content;
+    }
+
+    public long getModifiedTime() {
+        return modifiedTime;
+    }
+
+    public void setModifiedTime(long modifiedTime) {
+        this.modifiedTime = modifiedTime;
+    }
+}

+ 95 - 147
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/ManagerRemoteConfigMonitor.java → deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/DbRemoteConfigLoader.java

@@ -1,39 +1,41 @@
-package com.alibaba.otter.canal.deployer.monitor;
+package com.alibaba.otter.canal.deployer.monitor.remote;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.nio.charset.StandardCharsets;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.*;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
 import com.alibaba.otter.canal.deployer.CanalConstants;
 import com.google.common.base.Joiner;
 import com.google.common.collect.MapMaker;
 
 /**
- * 远程配置装载监控
+ * 基于数据库的远程配置装载器
  *
- * @author rewerma 2018-12-30 下午05:12:16
- * @version 1.0.1
+ * @author rewerma 2019-01-25 下午05:20:16
+ * @version 1.0.0
  */
-public class ManagerRemoteConfigMonitor {
+public class DbRemoteConfigLoader implements RemoteConfigLoader {
 
-    private static final Logger      logger                 = LoggerFactory.getLogger(ManagerRemoteConfigMonitor.class);
+    private static final Logger      logger                 = LoggerFactory.getLogger(DbRemoteConfigLoader.class);
 
     private Map<String, ConfigItem>  remoteInstanceConfigs  = new MapMaker().makeMap();
 
-    private Connection               conn;
-    private String                   jdbcUrl;
-    private String                   jdbcUsername;
-    private String                   jdbcPassword;
+    private DruidDataSource          dataSource;
 
     private long                     currentConfigTimestamp = 0;
 
@@ -41,29 +43,36 @@ public class ManagerRemoteConfigMonitor {
     private ScheduledExecutorService executor               = Executors.newScheduledThreadPool(2,
         new NamedThreadFactory("remote-canal-config-scan"));
 
-    public ManagerRemoteConfigMonitor(String jdbcUrl, String jdbcUsername, String jdbcPassword){
-        this.jdbcUrl = jdbcUrl;
-        this.jdbcUsername = jdbcUsername;
-        this.jdbcPassword = jdbcPassword;
-    }
-
-    public void setScanIntervalInSecond(long scanIntervalInSecond) {
-        this.scanIntervalInSecond = scanIntervalInSecond;
-    }
+    private RemoteInstanceMonitor    remoteInstanceMonitor  = new RemoteInstanceMonitorImpl();
 
-    private Connection getConn() throws Exception {
-        if (conn == null || conn.isClosed()) {
-            Class.forName("com.mysql.jdbc.Driver");
-            conn = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword);
+    public DbRemoteConfigLoader(String driverName, String jdbcUrl, String jdbcUsername, String jdbcPassword){
+        dataSource = new DruidDataSource();
+        if (StringUtils.isEmpty(driverName)) {
+            driverName = "com.mysql.jdbc.Driver";
+        }
+        dataSource.setDriverClassName(driverName);
+        dataSource.setUrl(jdbcUrl);
+        dataSource.setUsername(jdbcUsername);
+        dataSource.setPassword(jdbcPassword);
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(1);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+        try {
+            dataSource.init();
+        } catch (SQLException e) {
+            throw new RuntimeException(e.getMessage(), e);
         }
-        return conn;
     }
 
     /**
-     * 加载远程 canal.properties文件 覆盖本地
+     * 加载远程 canal.properties文件
      *
      * @return 远程配置的properties
      */
+    @Override
     public Properties loadRemoteConfig() {
         Properties properties = null;
         try {
@@ -87,15 +96,14 @@ public class ManagerRemoteConfigMonitor {
     }
 
     /**
-     * 加载远程的instance配置 覆盖本地
+     * 覆盖本地 canal.properties
+     *
+     * @param content 远程配置内容文本
      */
-    public void loadRemoteInstanceConfigs() {
-        try {
-            // 加载远程instance配置
-            Map<String, ConfigItem>[] modifiedConfigs = getModifiedInstanceConfigs();
-            if (modifiedConfigs != null) {
-                overrideLocalInstanceConfigs(modifiedConfigs);
-            }
+    private void overrideLocalCanalConfig(String content) {
+        try (FileWriter writer = new FileWriter(getConfPath() + "canal.properties")) {
+            writer.write(content);
+            writer.flush();
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
         }
@@ -108,7 +116,9 @@ public class ManagerRemoteConfigMonitor {
      */
     private ConfigItem getRemoteCanalConfig() {
         String sql = "select name, content, modified_time from canal_config where id=1";
-        try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
+        try (Connection conn = dataSource.getConnection();
+                Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
             if (rs.next()) {
                 ConfigItem configItem = new ConfigItem();
                 configItem.setId(1L);
@@ -124,30 +134,27 @@ public class ManagerRemoteConfigMonitor {
     }
 
     /**
-     * 覆盖本地 canal.properties
-     *
-     * @param content 远程配置内容文本
+     * 加载远程的instance配置
      */
-    private void overrideLocalCanalConfig(String content) {
-        try (FileWriter writer = new FileWriter(getConfPath() + "canal.properties")) {
-            writer.write(content);
-            writer.flush();
+    @Override
+    public void loadRemoteInstanceConfigs() {
+        try {
+            // 加载远程instance配置
+            loadModifiedInstanceConfigs();
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
         }
     }
 
     /**
-     * 获取远程instance新增、修改、删除配置
-     *
-     * @return Map[0]:新增或修改的instance配置; Map[1]:删除的instance配置
+     * 加载远程instance新增、修改、删除配置
      */
-    @SuppressWarnings("unchecked")
-    private Map<String, ConfigItem>[] getModifiedInstanceConfigs() {
-        Map<String, ConfigItem>[] res = new Map[2];
+    private void loadModifiedInstanceConfigs() {
         Map<String, ConfigItem> remoteConfigStatus = new HashMap<>();
         String sql = "select id, name, modified_time from canal_instance_config";
-        try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
+        try (Connection conn = dataSource.getConnection();
+                Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
             while (rs.next()) {
                 ConfigItem configItem = new ConfigItem();
                 configItem.setId(rs.getLong("id"));
@@ -157,7 +164,6 @@ public class ManagerRemoteConfigMonitor {
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
-            return null;
         }
 
         if (!remoteConfigStatus.isEmpty()) {
@@ -176,10 +182,11 @@ public class ManagerRemoteConfigMonitor {
                 }
             }
             if (!changedIds.isEmpty()) {
-                Map<String, ConfigItem> changedInstanceConfig = new HashMap<>();
                 String contentsSql = "select id, name, content, modified_time from canal_instance_config  where id in ("
                                      + Joiner.on(",").join(changedIds) + ")";
-                try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(contentsSql)) {
+                try (Connection conn = dataSource.getConnection();
+                        Statement stmt = conn.createStatement();
+                        ResultSet rs = stmt.executeQuery(contentsSql)) {
                     while (rs.next()) {
                         ConfigItem configItemNew = new ConfigItem();
                         configItemNew.setId(rs.getLong("id"));
@@ -188,64 +195,20 @@ public class ManagerRemoteConfigMonitor {
                         configItemNew.setModifiedTime(rs.getTimestamp("modified_time").getTime());
 
                         remoteInstanceConfigs.put(configItemNew.getName(), configItemNew);
-                        changedInstanceConfig.put(configItemNew.getName(), configItemNew);
+                        remoteInstanceMonitor.onModify(configItemNew);
                     }
 
-                    res[0] = changedInstanceConfig.isEmpty() ? null : changedInstanceConfig;
                 } catch (Exception e) {
                     logger.error(e.getMessage(), e);
                 }
             }
         }
 
-        Map<String, ConfigItem> removedInstanceConfig = new HashMap<>();
         for (String name : remoteInstanceConfigs.keySet()) {
             if (!remoteConfigStatus.containsKey(name)) {
                 // 删除
                 remoteInstanceConfigs.remove(name);
-                removedInstanceConfig.put(name, null);
-            }
-        }
-        res[1] = removedInstanceConfig.isEmpty() ? null : removedInstanceConfig;
-
-        if (res[0] == null && res[1] == null) {
-            return null;
-        } else {
-            return res;
-        }
-    }
-
-    /**
-     * 覆盖本地instance配置
-     *
-     * @param modifiedInstanceConfigs 有变更的配置项
-     */
-    private void overrideLocalInstanceConfigs(Map<String, ConfigItem>[] modifiedInstanceConfigs) {
-        Map<String, ConfigItem> changedInstanceConfigs = modifiedInstanceConfigs[0];
-        if (changedInstanceConfigs != null) {
-            for (ConfigItem configItem : changedInstanceConfigs.values()) {
-                File instanceDir = new File(getConfPath() + configItem.getName());
-                if (!instanceDir.exists()) {
-                    instanceDir.mkdirs();
-                }
-                try (FileWriter writer = new FileWriter(
-                    getConfPath() + configItem.getName() + "/instance.properties")) {
-                    writer.write(configItem.getContent());
-                    writer.flush();
-                    logger.info("## Loaded remote instance config: {}/instance.properties ", configItem.getName());
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-        }
-        Map<String, ConfigItem> removedInstanceConfigs = modifiedInstanceConfigs[1];
-        if (removedInstanceConfigs != null) {
-            for (String name : removedInstanceConfigs.keySet()) {
-                File file = new File(getConfPath() + name + "/");
-                if (file.exists()) {
-                    deleteDir(file);
-                    logger.info("## Deleted and loaded remote instance config: {} ", name);
-                }
+                remoteInstanceMonitor.onDelete(name);
             }
         }
     }
@@ -273,9 +236,9 @@ public class ManagerRemoteConfigMonitor {
     /**
      * 监听 canal 主配置和 instance 配置变化
      *
-     * @param listener 监听回调方法
+     * @param remoteCanalConfigMonitor 监听回调方法
      */
-    public void start(final Listener<Properties> listener) {
+    public void startMonitor(final RemoteCanalConfigMonitor remoteCanalConfigMonitor) {
         // 监听canal.properties变化
         executor.scheduleWithFixedDelay(new Runnable() {
 
@@ -283,10 +246,10 @@ public class ManagerRemoteConfigMonitor {
                 try {
                     Properties properties = loadRemoteConfig();
                     if (properties != null) {
-                        listener.onChange(properties);
+                        remoteCanalConfigMonitor.onChange(properties);
                     }
                 } catch (Throwable e) {
-                    logger.error("scan failed", e);
+                    logger.error("Scan remote canal config failed", e);
                 }
             }
 
@@ -299,7 +262,7 @@ public class ManagerRemoteConfigMonitor {
                 try {
                     loadRemoteInstanceConfigs();
                 } catch (Throwable e) {
-                    logger.error("scan failed", e);
+                    logger.error("Scan remote instance config failed", e);
                 }
             }
 
@@ -311,12 +274,10 @@ public class ManagerRemoteConfigMonitor {
      */
     public void destroy() {
         executor.shutdownNow();
-        if (conn != null) {
-            try {
-                conn.close();
-            } catch (SQLException e) {
-                logger.error(e.getMessage(), e);
-            }
+        try {
+            dataSource.close();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
         }
     }
 
@@ -336,50 +297,37 @@ public class ManagerRemoteConfigMonitor {
     }
 
     /**
-     * 配置对应对象
+     * 远程xxx/instance.properties配置监听器实现
      */
-    public static class ConfigItem {
-
-        private Long   id;
-        private String name;
-        private String content;
-        private long   modifiedTime;
-
-        public Long getId() {
-            return id;
-        }
-
-        public void setId(Long id) {
-            this.id = id;
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public void setName(String name) {
-            this.name = name;
-        }
-
-        public String getContent() {
-            return content;
-        }
+    private class RemoteInstanceMonitorImpl implements RemoteInstanceMonitor {
 
-        public void setContent(String content) {
-            this.content = content;
+        @Override
+        public void onAdd(ConfigItem configItem) {
+            this.onModify(configItem);
         }
 
-        public long getModifiedTime() {
-            return modifiedTime;
+        @Override
+        public void onModify(ConfigItem configItem) {
+            File instanceDir = new File(getConfPath() + configItem.getName());
+            if (!instanceDir.exists()) {
+                instanceDir.mkdirs();
+            }
+            try (FileWriter writer = new FileWriter(getConfPath() + configItem.getName() + "/instance.properties")) {
+                writer.write(configItem.getContent());
+                writer.flush();
+                logger.info("## Loaded remote instance config: {}/instance.properties ", configItem.getName());
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
         }
 
-        public void setModifiedTime(long modifiedTime) {
-            this.modifiedTime = modifiedTime;
+        @Override
+        public void onDelete(String instanceName) {
+            File file = new File(getConfPath() + instanceName + "/");
+            if (file.exists()) {
+                deleteDir(file);
+                logger.info("## Deleted and loaded remote instance config: {} ", instanceName);
+            }
         }
     }
-
-    public interface Listener<T> {
-
-        void onChange(T properties);
-    }
 }

+ 13 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/RemoteCanalConfigMonitor.java

@@ -0,0 +1,13 @@
+package com.alibaba.otter.canal.deployer.monitor.remote;
+
+import java.util.Properties;
+
+/**
+ * 远程canal.properties配置监听器接口
+ *
+ * @author rewerma 2019-01-25 下午05:20:16
+ * @version 1.0.0
+ */
+public interface RemoteCanalConfigMonitor {
+    void onChange(Properties properties);
+}

+ 36 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/RemoteConfigLoader.java

@@ -0,0 +1,36 @@
+package com.alibaba.otter.canal.deployer.monitor.remote;
+
+import java.util.Properties;
+
+/**
+ * 远程配置装载器接口
+ *
+ * @author rewerma 2019-01-25 下午05:20:16
+ * @version 1.0.0
+ */
+public interface RemoteConfigLoader {
+
+    /**
+     * 加载远程 canal.properties文件
+     *
+     * @return 远程配置的properties
+     */
+    Properties loadRemoteConfig();
+
+    /**
+     * 加载远程的instance配置
+     */
+    void loadRemoteInstanceConfigs();
+
+    /**
+     * 启动监听 canal 主配置和 instance 配置变化
+     *
+     * @param remoteCanalConfigMonitor 监听回调方法
+     */
+    void startMonitor(final RemoteCanalConfigMonitor remoteCanalConfigMonitor);
+
+    /**
+     * 销毁
+     */
+    void destroy();
+}

+ 35 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/RemoteConfigLoaderFactory.java

@@ -0,0 +1,35 @@
+package com.alibaba.otter.canal.deployer.monitor.remote;
+
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 远程配置装载器工厂类
+ *
+ * @author rewerma 2019-01-25 下午05:20:16
+ * @version 1.0.0
+ */
+public class RemoteConfigLoaderFactory {
+
+    private static final Logger logger = LoggerFactory.getLogger(RemoteConfigLoaderFactory.class);
+
+    public static RemoteConfigLoader getRemoteConfigLoader(Properties localProperties) {
+        String jdbcUrl = localProperties.getProperty("canal.manager.jdbc.url");
+        if (!StringUtils.isEmpty(jdbcUrl)) {
+            logger.info("## load remote canal configurations");
+            // load remote config
+            String driverName = localProperties.getProperty("canal.manager.jdbc.driverName");
+            String jdbcUsername = localProperties.getProperty("canal.manager.jdbc.username");
+            String jdbcPassword = localProperties.getProperty("canal.manager.jdbc.password");
+            return new DbRemoteConfigLoader(driverName, jdbcUrl, jdbcUsername, jdbcPassword);
+        }
+        // 可扩展其它远程配置加载器
+
+        logger.info("## load local canal configurations");
+
+        return null;
+    }
+}

+ 31 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/RemoteInstanceMonitor.java

@@ -0,0 +1,31 @@
+package com.alibaba.otter.canal.deployer.monitor.remote;
+
+/**
+ * 远程xxx/instance.properties配置监听器接口
+ *
+ * @author rewerma 2019-01-25 下午05:20:16
+ * @version 1.0.0
+ */
+public interface RemoteInstanceMonitor {
+
+    /**
+     * 新增配置事件
+     *
+     * @param configItem 配置项
+     */
+    void onAdd(ConfigItem configItem);
+
+    /**
+     * 修改配置事件
+     *
+     * @param configItem 配置项
+     */
+    void onModify(ConfigItem configItem);
+
+    /**
+     * 删除配置事件
+     *
+     * @param instanceName 实例名
+     */
+    void onDelete(String instanceName);
+}

+ 3 - 1
deployer/src/main/resources/canal.properties

@@ -114,4 +114,6 @@ canal.mq.canalBatchSize = 50
 canal.mq.canalGetTimeout = 100
 canal.mq.flatMessage = true
 canal.mq.compressionType = none
-canal.mq.acks = all
+canal.mq.acks = all
+# use transaction for kafka flatMessage batch produce
+canal.mq.transaction = false

+ 334 - 329
meta/src/main/java/com/alibaba/otter/canal/meta/ZooKeeperMetaManager.java

@@ -1,329 +1,334 @@
-package com.alibaba.otter.canal.meta;
-
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.commons.lang.StringUtils;
-import org.springframework.util.Assert;
-import org.springframework.util.CollectionUtils;
-
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
-import com.alibaba.otter.canal.common.utils.JsonUtils;
-import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
-import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
-import com.alibaba.otter.canal.meta.exception.CanalMetaManagerException;
-import com.alibaba.otter.canal.protocol.ClientIdentity;
-import com.alibaba.otter.canal.protocol.position.Position;
-import com.alibaba.otter.canal.protocol.position.PositionRange;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * zk 版本的 canal manager, 存储结构:
- * 
- * <pre>
- * /otter
- *    canal
- *      destinations
- *        dest1 
- *          client1
- *            filter
- *            batch_mark
- *              1
- *              2
- *              3
- * </pre>
- * 
- * @author zebin.xuzb @ 2012-6-21
- * @author jianghang
- * @version 1.0.0
- */
-public class ZooKeeperMetaManager extends AbstractCanalLifeCycle implements CanalMetaManager {
-
-    private static final String ENCODE = "UTF-8";
-    private ZkClientx           zkClientx;
-
-    public void start() {
-        super.start();
-
-        Assert.notNull(zkClientx);
-    }
-
-    public void stop() {
-        super.stop();
-    }
-
-    public void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
-            clientIdentity.getClientId());
-
-        try {
-            zkClientx.createPersistent(path, true);
-        } catch (ZkNodeExistsException e) {
-            // ignore
-        }
-        if (clientIdentity.hasFilter()) {
-            String filterPath = ZookeeperPathUtils.getFilterPath(clientIdentity.getDestination(),
-                clientIdentity.getClientId());
-
-            byte[] bytes = null;
-            try {
-                bytes = clientIdentity.getFilter().getBytes(ENCODE);
-            } catch (UnsupportedEncodingException e) {
-                throw new CanalMetaManagerException(e);
-            }
-
-            try {
-                zkClientx.createPersistent(filterPath, bytes);
-            } catch (ZkNodeExistsException e) {
-                // ignore
-                zkClientx.writeData(filterPath, bytes);
-            }
-        }
-    }
-
-    public boolean hasSubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
-            clientIdentity.getClientId());
-        return zkClientx.exists(path);
-    }
-
-    public void unsubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
-            clientIdentity.getClientId());
-        zkClientx.deleteRecursive(path); // 递归删除所有信息
-    }
-
-    public List<ClientIdentity> listAllSubscribeInfo(String destination) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getDestinationPath(destination);
-        List<String> childs = null;
-        try {
-            childs = zkClientx.getChildren(path);
-        } catch (ZkNoNodeException e) {
-            // ignore
-        }
-
-        if (CollectionUtils.isEmpty(childs)) {
-            return new ArrayList<ClientIdentity>();
-        }
-        List<Short> clientIds = new ArrayList<Short>();
-        for (String child : childs) {
-            if (StringUtils.isNumeric(child)) {
-                clientIds.add(ZookeeperPathUtils.getClientId(child));
-            }
-        }
-
-        Collections.sort(clientIds); // 进行一个排序
-        List<ClientIdentity> clientIdentities = Lists.newArrayList();
-        for (Short clientId : clientIds) {
-            path = ZookeeperPathUtils.getFilterPath(destination, clientId);
-            byte[] bytes = zkClientx.readData(path, true);
-            String filter = null;
-            if (bytes != null) {
-                try {
-                    filter = new String(bytes, ENCODE);
-                } catch (UnsupportedEncodingException e) {
-                    throw new CanalMetaManagerException(e);
-                }
-            }
-            clientIdentities.add(new ClientIdentity(destination, clientId, filter));
-        }
-
-        return clientIdentities;
-    }
-
-    public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getCursorPath(clientIdentity.getDestination(), clientIdentity.getClientId());
-
-        byte[] data = zkClientx.readData(path, true);
-        if (data == null || data.length == 0) {
-            return null;
-        }
-
-        return JsonUtils.unmarshalFromByte(data, Position.class);
-    }
-
-    public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getCursorPath(clientIdentity.getDestination(), clientIdentity.getClientId());
-        byte[] data = JsonUtils.marshalToByte(position, SerializerFeature.WriteClassName);
-        try {
-            zkClientx.writeData(path, data);
-        } catch (ZkNoNodeException e) {
-            zkClientx.createPersistent(path, data, true);// 第一次节点不存在,则尝试重建
-        }
-    }
-
-    public Long addBatch(ClientIdentity clientIdentity, PositionRange positionRange) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(), clientIdentity.getClientId());
-        byte[] data = JsonUtils.marshalToByte(positionRange, SerializerFeature.WriteClassName);
-        String batchPath = zkClientx.createPersistentSequential(path + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR,
-            data,
-            true);
-        String batchIdString = StringUtils.substringAfterLast(batchPath, ZookeeperPathUtils.ZOOKEEPER_SEPARATOR);
-        return ZookeeperPathUtils.getBatchMarkId(batchIdString);
-    }
-
-    public void addBatch(ClientIdentity clientIdentity, PositionRange positionRange, Long batchId)
-                                                                                                  throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getBatchMarkWithIdPath(clientIdentity.getDestination(),
-            clientIdentity.getClientId(),
-            batchId);
-        byte[] data = JsonUtils.marshalToByte(positionRange, SerializerFeature.WriteClassName);
-        zkClientx.createPersistent(path, data, true);
-    }
-
-    public PositionRange removeBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException {
-        String batchsPath = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
-            clientIdentity.getClientId());
-        List<String> nodes = zkClientx.getChildren(batchsPath);
-        if (CollectionUtils.isEmpty(nodes)) {
-            // 没有batch记录
-            return null;
-        }
-
-        // 找到最小的Id
-        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
-        for (String batchIdString : nodes) {
-            batchIds.add(Long.valueOf(batchIdString));
-        }
-        Long minBatchId = Collections.min(batchIds);
-        if (!minBatchId.equals(batchId)) {
-            // 检查一下提交的ack/rollback,必须按batchId分出去的顺序提交,否则容易出现丢数据
-            throw new CanalMetaManagerException(String.format("batchId:%d is not the firstly:%d", batchId, minBatchId));
-        }
-
-        if (!batchIds.contains(batchId)) {
-            // 不存在对应的batchId
-            return null;
-        }
-        PositionRange positionRange = getBatch(clientIdentity, batchId);
-        if (positionRange != null) {
-            String path = ZookeeperPathUtils.getBatchMarkWithIdPath(clientIdentity.getDestination(),
-                clientIdentity.getClientId(),
-                batchId);
-            zkClientx.delete(path);
-        }
-
-        return positionRange;
-    }
-
-    public PositionRange getBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getBatchMarkWithIdPath(clientIdentity.getDestination(),
-            clientIdentity.getClientId(),
-            batchId);
-        byte[] data = zkClientx.readData(path, true);
-        if (data == null) {
-            return null;
-        }
-
-        PositionRange positionRange = JsonUtils.unmarshalFromByte(data, PositionRange.class);
-        return positionRange;
-    }
-
-    public void clearAllBatchs(ClientIdentity clientIdentity) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(), clientIdentity.getClientId());
-        List<String> batchChilds = zkClientx.getChildren(path);
-
-        for (String batchChild : batchChilds) {
-            String batchPath = path + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + batchChild;
-            zkClientx.delete(batchPath);
-        }
-    }
-
-    public PositionRange getLastestBatch(ClientIdentity clientIdentity) {
-        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(), clientIdentity.getClientId());
-        List<String> nodes = null;
-        try {
-            nodes = zkClientx.getChildren(path);
-        } catch (ZkNoNodeException e) {
-            // ignore
-        }
-
-        if (CollectionUtils.isEmpty(nodes)) {
-            return null;
-        }
-        // 找到最大的Id
-        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
-        for (String batchIdString : nodes) {
-            batchIds.add(Long.valueOf(batchIdString));
-        }
-        Long maxBatchId = Collections.max(batchIds);
-        PositionRange result = getBatch(clientIdentity, maxBatchId);
-        if (result == null) { // 出现为null,说明zk节点有变化,重新获取
-            return getLastestBatch(clientIdentity);
-        } else {
-            return result;
-        }
-    }
-
-    public PositionRange getFirstBatch(ClientIdentity clientIdentity) {
-        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(), clientIdentity.getClientId());
-        List<String> nodes = null;
-        try {
-            nodes = zkClientx.getChildren(path);
-        } catch (ZkNoNodeException e) {
-            // ignore
-        }
-
-        if (CollectionUtils.isEmpty(nodes)) {
-            return null;
-        }
-        // 找到最小的Id
-        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
-        for (String batchIdString : nodes) {
-            batchIds.add(Long.valueOf(batchIdString));
-        }
-        Long minBatchId = Collections.min(batchIds);
-        PositionRange result = getBatch(clientIdentity, minBatchId);
-        if (result == null) { // 出现为null,说明zk节点有变化,重新获取
-            return getFirstBatch(clientIdentity);
-        } else {
-            return result;
-        }
-    }
-
-    public Map<Long, PositionRange> listAllBatchs(ClientIdentity clientIdentity) {
-        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(), clientIdentity.getClientId());
-        List<String> nodes = null;
-        try {
-            nodes = zkClientx.getChildren(path);
-        } catch (ZkNoNodeException e) {
-            // ignore
-        }
-
-        if (CollectionUtils.isEmpty(nodes)) {
-            return Maps.newHashMap();
-        }
-        // 找到最大的Id
-        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
-        for (String batchIdString : nodes) {
-            batchIds.add(Long.valueOf(batchIdString));
-        }
-
-        Collections.sort(batchIds); // 从小到大排序
-        Map<Long, PositionRange> positionRanges = Maps.newLinkedHashMap();
-        for (Long batchId : batchIds) {
-            PositionRange result = getBatch(clientIdentity, batchId);
-            if (result == null) {// 出现为null,说明zk节点有变化,重新获取
-                return listAllBatchs(clientIdentity);
-            } else {
-                positionRanges.put(batchId, result);
-            }
-        }
-
-        return positionRanges;
-    }
-
-    // =========== setter ==========
-
-    public void setZkClientx(ZkClientx zkClientx) {
-        this.zkClientx = zkClientx;
-    }
-
-}
+package com.alibaba.otter.canal.meta;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.commons.lang.StringUtils;
+import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
+
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
+import com.alibaba.otter.canal.common.utils.JsonUtils;
+import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
+import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
+import com.alibaba.otter.canal.meta.exception.CanalMetaManagerException;
+import com.alibaba.otter.canal.protocol.ClientIdentity;
+import com.alibaba.otter.canal.protocol.position.Position;
+import com.alibaba.otter.canal.protocol.position.PositionRange;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * zk 版本的 canal manager, 存储结构:
+ *
+ * <pre>
+ * /otter
+ *    canal
+ *      destinations
+ *        dest1
+ *          client1
+ *            filter
+ *            batch_mark
+ *              1
+ *              2
+ *              3
+ * </pre>
+ *
+ * @author zebin.xuzb @ 2012-6-21
+ * @author jianghang
+ * @version 1.0.0
+ */
+public class ZooKeeperMetaManager extends AbstractCanalLifeCycle implements CanalMetaManager {
+
+    private static final String ENCODE = "UTF-8";
+    private ZkClientx           zkClientx;
+
+    public void start() {
+        super.start();
+
+        Assert.notNull(zkClientx);
+    }
+
+    public void stop() {
+        zkClientx = null; //关闭时置空
+        super.stop();
+    }
+
+    public void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+
+        try {
+            zkClientx.createPersistent(path, true);
+        } catch (ZkNodeExistsException e) {
+            // ignore
+        }
+        if (clientIdentity.hasFilter()) {
+            String filterPath = ZookeeperPathUtils.getFilterPath(clientIdentity.getDestination(),
+                clientIdentity.getClientId());
+
+            byte[] bytes = null;
+            try {
+                bytes = clientIdentity.getFilter().getBytes(ENCODE);
+            } catch (UnsupportedEncodingException e) {
+                throw new CanalMetaManagerException(e);
+            }
+
+            try {
+                zkClientx.createPersistent(filterPath, bytes);
+            } catch (ZkNodeExistsException e) {
+                // ignore
+                zkClientx.writeData(filterPath, bytes);
+            }
+        }
+    }
+
+    public boolean hasSubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        return zkClientx.exists(path);
+    }
+
+    public void unsubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        zkClientx.deleteRecursive(path); // 递归删除所有信息
+    }
+
+    public List<ClientIdentity> listAllSubscribeInfo(String destination) throws CanalMetaManagerException {
+        if (zkClientx == null) { //重新加载时可能为空
+            return new ArrayList<ClientIdentity>();
+        }
+        String path = ZookeeperPathUtils.getDestinationPath(destination);
+        List<String> childs = null;
+        try {
+            childs = zkClientx.getChildren(path);
+        } catch (ZkNoNodeException e) {
+            // ignore
+        }
+
+        if (CollectionUtils.isEmpty(childs)) {
+            return new ArrayList<ClientIdentity>();
+        }
+        List<Short> clientIds = new ArrayList<Short>();
+        for (String child : childs) {
+            if (StringUtils.isNumeric(child)) {
+                clientIds.add(ZookeeperPathUtils.getClientId(child));
+            }
+        }
+
+        Collections.sort(clientIds); // 进行一个排序
+        List<ClientIdentity> clientIdentities = Lists.newArrayList();
+        for (Short clientId : clientIds) {
+            path = ZookeeperPathUtils.getFilterPath(destination, clientId);
+            byte[] bytes = zkClientx.readData(path, true);
+            String filter = null;
+            if (bytes != null) {
+                try {
+                    filter = new String(bytes, ENCODE);
+                } catch (UnsupportedEncodingException e) {
+                    throw new CanalMetaManagerException(e);
+                }
+            }
+            clientIdentities.add(new ClientIdentity(destination, clientId, filter));
+        }
+
+        return clientIdentities;
+    }
+
+    public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils.getCursorPath(clientIdentity.getDestination(), clientIdentity.getClientId());
+
+        byte[] data = zkClientx.readData(path, true);
+        if (data == null || data.length == 0) {
+            return null;
+        }
+
+        return JsonUtils.unmarshalFromByte(data, Position.class);
+    }
+
+    public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils.getCursorPath(clientIdentity.getDestination(), clientIdentity.getClientId());
+        byte[] data = JsonUtils.marshalToByte(position, SerializerFeature.WriteClassName);
+        try {
+            zkClientx.writeData(path, data);
+        } catch (ZkNoNodeException e) {
+            zkClientx.createPersistent(path, data, true);// 第一次节点不存在,则尝试重建
+        }
+    }
+
+    public Long addBatch(ClientIdentity clientIdentity, PositionRange positionRange) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        byte[] data = JsonUtils.marshalToByte(positionRange, SerializerFeature.WriteClassName);
+        String batchPath = zkClientx
+            .createPersistentSequential(path + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR, data, true);
+        String batchIdString = StringUtils.substringAfterLast(batchPath, ZookeeperPathUtils.ZOOKEEPER_SEPARATOR);
+        return ZookeeperPathUtils.getBatchMarkId(batchIdString);
+    }
+
+    public void addBatch(ClientIdentity clientIdentity, PositionRange positionRange,
+                         Long batchId) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils
+            .getBatchMarkWithIdPath(clientIdentity.getDestination(), clientIdentity.getClientId(), batchId);
+        byte[] data = JsonUtils.marshalToByte(positionRange, SerializerFeature.WriteClassName);
+        zkClientx.createPersistent(path, data, true);
+    }
+
+    public PositionRange removeBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException {
+        String batchsPath = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        List<String> nodes = zkClientx.getChildren(batchsPath);
+        if (CollectionUtils.isEmpty(nodes)) {
+            // 没有batch记录
+            return null;
+        }
+
+        // 找到最小的Id
+        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
+        for (String batchIdString : nodes) {
+            batchIds.add(Long.valueOf(batchIdString));
+        }
+        Long minBatchId = Collections.min(batchIds);
+        if (!minBatchId.equals(batchId)) {
+            // 检查一下提交的ack/rollback,必须按batchId分出去的顺序提交,否则容易出现丢数据
+            throw new CanalMetaManagerException(String.format("batchId:%d is not the firstly:%d", batchId, minBatchId));
+        }
+
+        if (!batchIds.contains(batchId)) {
+            // 不存在对应的batchId
+            return null;
+        }
+        PositionRange positionRange = getBatch(clientIdentity, batchId);
+        if (positionRange != null) {
+            String path = ZookeeperPathUtils
+                .getBatchMarkWithIdPath(clientIdentity.getDestination(), clientIdentity.getClientId(), batchId);
+            zkClientx.delete(path);
+        }
+
+        return positionRange;
+    }
+
+    public PositionRange getBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils
+            .getBatchMarkWithIdPath(clientIdentity.getDestination(), clientIdentity.getClientId(), batchId);
+        byte[] data = zkClientx.readData(path, true);
+        if (data == null) {
+            return null;
+        }
+
+        PositionRange positionRange = JsonUtils.unmarshalFromByte(data, PositionRange.class);
+        return positionRange;
+    }
+
+    public void clearAllBatchs(ClientIdentity clientIdentity) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        List<String> batchChilds = zkClientx.getChildren(path);
+
+        for (String batchChild : batchChilds) {
+            String batchPath = path + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + batchChild;
+            zkClientx.delete(batchPath);
+        }
+    }
+
+    public PositionRange getLastestBatch(ClientIdentity clientIdentity) {
+        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        List<String> nodes = null;
+        try {
+            nodes = zkClientx.getChildren(path);
+        } catch (ZkNoNodeException e) {
+            // ignore
+        }
+
+        if (CollectionUtils.isEmpty(nodes)) {
+            return null;
+        }
+        // 找到最大的Id
+        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
+        for (String batchIdString : nodes) {
+            batchIds.add(Long.valueOf(batchIdString));
+        }
+        Long maxBatchId = Collections.max(batchIds);
+        PositionRange result = getBatch(clientIdentity, maxBatchId);
+        if (result == null) { // 出现为null,说明zk节点有变化,重新获取
+            return getLastestBatch(clientIdentity);
+        } else {
+            return result;
+        }
+    }
+
+    public PositionRange getFirstBatch(ClientIdentity clientIdentity) {
+        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        List<String> nodes = null;
+        try {
+            nodes = zkClientx.getChildren(path);
+        } catch (ZkNoNodeException e) {
+            // ignore
+        }
+
+        if (CollectionUtils.isEmpty(nodes)) {
+            return null;
+        }
+        // 找到最小的Id
+        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
+        for (String batchIdString : nodes) {
+            batchIds.add(Long.valueOf(batchIdString));
+        }
+        Long minBatchId = Collections.min(batchIds);
+        PositionRange result = getBatch(clientIdentity, minBatchId);
+        if (result == null) { // 出现为null,说明zk节点有变化,重新获取
+            return getFirstBatch(clientIdentity);
+        } else {
+            return result;
+        }
+    }
+
+    public Map<Long, PositionRange> listAllBatchs(ClientIdentity clientIdentity) {
+        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        List<String> nodes = null;
+        try {
+            nodes = zkClientx.getChildren(path);
+        } catch (ZkNoNodeException e) {
+            // ignore
+        }
+
+        if (CollectionUtils.isEmpty(nodes)) {
+            return Maps.newHashMap();
+        }
+        // 找到最大的Id
+        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
+        for (String batchIdString : nodes) {
+            batchIds.add(Long.valueOf(batchIdString));
+        }
+
+        Collections.sort(batchIds); // 从小到大排序
+        Map<Long, PositionRange> positionRanges = Maps.newLinkedHashMap();
+        for (Long batchId : batchIds) {
+            PositionRange result = getBatch(clientIdentity, batchId);
+            if (result == null) {// 出现为null,说明zk节点有变化,重新获取
+                return listAllBatchs(clientIdentity);
+            } else {
+                positionRanges.put(batchId, result);
+            }
+        }
+
+        return positionRanges;
+    }
+
+    // =========== setter ==========
+
+    public void setZkClientx(ZkClientx zkClientx) {
+        this.zkClientx = zkClientx;
+    }
+
+}

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -708,7 +708,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 }
             }
 
-            buffer.nextValue(fieldMeta.getColumnName(), i, info.type, info.meta, isBinary);
+            buffer.nextValue(columnBuilder.getName(), i, info.type, info.meta, isBinary);
             int javaType = buffer.getJavaType();
             if (buffer.isNull()) {
                 columnBuilder.setIsNull(true);

+ 9 - 0
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -23,6 +23,7 @@ public class MQProperties {
     private String  acks                   = "all";
     private String  aliyunAccessKey        = "";
     private String  aliyunSecretKey        = "";
+    private boolean transaction            = false;           // 是否开启事务
 
     public static class CanalDestination {
 
@@ -201,4 +202,12 @@ public class MQProperties {
     public void setMaxRequestSize(int maxRequestSize) {
         this.maxRequestSize = maxRequestSize;
     }
+
+    public boolean getTransaction() {
+        return transaction;
+    }
+
+    public void setTransaction(boolean transaction) {
+        this.transaction = transaction;
+    }
 }

+ 72 - 68
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.kafka;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -43,12 +44,16 @@ public class CanalKafkaProducer implements CanalMQProducer {
         properties.put("bootstrap.servers", kafkaProperties.getServers());
         properties.put("acks", kafkaProperties.getAcks());
         properties.put("compression.type", kafkaProperties.getCompressionType());
-        properties.put("retries", kafkaProperties.getRetries());
         properties.put("batch.size", kafkaProperties.getBatchSize());
         properties.put("linger.ms", kafkaProperties.getLingerMs());
         properties.put("max.request.size", kafkaProperties.getMaxRequestSize());
         properties.put("buffer.memory", kafkaProperties.getBufferMemory());
         properties.put("key.serializer", StringSerializer.class.getName());
+        if(kafkaProperties.getTransaction()){
+            properties.put("transactional.id", "canal-transactional-id");
+        } else {
+            properties.put("retries", kafkaProperties.getRetries());
+        }
         if (!kafkaProperties.getFlatMessage()) {
             properties.put("value.serializer", MessageSerializer.class.getName());
             producer = new KafkaProducer<String, Message>(properties);
@@ -56,8 +61,13 @@ public class CanalKafkaProducer implements CanalMQProducer {
             properties.put("value.serializer", StringSerializer.class.getName());
             producer2 = new KafkaProducer<String, String>(properties);
         }
-
-        // producer.initTransactions();
+        if (kafkaProperties.getTransaction()) {
+            if (!kafkaProperties.getFlatMessage()) {
+                producer.initTransactions();
+            } else {
+                producer2.initTransactions();
+            }
+        }
     }
 
     @Override
@@ -79,6 +89,17 @@ public class CanalKafkaProducer implements CanalMQProducer {
 
     @Override
     public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) {
+        // 开启事务,需要kafka版本支持
+        Producer producerTmp;
+        if (!kafkaProperties.getFlatMessage()) {
+            producerTmp = producer;
+        } else {
+            producerTmp = producer2;
+        }
+
+        if (kafkaProperties.getTransaction()) {
+            producerTmp.beginTransaction();
+        }
         try {
             if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
                 // 动态topic
@@ -96,78 +117,64 @@ public class CanalKafkaProducer implements CanalMQProducer {
             } else {
                 send(canalDestination, canalDestination.getTopic(), message);
             }
+            if (kafkaProperties.getTransaction()) {
+                producerTmp.commitTransaction();
+            }
             callback.commit();
         } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            if (kafkaProperties.getTransaction()) {
+                producerTmp.abortTransaction();
+            }
             callback.rollback();
         }
     }
 
     private void send(MQProperties.CanalDestination canalDestination, String topicName,
                       Message message) throws Exception {
-        // producer.beginTransaction();
         if (!kafkaProperties.getFlatMessage()) {
-            try {
-                ProducerRecord<String, Message> record = null;
-                if (canalDestination.getPartition() != null) {
-                    record = new ProducerRecord<>(topicName, canalDestination.getPartition(), null, message);
-                } else {
-                    if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
-                        Message[] messages = MQMessageUtils.messagePartition(message,
-                            canalDestination.getPartitionsNum(),
-                            canalDestination.getPartitionHash());
-                        int length = messages.length;
-                        for (int i = 0; i < length; i++) {
-                            Message messagePartition = messages[i];
-                            if (messagePartition != null) {
-                                record = new ProducerRecord<>(topicName, i, null, messagePartition);
-                            }
+            ProducerRecord<String, Message> record = null;
+            if (canalDestination.getPartition() != null) {
+                record = new ProducerRecord<>(topicName, canalDestination.getPartition(), null, message);
+            } else {
+                if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
+                    Message[] messages = MQMessageUtils.messagePartition(message,
+                        canalDestination.getPartitionsNum(),
+                        canalDestination.getPartitionHash());
+                    int length = messages.length;
+                    for (int i = 0; i < length; i++) {
+                        Message messagePartition = messages[i];
+                        if (messagePartition != null) {
+                            record = new ProducerRecord<>(topicName, i, null, messagePartition);
                         }
-                    } else {
-                        record = new ProducerRecord<>(topicName, 0, null, message);
                     }
+                } else {
+                    record = new ProducerRecord<>(topicName, 0, null, message);
                 }
+            }
 
-                if (record != null) {
-                    // 同步发送原生message
+            if (record != null) {
+                if (kafkaProperties.getTransaction()) {
+                    producer.send(record);
+                } else {
                     producer.send(record).get();
+                }
 
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, message.toString());
-                    }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, message.toString());
                 }
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
-                // producer.abortTransaction();
-                throw e;
             }
         } else {
             // 发送扁平数据json
             List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
             if (flatMessages != null) {
-                int idx = 0;
-                int size = flatMessages.size();
                 for (FlatMessage flatMessage : flatMessages) {
-                    idx++;
                     if (StringUtils.isEmpty(canalDestination.getPartitionHash())) {
-                        try {
-                            Integer partition = canalDestination.getPartition();
-                            if (partition == null) {
-                                partition = 0;
-                            }
-                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
-                                partition,
-                                null,
-                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
-                            if (idx != size) {
-                                producer2.send(record);
-                            } else {
-                                producer2.send(record).get();
-                            }
-                        } catch (Exception e) {
-                            logger.error(e.getMessage(), e);
-                            // producer.abortTransaction();
-                            throw e;
+                        Integer partition = canalDestination.getPartition();
+                        if (partition == null) {
+                            partition = 0;
                         }
+                        produce(topicName, partition, flatMessage);
                     } else {
                         FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                             canalDestination.getPartitionsNum(),
@@ -176,25 +183,11 @@ public class CanalKafkaProducer implements CanalMQProducer {
                         for (int i = 0; i < length; i++) {
                             FlatMessage flatMessagePart = partitionFlatMessage[i];
                             if (flatMessagePart != null) {
-                                try {
-                                    ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                        topicName,
-                                        i,
-                                        null,
-                                        JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
-                                    if (idx != size) {
-                                        producer2.send(record);
-                                    } else {
-                                        producer2.send(record).get();
-                                    }
-                                } catch (Exception e) {
-                                    logger.error(e.getMessage(), e);
-                                    // producer.abortTransaction();
-                                    throw e;
-                                }
+                                produce(topicName, i, flatMessagePart);
                             }
                         }
                     }
+
                     if (logger.isDebugEnabled()) {
                         logger.debug("Send flat message to kafka topic: [{}], packet: {}",
                             topicName,
@@ -203,8 +196,19 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 }
             }
         }
+    }
 
-        // producer.commitTransaction();
+    private void produce(String topicName, int partition, FlatMessage flatMessage) throws ExecutionException,
+                                                                                   InterruptedException {
+        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
+            partition,
+            null,
+            JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
+        if (kafkaProperties.getTransaction()) {
+            producer2.send(record);
+        } else {
+            producer2.send(record).get();
+        }
     }
 
 }