Browse Source

Merge pull request #24 from alibaba/master

merge
rewerma 6 years ago
parent
commit
9dfa5da7bc
47 changed files with 215 additions and 97 deletions
  1. 2 2
      client-adapter/common/pom.xml
  2. 20 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  3. 23 23
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java
  4. 4 6
      client-adapter/elasticsearch/pom.xml
  5. 3 3
      client-adapter/hbase/pom.xml
  6. 3 4
      client-adapter/launcher/pom.xml
  7. 0 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java
  8. 2 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java
  9. 1 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java
  10. 1 2
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java
  11. 2 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  12. 5 9
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java
  13. 2 0
      client-adapter/launcher/src/main/resources/application.yml
  14. 1 1
      client-adapter/logger/pom.xml
  15. 75 2
      client-adapter/pom.xml
  16. 4 6
      client-adapter/rdb/pom.xml
  17. 1 1
      client/pom.xml
  18. 1 1
      common/pom.xml
  19. 1 1
      common/src/main/java/com/alibaba/otter/canal/common/utils/BooleanMutex.java
  20. 1 1
      dbsync/pom.xml
  21. 1 1
      deployer/pom.xml
  22. 1 0
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java
  23. 4 0
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java
  24. 1 0
      deployer/src/main/resources/canal.properties
  25. 1 1
      driver/pom.xml
  26. 1 1
      example/pom.xml
  27. 1 1
      filter/pom.xml
  28. 1 1
      instance/core/pom.xml
  29. 1 1
      instance/manager/pom.xml
  30. 1 1
      instance/pom.xml
  31. 1 1
      instance/spring/pom.xml
  32. 1 1
      meta/pom.xml
  33. 1 1
      parse/pom.xml
  34. 0 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java
  35. 4 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  36. 14 6
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java
  37. 10 0
      parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java
  38. 1 3
      pom.xml
  39. 2 2
      prometheus/pom.xml
  40. 1 1
      protocol/pom.xml
  41. 1 1
      server/pom.xml
  42. 8 0
      server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java
  43. 1 0
      server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java
  44. 1 2
      server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java
  45. 1 1
      sink/pom.xml
  46. 1 1
      store/pom.xml
  47. 2 2
      store/src/main/java/com/alibaba/otter/canal/store/model/Event.java

+ 2 - 2
client-adapter/common/pom.xml

@@ -3,7 +3,7 @@
     <parent>
     <parent>
         <artifactId>canal.client-adapter</artifactId>
         <artifactId>canal.client-adapter</artifactId>
         <groupId>com.alibaba.otter</groupId>
         <groupId>com.alibaba.otter</groupId>
-        <version>1.1.2-SNAPSHOT</version>
+        <version>1.1.3-SNAPSHOT</version>
     </parent>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.alibaba.otter</groupId>
     <groupId>com.alibaba.otter</groupId>
@@ -14,7 +14,7 @@
         <dependency>
         <dependency>
             <groupId>com.alibaba.otter</groupId>
             <groupId>com.alibaba.otter</groupId>
             <artifactId>canal.protocol</artifactId>
             <artifactId>canal.protocol</artifactId>
-            <version>1.1.2-SNAPSHOT</version>
+            <version>1.1.3-SNAPSHOT</version>
         </dependency>
         </dependency>
         <dependency>
         <dependency>
             <groupId>joda-time</groupId>
             <groupId>joda-time</groupId>

+ 20 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -30,6 +30,10 @@ public class CanalClientConfig {
     private Long               timeout;
     private Long               timeout;
     // 模式 tcp kafka rocketMQ
     // 模式 tcp kafka rocketMQ
     private String             mode          = "tcp";
     private String             mode          = "tcp";
+    // aliyun ak/sk
+    private String             accessKey;
+    private String             secretKey;
+
     // canal adapters 配置
     // canal adapters 配置
     private List<CanalAdapter> canalAdapters;
     private List<CanalAdapter> canalAdapters;
 
 
@@ -105,6 +109,22 @@ public class CanalClientConfig {
         this.mode = mode;
         this.mode = mode;
     }
     }
 
 
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
     public List<CanalAdapter> getCanalAdapters() {
     public List<CanalAdapter> getCanalAdapters() {
         return canalAdapters;
         return canalAdapters;
     }
     }

+ 23 - 23
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java

@@ -27,8 +27,7 @@ import org.slf4j.LoggerFactory;
  */
  */
 public class ExtensionLoader<T> {
 public class ExtensionLoader<T> {
 
 
-    private static final Logger                                      logger                     = LoggerFactory
-        .getLogger(ExtensionLoader.class);
+    private static final Logger                                      logger                     = LoggerFactory.getLogger(ExtensionLoader.class);
 
 
     private static final String                                      SERVICES_DIRECTORY         = "META-INF/services/";
     private static final String                                      SERVICES_DIRECTORY         = "META-INF/services/";
 
 
@@ -36,8 +35,7 @@ public class ExtensionLoader<T> {
 
 
     private static final String                                      DEFAULT_CLASSLOADER_POLICY = "internal";
     private static final String                                      DEFAULT_CLASSLOADER_POLICY = "internal";
 
 
-    private static final Pattern                                     NAME_SEPARATOR             = Pattern
-        .compile("\\s*[,]+\\s*");
+    private static final Pattern                                     NAME_SEPARATOR             = Pattern.compile("\\s*[,]+\\s*");
 
 
     private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS          = new ConcurrentHashMap<>();
     private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS          = new ConcurrentHashMap<>();
 
 
@@ -173,8 +171,7 @@ public class ExtensionLoader<T> {
             return instance;
             return instance;
         } catch (Throwable t) {
         } catch (Throwable t) {
             throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
             throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
-                                            + ")  could not be instantiated: " + t.getMessage(),
-                t);
+                                            + ")  could not be instantiated: " + t.getMessage(), t);
         }
         }
     }
     }
 
 
@@ -194,8 +191,7 @@ public class ExtensionLoader<T> {
             return instance;
             return instance;
         } catch (Throwable t) {
         } catch (Throwable t) {
             throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
             throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
-                                            + ")  could not be instantiated: " + t.getMessage(),
-                t);
+                                            + ")  could not be instantiated: " + t.getMessage(), t);
         }
         }
     }
     }
 
 
@@ -334,10 +330,12 @@ public class ExtensionLoader<T> {
                                             // Class.forName(line, true,
                                             // Class.forName(line, true,
                                             // classLoader);
                                             // classLoader);
                                             if (!type.isAssignableFrom(clazz)) {
                                             if (!type.isAssignableFrom(clazz)) {
-                                                throw new IllegalStateException(
-                                                    "Error when load extension class(interface: " + type
-                                                                                + ", class line: " + clazz.getName()
-                                                                                + "), class " + clazz.getName()
+                                                throw new IllegalStateException("Error when load extension class(interface: "
+                                                                                + type
+                                                                                + ", class line: "
+                                                                                + clazz.getName()
+                                                                                + "), class "
+                                                                                + clazz.getName()
                                                                                 + "is not subtype of interface.");
                                                                                 + "is not subtype of interface.");
                                             } else {
                                             } else {
                                                 try {
                                                 try {
@@ -355,9 +353,9 @@ public class ExtensionLoader<T> {
                                                                 extensionClasses.put(n, clazz);
                                                                 extensionClasses.put(n, clazz);
                                                             } else if (c != clazz) {
                                                             } else if (c != clazz) {
                                                                 cachedNames.remove(clazz);
                                                                 cachedNames.remove(clazz);
-                                                                throw new IllegalStateException(
-                                                                    "Duplicate extension " + type.getName() + " name "
-                                                                                                + n + " on "
+                                                                throw new IllegalStateException("Duplicate extension "
+                                                                                                + type.getName()
+                                                                                                + " name " + n + " on "
                                                                                                 + c.getName() + " and "
                                                                                                 + c.getName() + " and "
                                                                                                 + clazz.getName());
                                                                                                 + clazz.getName());
                                                             }
                                                             }
@@ -367,9 +365,12 @@ public class ExtensionLoader<T> {
                                             }
                                             }
                                         }
                                         }
                                     } catch (Throwable t) {
                                     } catch (Throwable t) {
-                                        IllegalStateException e = new IllegalStateException(
-                                            "Failed to load extension class(interface: " + type + ", class line: "
-                                                                                            + line + ") in " + url
+                                        IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: "
+                                                                                            + type
+                                                                                            + ", class line: "
+                                                                                            + line
+                                                                                            + ") in "
+                                                                                            + url
                                                                                             + ", cause: "
                                                                                             + ", cause: "
                                                                                             + t.getMessage(),
                                                                                             + t.getMessage(),
                                             t);
                                             t);
@@ -384,18 +385,17 @@ public class ExtensionLoader<T> {
                         }
                         }
                     } catch (Throwable t) {
                     } catch (Throwable t) {
                         logger.error("Exception when load extension class(interface: " + type + ", class file: " + url
                         logger.error("Exception when load extension class(interface: " + type + ", class file: " + url
-                                     + ") in " + url,
-                            t);
+                                     + ") in " + url, t);
                     }
                     }
                 } // end of while urls
                 } // end of while urls
             }
             }
         } catch (Throwable t) {
         } catch (Throwable t) {
-            logger.error(
-                "Exception when load extension class(interface: " + type + ", description file: " + fileName + ").",
-                t);
+            logger.error("Exception when load extension class(interface: " + type + ", description file: " + fileName
+                         + ").", t);
         }
         }
     }
     }
 
 
+    @SuppressWarnings("unused")
     private static ClassLoader findClassLoader() {
     private static ClassLoader findClassLoader() {
         return ExtensionLoader.class.getClassLoader();
         return ExtensionLoader.class.getClassLoader();
     }
     }

+ 4 - 6
client-adapter/elasticsearch/pom.xml

@@ -1,11 +1,9 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
     <parent>
         <artifactId>canal.client-adapter</artifactId>
         <artifactId>canal.client-adapter</artifactId>
         <groupId>com.alibaba.otter</groupId>
         <groupId>com.alibaba.otter</groupId>
-        <version>1.1.2-SNAPSHOT</version>
+        <version>1.1.3-SNAPSHOT</version>
     </parent>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.alibaba.otter</groupId>
     <groupId>com.alibaba.otter</groupId>
@@ -81,9 +79,9 @@
                         </goals>
                         </goals>
                         <configuration>
                         <configuration>
                             <tasks>
                             <tasks>
-                                <copy todir="${project.basedir}/../launcher/target/classes/es" overwrite="true" >
+                                <copy todir="${project.basedir}/../launcher/target/classes/es" overwrite="true">
                                     <fileset dir="${project.basedir}/target/classes/es" erroronmissingdir="true">
                                     <fileset dir="${project.basedir}/target/classes/es" erroronmissingdir="true">
-                                        <include name="*.yml"/>
+                                        <include name="*.yml" />
                                     </fileset>
                                     </fileset>
                                 </copy>
                                 </copy>
                             </tasks>
                             </tasks>

+ 3 - 3
client-adapter/hbase/pom.xml

@@ -3,7 +3,7 @@
     <parent>
     <parent>
         <artifactId>canal.client-adapter</artifactId>
         <artifactId>canal.client-adapter</artifactId>
         <groupId>com.alibaba.otter</groupId>
         <groupId>com.alibaba.otter</groupId>
-        <version>1.1.2-SNAPSHOT</version>
+        <version>1.1.3-SNAPSHOT</version>
     </parent>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.alibaba.otter</groupId>
     <groupId>com.alibaba.otter</groupId>
@@ -72,9 +72,9 @@
                         </goals>
                         </goals>
                         <configuration>
                         <configuration>
                             <tasks>
                             <tasks>
-                                <copy todir="${project.basedir}/../launcher/target/classes/hbase" overwrite="true" >
+                                <copy todir="${project.basedir}/../launcher/target/classes/hbase" overwrite="true">
                                     <fileset dir="${project.basedir}/target/classes/hbase" erroronmissingdir="true">
                                     <fileset dir="${project.basedir}/target/classes/hbase" erroronmissingdir="true">
-                                        <include name="*.yml"/>
+                                        <include name="*.yml" />
                                     </fileset>
                                     </fileset>
                                 </copy>
                                 </copy>
                             </tasks>
                             </tasks>

+ 3 - 4
client-adapter/launcher/pom.xml

@@ -1,10 +1,9 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
     <parent>
         <artifactId>canal.client-adapter</artifactId>
         <artifactId>canal.client-adapter</artifactId>
         <groupId>com.alibaba.otter</groupId>
         <groupId>com.alibaba.otter</groupId>
-        <version>1.1.2-SNAPSHOT</version>
+        <version>1.1.3-SNAPSHOT</version>
     </parent>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.alibaba.otter</groupId>
     <groupId>com.alibaba.otter</groupId>
@@ -33,7 +32,7 @@
         <dependency>
         <dependency>
             <groupId>com.alibaba.otter</groupId>
             <groupId>com.alibaba.otter</groupId>
             <artifactId>canal.client</artifactId>
             <artifactId>canal.client</artifactId>
-            <version>1.1.2-SNAPSHOT</version>
+            <version>1.1.3-SNAPSHOT</version>
         </dependency>
         </dependency>
         <dependency>
         <dependency>
             <groupId>org.yaml</groupId>
             <groupId>org.yaml</groupId>

+ 0 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java

@@ -3,7 +3,6 @@ package com.alibaba.otter.canal.adapter.launcher;
 import org.springframework.boot.Banner;
 import org.springframework.boot.Banner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.builder.SpringApplicationBuilder;
 
 
 /**
 /**
  * 启动入口
  * 启动入口

+ 2 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java

@@ -64,11 +64,12 @@ public class SyncSwitch {
         }
         }
     }
     }
 
 
+    @SuppressWarnings("resource")
     private synchronized void startListen(String destination, BooleanMutex mutex) {
     private synchronized void startListen(String destination, BooleanMutex mutex) {
         try {
         try {
             String path = SYN_SWITCH_ZK_NODE + destination;
             String path = SYN_SWITCH_ZK_NODE + destination;
             CuratorFramework curator = curatorClient.getCurator();
             CuratorFramework curator = curatorClient.getCurator();
-            final NodeCache nodeCache = new NodeCache(curator, path);
+            NodeCache nodeCache = new NodeCache(curator, path);
             nodeCache.start();
             nodeCache.start();
             nodeCache.getListenable().addListener(() -> initMutex(curator, destination, mutex));
             nodeCache.getListenable().addListener(() -> initMutex(curator, destination, mutex));
         } catch (Exception e) {
         } catch (Exception e) {

+ 1 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java

@@ -47,6 +47,7 @@ public class AdapterCanalConfig extends CanalClientConfig {
         return srcDataSources;
         return srcDataSources;
     }
     }
 
 
+    @SuppressWarnings("resource")
     public void setSrcDataSources(Map<String, DatasourceConfig> srcDataSources) {
     public void setSrcDataSources(Map<String, DatasourceConfig> srcDataSources) {
         this.srcDataSources = srcDataSources;
         this.srcDataSources = srcDataSources;
 
 

+ 1 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -44,8 +44,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
         while (!running)
         while (!running)
             ;
             ;
         ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
         ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
-        int retry = canalClientConfig.getRetries() == null
-                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
+        int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
 
 
         while (running) {
         while (running) {

+ 2 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -122,6 +122,8 @@ public class CanalAdapterLoader {
                         canalAdapter.getInstance(),
                         canalAdapter.getInstance(),
                         group.getGroupId(),
                         group.getGroupId(),
                         canalOuterAdapterGroups,
                         canalOuterAdapterGroups,
+                        canalClientConfig.getAccessKey(),
+                        canalClientConfig.getSecretKey(),
                         canalClientConfig.getFlatMessage());
                         canalClientConfig.getFlatMessage());
                     canalMQWorker.put(canalAdapter.getInstance() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                     canalMQWorker.put(canalAdapter.getInstance() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                     rocketMQWorker.start();
                     rocketMQWorker.start();

+ 5 - 9
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -3,16 +3,12 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 import java.util.List;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.errors.WakeupException;
 
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
 import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
 
 
 /**
 /**
  * rocketmq对应的client适配器工作线程
  * rocketmq对应的client适配器工作线程
@@ -26,13 +22,14 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
     private boolean                flatMessage;
     private boolean                flatMessage;
 
 
     public CanalAdapterRocketMQWorker(CanalClientConfig canalClientConfig, String nameServers, String topic,
     public CanalAdapterRocketMQWorker(CanalClientConfig canalClientConfig, String nameServers, String topic,
-                                      String groupId, List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
+                                      String groupId, List<List<OuterAdapter>> canalOuterAdapters, String accessKey,
+                                      String secretKey, boolean flatMessage){
         super(canalOuterAdapters);
         super(canalOuterAdapters);
         this.canalClientConfig = canalClientConfig;
         this.canalClientConfig = canalClientConfig;
         this.topic = topic;
         this.topic = topic;
         this.flatMessage = flatMessage;
         this.flatMessage = flatMessage;
         this.canalDestination = topic;
         this.canalDestination = topic;
-        this.connector = new RocketMQCanalConnector(nameServers, topic, groupId, flatMessage);
+        this.connector = new RocketMQCanalConnector(nameServers, topic, groupId, accessKey, secretKey, flatMessage);
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
     }
     }
 
 
@@ -42,8 +39,7 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
             ;
             ;
 
 
         ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
         ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
-        int retry = canalClientConfig.getRetries() == null
-                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
+        int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
 
 
         while (running) {
         while (running) {

+ 2 - 0
client-adapter/launcher/src/main/resources/application.yml

@@ -21,6 +21,8 @@ canal.conf:
   syncBatchSize: 1000
   syncBatchSize: 1000
   retries: 0
   retries: 0
   timeout:
   timeout:
+  accessKey:
+  secretKey:
   mode: tcp # kafka rocketMQ
   mode: tcp # kafka rocketMQ
 #  srcDataSources:
 #  srcDataSources:
 #    defaultDS:
 #    defaultDS:

+ 1 - 1
client-adapter/logger/pom.xml

@@ -3,7 +3,7 @@
     <parent>
     <parent>
         <artifactId>canal.client-adapter</artifactId>
         <artifactId>canal.client-adapter</artifactId>
         <groupId>com.alibaba.otter</groupId>
         <groupId>com.alibaba.otter</groupId>
-        <version>1.1.2-SNAPSHOT</version>
+        <version>1.1.3-SNAPSHOT</version>
     </parent>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.alibaba.otter</groupId>
     <groupId>com.alibaba.otter</groupId>

+ 75 - 2
client-adapter/pom.xml

@@ -3,9 +3,14 @@
     <modelVersion>4.0.0</modelVersion>
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.alibaba.otter</groupId>
     <groupId>com.alibaba.otter</groupId>
     <artifactId>canal.client-adapter</artifactId>
     <artifactId>canal.client-adapter</artifactId>
-    <version>1.1.2-SNAPSHOT</version>
+    <version>1.1.3-SNAPSHOT</version>
     <packaging>pom</packaging>
     <packaging>pom</packaging>
     <name>canal client adapter module for otter ${project.version}</name>
     <name>canal client adapter module for otter ${project.version}</name>
+    <parent>
+        <groupId>com.alibaba.otter</groupId>
+        <artifactId>canal</artifactId>
+        <version>1.1.3-SNAPSHOT</version>
+    </parent>
 
 
     <properties>
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -25,9 +30,42 @@
         <module>rdb</module>
         <module>rdb</module>
     </modules>
     </modules>
 
 
+    <licenses>
+        <license>
+            <name>Apache License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0</url>
+        </license>
+    </licenses>
+
+    <scm>
+        <url>git@github.com:alibaba/canal.git</url>
+        <connection>scm:git:git@github.com:alibaba/canal.git</connection>
+        <developerConnection>scm:git:git@github.com:alibaba/canal.git</developerConnection>
+    </scm>
+
     <repositories>
     <repositories>
         <repository>
         <repository>
             <id>central</id>
             <id>central</id>
+            <url>http://repo1.maven.org/maven2</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+        <repository>
+            <id>java.net</id>
+            <url>http://download.java.net/maven/2/</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+        <repository>
+            <id>aliyun</id>
             <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
             <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
             <releases>
             <releases>
                 <enabled>true</enabled>
                 <enabled>true</enabled>
@@ -36,6 +74,28 @@
                 <enabled>false</enabled>
                 <enabled>false</enabled>
             </snapshots>
             </snapshots>
         </repository>
         </repository>
+        <repository>
+            <id>sonatype</id>
+            <name>sonatype</name>
+            <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+        <repository>
+            <id>sonatype-release</id>
+            <name>sonatype-release</name>
+            <url>https://oss.sonatype.org/service/local/repositories/releases/content</url>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
     </repositories>
     </repositories>
 
 
     <build>
     <build>
@@ -52,4 +112,17 @@
             </plugin>
             </plugin>
         </plugins>
         </plugins>
     </build>
     </build>
-</project>
+
+    <distributionManagement>
+        <snapshotRepository>
+            <id>sonatype-nexus-snapshots</id>
+            <name>Sonatype Nexus Snapshots</name>
+            <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
+        </snapshotRepository>
+        <repository>
+            <id>sonatype-nexus-staging</id>
+            <name>Nexus Release Repository</name>
+            <url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
+        </repository>
+    </distributionManagement>
+</project>

+ 4 - 6
client-adapter/rdb/pom.xml

@@ -1,11 +1,9 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
     <parent>
         <artifactId>canal.client-adapter</artifactId>
         <artifactId>canal.client-adapter</artifactId>
         <groupId>com.alibaba.otter</groupId>
         <groupId>com.alibaba.otter</groupId>
-        <version>1.1.2-SNAPSHOT</version>
+        <version>1.1.3-SNAPSHOT</version>
     </parent>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.alibaba.otter</groupId>
     <groupId>com.alibaba.otter</groupId>
@@ -84,9 +82,9 @@
                         </goals>
                         </goals>
                         <configuration>
                         <configuration>
                             <tasks>
                             <tasks>
-                                <copy todir="${project.basedir}/../launcher/target/classes/rdb" overwrite="true" >
+                                <copy todir="${project.basedir}/../launcher/target/classes/rdb" overwrite="true">
                                     <fileset dir="${project.basedir}/target/classes/rdb" erroronmissingdir="true">
                                     <fileset dir="${project.basedir}/target/classes/rdb" erroronmissingdir="true">
-                                        <include name="*.yml"/>
+                                        <include name="*.yml" />
                                     </fileset>
                                     </fileset>
                                 </copy>
                                 </copy>
                             </tasks>
                             </tasks>

+ 1 - 1
client/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
common/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<artifactId>canal.common</artifactId>
 	<artifactId>canal.common</artifactId>

+ 1 - 1
common/src/main/java/com/alibaba/otter/canal/common/utils/BooleanMutex.java

@@ -5,7 +5,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 
 
 /**
 /**
- * 实现一个互斥实现,基于Cocurrent中的AbstractQueuedSynchronizer实现了自己的sync <br/>
+ * 实现一个互斥实现,基于Cocurrent中的AQS实现了自己的sync <br/>
  * 应用场景:系统初始化/授权控制,没权限时阻塞等待。有权限时所有线程都可以快速通过
  * 应用场景:系统初始化/授权控制,没权限时阻塞等待。有权限时所有线程都可以快速通过
  * 
  * 
  * <pre>
  * <pre>

+ 1 - 1
dbsync/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
deployer/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -41,6 +41,7 @@ public class CanalConstants {
     public static final String CANAL_MQ_RETRIES                  = ROOT + "." + "mq.retries";
     public static final String CANAL_MQ_RETRIES                  = ROOT + "." + "mq.retries";
     public static final String CANAL_MQ_BATCHSIZE                = ROOT + "." + "mq.batchSize";
     public static final String CANAL_MQ_BATCHSIZE                = ROOT + "." + "mq.batchSize";
     public static final String CANAL_MQ_LINGERMS                 = ROOT + "." + "mq.lingerMs";
     public static final String CANAL_MQ_LINGERMS                 = ROOT + "." + "mq.lingerMs";
+    public static final String CANAL_MQ_MAXREQUESTSIZE           = ROOT + "." + "mq.maxRequestSize";
     public static final String CANAL_MQ_BUFFERMEMORY             = ROOT + "." + "mq.bufferMemory";
     public static final String CANAL_MQ_BUFFERMEMORY             = ROOT + "." + "mq.bufferMemory";
     public static final String CANAL_MQ_CANALBATCHSIZE           = ROOT + "." + "mq.canalBatchSize";
     public static final String CANAL_MQ_CANALBATCHSIZE           = ROOT + "." + "mq.canalBatchSize";
     public static final String CANAL_MQ_CANALGETTIMEOUT          = ROOT + "." + "mq.canalGetTimeout";
     public static final String CANAL_MQ_CANALGETTIMEOUT          = ROOT + "." + "mq.canalGetTimeout";

+ 4 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -101,6 +101,10 @@ public class CanalLauncher {
         if (!StringUtils.isEmpty(lingerMs)) {
         if (!StringUtils.isEmpty(lingerMs)) {
             mqProperties.setLingerMs(Integer.valueOf(lingerMs));
             mqProperties.setLingerMs(Integer.valueOf(lingerMs));
         }
         }
+        String maxRequestSize = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_MAXREQUESTSIZE);
+        if (!StringUtils.isEmpty(maxRequestSize)) {
+            mqProperties.setMaxRequestSize(Integer.valueOf(maxRequestSize));
+        }
         String bufferMemory = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BUFFERMEMORY);
         String bufferMemory = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BUFFERMEMORY);
         if (!StringUtils.isEmpty(bufferMemory)) {
         if (!StringUtils.isEmpty(bufferMemory)) {
             mqProperties.setBufferMemory(Long.valueOf(bufferMemory));
             mqProperties.setBufferMemory(Long.valueOf(bufferMemory));

+ 1 - 0
deployer/src/main/resources/canal.properties

@@ -104,6 +104,7 @@ canal.instance.global.spring.xml = classpath:spring/file-instance.xml
 canal.mq.servers = 127.0.0.1:6667
 canal.mq.servers = 127.0.0.1:6667
 canal.mq.retries = 0
 canal.mq.retries = 0
 canal.mq.batchSize = 16384
 canal.mq.batchSize = 16384
+canal.mq.maxRequestSize = 1048576
 canal.mq.lingerMs = 1
 canal.mq.lingerMs = 1
 canal.mq.bufferMemory = 33554432
 canal.mq.bufferMemory = 33554432
 canal.mq.canalBatchSize = 50
 canal.mq.canalBatchSize = 50

+ 1 - 1
driver/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
example/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
filter/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
instance/core/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../../pom.xml</relativePath>
 		<relativePath>../../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<artifactId>canal.instance.core</artifactId>
 	<artifactId>canal.instance.core</artifactId>

+ 1 - 1
instance/manager/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../../pom.xml</relativePath>
 		<relativePath>../../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
instance/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
instance/spring/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../../pom.xml</relativePath>
 		<relativePath>../../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
meta/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
parse/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<artifactId>canal.parse</artifactId>
 	<artifactId>canal.parse</artifactId>

+ 0 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -3,9 +3,6 @@ package com.alibaba.otter.canal.parse.inbound.mysql;
 import java.nio.charset.Charset;
 import java.nio.charset.Charset;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
 
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.alibaba.otter.canal.filter.CanalEventFilter;
 import com.alibaba.otter.canal.filter.CanalEventFilter;
 import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
 import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
 import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.CanalEventParser;
@@ -21,7 +18,6 @@ import com.alibaba.otter.canal.protocol.position.EntryPosition;
 
 
 public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 
 
-    protected final Logger         logger                    = LoggerFactory.getLogger(this.getClass());
     protected static final long    BINLOG_START_OFFEST       = 4L;
     protected static final long    BINLOG_START_OFFEST       = 4L;
 
 
     protected TableMetaTSDBFactory tableMetaTSDBFactory      = new DefaultTableMetaTSDBFactory();
     protected TableMetaTSDBFactory tableMetaTSDBFactory      = new DefaultTableMetaTSDBFactory();

+ 4 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -135,6 +135,10 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     protected void afterDump(ErosaConnection connection) {
     protected void afterDump(ErosaConnection connection) {
         super.afterDump(connection);
         super.afterDump(connection);
 
 
+        if (connection == null) {
+            throw new CanalParseException("illegal connection is null");
+        }
+
         if (!(connection instanceof MysqlConnection)) {
         if (!(connection instanceof MysqlConnection)) {
             throw new CanalParseException("Unsupported connection type : " + connection.getClass().getSimpleName());
             throw new CanalParseException("Unsupported connection type : " + connection.getClass().getSimpleName());
         }
         }

+ 14 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -76,7 +76,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
     private int                             snapshotInterval = 24;
     private int                             snapshotInterval = 24;
     private int                             snapshotExpire   = 360;
     private int                             snapshotExpire   = 360;
     private ScheduledFuture<?>              scheduleSnapshotFuture;
     private ScheduledFuture<?>              scheduleSnapshotFuture;
-    
+
     public DatabaseTableMeta(){
     public DatabaseTableMeta(){
 
 
     }
     }
@@ -207,8 +207,8 @@ public class DatabaseTableMeta implements TableMetaTSDB {
                 packet = connection.query("show full tables from `" + schema + "` where Table_type = 'BASE TABLE'");
                 packet = connection.query("show full tables from `" + schema + "` where Table_type = 'BASE TABLE'");
                 List<String> tables = new ArrayList<String>();
                 List<String> tables = new ArrayList<String>();
                 for (String table : packet.getFieldValues()) {
                 for (String table : packet.getFieldValues()) {
-                    if("BASE TABLE".equalsIgnoreCase(table)){
-                       continue; 
+                    if ("BASE TABLE".equalsIgnoreCase(table)) {
+                        continue;
                     }
                     }
                     String fullName = schema + "." + table;
                     String fullName = schema + "." + table;
                     if (blackFilter == null || !blackFilter.filter(fullName)) {
                     if (blackFilter == null || !blackFilter.filter(fullName)) {
@@ -310,11 +310,19 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         boolean compareAll = true;
         boolean compareAll = true;
         for (Schema schema : tmpMemoryTableMeta.getRepository().getSchemas()) {
         for (Schema schema : tmpMemoryTableMeta.getRepository().getSchemas()) {
             for (String table : schema.showTables()) {
             for (String table : schema.showTables()) {
-                if (!compareTableMetaDbAndMemory(connection, tmpMemoryTableMeta, schema.getName(), table)) {
-                    compareAll = false;
+                String fullName = schema + "." + table;
+                if (blackFilter == null || !blackFilter.filter(fullName)) {
+                    if (filter == null || filter.filter(fullName)) {
+                        // issue : https://github.com/alibaba/canal/issues/1168
+                        // 在生成snapshot时重新过滤一遍
+                        if (!compareTableMetaDbAndMemory(connection, tmpMemoryTableMeta, schema.getName(), table)) {
+                            compareAll = false;
+                        }
+                    }
                 }
                 }
             }
             }
         }
         }
+
         if (compareAll) {
         if (compareAll) {
             Map<String, String> content = new HashMap<String, String>();
             Map<String, String> content = new HashMap<String, String>();
             content.put("destination", destination);
             content.put("destination", destination);
@@ -513,7 +521,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
             String sign = sourceField.isUnsigned() ? "unsigned" : "signed";
             String sign = sourceField.isUnsigned() ? "unsigned" : "signed";
             String sourceColumnType = StringUtils.removeEndIgnoreCase(sourceField.getColumnType(), sign).trim();
             String sourceColumnType = StringUtils.removeEndIgnoreCase(sourceField.getColumnType(), sign).trim();
             String targetColumnType = StringUtils.removeEndIgnoreCase(targetField.getColumnType(), sign).trim();
             String targetColumnType = StringUtils.removeEndIgnoreCase(targetField.getColumnType(), sign).trim();
-            
+
             boolean columnTypeCompare = false;
             boolean columnTypeCompare = false;
             columnTypeCompare |= StringUtils.containsIgnoreCase(sourceColumnType, targetColumnType);
             columnTypeCompare |= StringUtils.containsIgnoreCase(sourceColumnType, targetColumnType);
             columnTypeCompare |= StringUtils.containsIgnoreCase(targetColumnType, sourceColumnType);
             columnTypeCompare |= StringUtils.containsIgnoreCase(targetColumnType, sourceColumnType);

+ 10 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.parse;
 package com.alibaba.otter.canal.parse;
 
 
+import static com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.MASTER_HEARTBEAT_PERIOD_SECONDS;
+
 import java.io.IOException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 import java.io.UnsupportedEncodingException;
@@ -7,6 +9,7 @@ import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.nio.charset.Charset;
 import java.util.BitSet;
 import java.util.BitSet;
 import java.util.List;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.junit.Assert;
 import org.junit.Assert;
@@ -213,6 +216,13 @@ public class DirectLogFetcherTest {
         } catch (Exception e) {
         } catch (Exception e) {
             logger.warn("update mariadb_slave_capability failed", e);
             logger.warn("update mariadb_slave_capability failed", e);
         }
         }
+
+        try {
+            long period = TimeUnit.SECONDS.toNanos(MASTER_HEARTBEAT_PERIOD_SECONDS);
+            update("SET @master_heartbeat_period=" + period, connector);
+        } catch (Exception e) {
+            logger.warn("update master_heartbeat_period failed", e);
+        }
     }
     }
 
 
     private void loadBinlogChecksum(MysqlConnector connector) {
     private void loadBinlogChecksum(MysqlConnector connector) {

+ 1 - 3
pom.xml

@@ -4,7 +4,7 @@
     <artifactId>canal</artifactId>
     <artifactId>canal</artifactId>
     <packaging>pom</packaging>
     <packaging>pom</packaging>
     <name>canal module for otter ${project.version}</name>
     <name>canal module for otter ${project.version}</name>
-    <version>1.1.2-SNAPSHOT</version>
+    <version>1.1.3-SNAPSHOT</version>
     <url>https://github.com/alibaba/canal</url>
     <url>https://github.com/alibaba/canal</url>
     <parent>
     <parent>
         <groupId>org.sonatype.oss</groupId>
         <groupId>org.sonatype.oss</groupId>
@@ -381,7 +381,6 @@
                 </configuration>
                 </configuration>
             </plugin>
             </plugin>
             <!-- javadoc -->
             <!-- javadoc -->
-            <!--
             <plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-javadoc-plugin</artifactId>
                 <artifactId>maven-javadoc-plugin</artifactId>
@@ -400,7 +399,6 @@
                   <additionalparam>-Xdoclint:none</additionalparam>
                   <additionalparam>-Xdoclint:none</additionalparam>
                 </configuration>
                 </configuration>
             </plugin>
             </plugin>
-            -->
         </plugins>
         </plugins>
         <sourceDirectory>src/main/java</sourceDirectory>
         <sourceDirectory>src/main/java</sourceDirectory>
         <testSourceDirectory>src/test/java</testSourceDirectory>
         <testSourceDirectory>src/test/java</testSourceDirectory>

+ 2 - 2
prometheus/pom.xml

@@ -3,13 +3,13 @@
     <parent>
     <parent>
         <artifactId>canal</artifactId>
         <artifactId>canal</artifactId>
         <groupId>com.alibaba.otter</groupId>
         <groupId>com.alibaba.otter</groupId>
-        <version>1.1.2-SNAPSHOT</version>
+        <version>1.1.3-SNAPSHOT</version>
     </parent>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <modelVersion>4.0.0</modelVersion>
 
 
     <groupId>com.alibaba.otter</groupId>
     <groupId>com.alibaba.otter</groupId>
     <artifactId>canal.prometheus</artifactId>
     <artifactId>canal.prometheus</artifactId>
-    <version>1.1.2-SNAPSHOT</version>
+    <version>1.1.3-SNAPSHOT</version>
     <name>canal prometheus module for otter ${project.version}</name>
     <name>canal prometheus module for otter ${project.version}</name>
     <dependencies>
     <dependencies>
         <dependency>
         <dependency>

+ 1 - 1
protocol/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
server/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<artifactId>canal.server</artifactId>
 	<artifactId>canal.server</artifactId>

+ 8 - 0
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -14,6 +14,7 @@ public class MQProperties {
     private int     retries                = 0;
     private int     retries                = 0;
     private int     batchSize              = 16384;
     private int     batchSize              = 16384;
     private int     lingerMs               = 1;
     private int     lingerMs               = 1;
+    private int     maxRequestSize         = 1048576;
     private long    bufferMemory           = 33554432L;
     private long    bufferMemory           = 33554432L;
     private boolean filterTransactionEntry = true;
     private boolean filterTransactionEntry = true;
     private String  producerGroup          = "Canal-Producer";
     private String  producerGroup          = "Canal-Producer";
@@ -185,4 +186,11 @@ public class MQProperties {
     public void setAliyunSecretKey(String aliyunSecretKey) {
     public void setAliyunSecretKey(String aliyunSecretKey) {
         this.aliyunSecretKey = aliyunSecretKey;
         this.aliyunSecretKey = aliyunSecretKey;
     }
     }
+    public int getMaxRequestSize() {
+        return maxRequestSize;
+    }
+
+    public void setMaxRequestSize(int maxRequestSize) {
+        this.maxRequestSize = maxRequestSize;
+    }
 }
 }

+ 1 - 0
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -43,6 +43,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
         properties.put("retries", kafkaProperties.getRetries());
         properties.put("retries", kafkaProperties.getRetries());
         properties.put("batch.size", kafkaProperties.getBatchSize());
         properties.put("batch.size", kafkaProperties.getBatchSize());
         properties.put("linger.ms", kafkaProperties.getLingerMs());
         properties.put("linger.ms", kafkaProperties.getLingerMs());
+        properties.put("max.request.size", kafkaProperties.getMaxRequestSize());
         properties.put("buffer.memory", kafkaProperties.getBufferMemory());
         properties.put("buffer.memory", kafkaProperties.getBufferMemory());
         properties.put("key.serializer", StringSerializer.class.getName());
         properties.put("key.serializer", StringSerializer.class.getName());
         if (!kafkaProperties.getFlatMessage()) {
         if (!kafkaProperties.getFlatMessage()) {

+ 1 - 2
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -13,7 +13,6 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.instance.core.CanalInstance;
 import com.alibaba.otter.canal.instance.core.CanalInstance;
 import com.alibaba.otter.canal.instance.core.CanalMQConfig;
 import com.alibaba.otter.canal.instance.core.CanalMQConfig;
-import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
 import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
@@ -146,7 +145,7 @@ public class CanalMQStarter {
                     try {
                     try {
                         int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                         int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                         if (batchId != -1 && size != 0) {
                         if (batchId != -1 && size != 0) {
-                            canalMQProducer.send(destination, message, new CanalKafkaProducer.Callback() {
+                            canalMQProducer.send(destination, message, new CanalMQProducer.Callback() {
 
 
                                 @Override
                                 @Override
                                 public void commit() {
                                 public void commit() {

+ 1 - 1
sink/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
store/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
 		<artifactId>canal</artifactId>
-		<version>1.1.2-SNAPSHOT</version>
+		<version>1.1.3-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>
 	<groupId>com.alibaba.otter</groupId>

+ 2 - 2
store/src/main/java/com/alibaba/otter/canal/store/model/Event.java

@@ -71,8 +71,8 @@ public class Event implements Serializable {
             this.rawLength = rawEntry.size();
             this.rawLength = rawEntry.size();
         } else {
         } else {
             this.entry = entry;
             this.entry = entry;
-            // 按照3倍的event length预估
-            this.rawLength = entry.getHeader().getEventLength() * 3;
+            // 按照6倍的event length预估
+            this.rawLength = entry.getHeader().getEventLength() * 6;
         }
         }
     }
     }