Browse Source

Merge pull request #1039 from rewerma/master

规范注释
rewerma 6 years ago
parent
commit
f7185a618b
29 changed files with 231 additions and 133 deletions
  1. 1 1
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java
  2. 15 2
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/AdapterConfigs.java
  3. 15 26
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  4. 16 10
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/DatasourceConfig.java
  5. 10 10
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java
  6. 11 4
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/EtlResult.java
  7. 28 39
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java
  8. 2 2
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/JdbcTypeUtil.java
  9. 1 1
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java
  10. 1 1
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/OuterAdapterConfig.java
  11. 7 1
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Result.java
  12. 6 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/SPI.java
  13. 22 25
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java
  14. 1 1
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java
  15. 32 0
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java
  16. 1 1
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java
  17. 6 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java
  18. 6 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/EtlLock.java
  19. 9 4
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java
  20. 6 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java
  21. 7 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfig.java
  22. 6 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/CuratorClient.java
  23. 6 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/SpringContext.java
  24. 1 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
  25. 1 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java
  26. 1 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java
  27. 6 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java
  28. 1 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java
  29. 6 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java

+ 1 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java

@@ -11,7 +11,7 @@ import com.alibaba.otter.canal.client.adapter.support.SPI;
 /**
  * 外部适配器接口
  *
- * @author machengyuan 2018-8-18 下午10:14:02
+ * @author reweerma 2018-8-18 下午10:14:02
  * @version 1.0.0
  */
 @SPI("logger")

+ 15 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/AdapterConfigs.java

@@ -1,13 +1,26 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
+import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
+/**
+ * 适配器配置集合, 用于配置加载, 线程不安全
+ *
+ * @author rewerma @ 2018-10-20
+ * @version 1.0.0
+ */
 public class AdapterConfigs {
 
-    private static Map<String, Set<String>> configs = new ConcurrentHashMap<>();
+    /**
+     * 类型下对应所有配置名, 如:
+     * hbase
+     *  ┗━ mytest_person.yml
+     *  ┗━ mytest_role.yml
+     *  ┗━ mytest_department.yml
+     */
+    private static Map<String, Set<String>> configs = new HashMap<>();
 
     public static void put(String key, String value) {
         Set<String> values = configs.get(key);

+ 15 - 26
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -2,29 +2,26 @@ package com.alibaba.otter.canal.client.adapter.support;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Properties;
 
 /**
  * 配置信息类
  *
- * @author machengyuan 2018-8-18 下午10:40:12
+ * @author rewerma 2018-8-18 下午10:40:12
  * @version 1.0.0
  */
 public class CanalClientConfig {
 
-    private String              canalServerHost;
+    private String              canalServerHost;    // 单机模式下canal server的 ip:port
 
-    private String              zookeeperHosts;
+    private String              zookeeperHosts;     // 集群模式下的zk地址, 如果配置了单机地址则以单机为准!!
 
-    private Properties          properties;
+    private String              bootstrapServers;   // kafka or rocket mq 地址
 
-    private String              bootstrapServers;
+    private Boolean             flatMessage = true; // 是否已flatMessage模式传输, 只适用于mq模式
 
-    private List<MQTopic>       mqTopics;
+    private List<MQTopic>       mqTopics;           // mq topic 列表
 
-    private Boolean             flatMessage = true;
-
-    private List<CanalInstance> canalInstances;
+    private List<CanalInstance> canalInstances;     // tcp 模式下 canal 实例列表, 与mq模式不能共存!!
 
     public String getCanalServerHost() {
         return canalServerHost;
@@ -42,14 +39,6 @@ public class CanalClientConfig {
         this.zookeeperHosts = zookeeperHosts;
     }
 
-    public Properties getProperties() {
-        return properties;
-    }
-
-    public void setProperties(Properties properties) {
-        this.properties = properties;
-    }
-
     public String getBootstrapServers() {
         return bootstrapServers;
     }
@@ -84,9 +73,9 @@ public class CanalClientConfig {
 
     public static class CanalInstance {
 
-        private String             instance;
+        private String             instance;      // 实例名
 
-        private List<AdapterGroup> adapterGroups;
+        private List<AdapterGroup> adapterGroups; // 适配器分组列表
 
         public String getInstance() {
             return instance;
@@ -109,7 +98,7 @@ public class CanalClientConfig {
 
     public static class AdapterGroup {
 
-        private List<OuterAdapterConfig> outAdapters;
+        private List<OuterAdapterConfig> outAdapters; // 适配器列表
 
         public List<OuterAdapterConfig> getOutAdapters() {
             return outAdapters;
@@ -122,11 +111,11 @@ public class CanalClientConfig {
 
     public static class MQTopic {
 
-        private String      mqMode;
+        private String      mqMode;                     // mq模式 kafka or rocketMQ
 
-        private String      topic;
+        private String      topic;                      // topic名
 
-        private List<Group> groups = new ArrayList<>();
+        private List<Group> groups = new ArrayList<>(); // 分组列表
 
         public String getMqMode() {
             return mqMode;
@@ -155,11 +144,11 @@ public class CanalClientConfig {
 
     public static class Group {
 
-        private String                   groupId;
+        private String                   groupId;     // group id
 
         // private List<Adaptor> adapters = new ArrayList<>();
 
-        private List<OuterAdapterConfig> outAdapters;
+        private List<OuterAdapterConfig> outAdapters; // 适配器配置列表
 
         public String getGroupId() {
             return groupId;

+ 16 - 10
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/DatasourceConfig.java

@@ -1,21 +1,27 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import com.alibaba.druid.pool.DruidDataSource;
-
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.alibaba.druid.pool.DruidDataSource;
+
+/**
+ * 数据源配置
+ *
+ * @author rewerma @ 2018-10-20
+ * @version 1.0.0
+ */
 public class DatasourceConfig {
 
-    public final static Map<String, DruidDataSource> DATA_SOURCES = new ConcurrentHashMap<>();
+    public final static Map<String, DruidDataSource> DATA_SOURCES = new ConcurrentHashMap<>(); // key对应的数据源
 
-    private String                                   driver       = "com.mysql.jdbc.Driver";
-    private String                                   url;
-    private String                                   database;
-    private String                                   type         = "mysql";
-    private String                                   username;
-    private String                                   password;
-    private Integer                                  maxActive    = 3;
+    private String                                   driver       = "com.mysql.jdbc.Driver";   // 默认为mysql jdbc驱动
+    private String                                   url;                                      // jdbc url
+    private String                                   database;                                 // jdbc database
+    private String                                   type         = "mysql";                   // 类型, 默认为mysql
+    private String                                   username;                                 // jdbc username
+    private String                                   password;                                 // jdbc password
+    private Integer                                  maxActive    = 3;                         // 连接池最大连接数,默认为3
 
     public String getDriver() {
         return driver;

+ 10 - 10
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java

@@ -7,24 +7,24 @@ import java.util.Map;
 /**
  * DML操作转换对象
  *
- * @author machengyuan 2018-8-19 下午11:30:49
+ * @author rewerma 2018-8-19 下午11:30:49
  * @version 1.0.0
  */
 public class Dml implements Serializable {
 
     private static final long         serialVersionUID = 2611556444074013268L;
 
-    private String                    destination;
-    private String                    database;
-    private String                    table;
-    private String                    type;
+    private String                    destination;                            // 对应canal的实例或者MQ的topic
+    private String                    database;                               // 数据库或schema
+    private String                    table;                                  // 表名
+    private String                    type;                                   // 类型: INSERT UPDATE DELETE
     // binlog executeTime
-    private Long                      es;
+    private Long                      es;                                     // 执行耗时
     // dml build timeStamp
-    private Long                      ts;
-    private String                    sql;
-    private List<Map<String, Object>> data;
-    private List<Map<String, Object>> old;
+    private Long                      ts;                                     // 同步时间
+    private String                    sql;                                    // 执行的sql, dml sql为空
+    private List<Map<String, Object>> data;                                   // 数据列表
+    private List<Map<String, Object>> old;                                    // 旧数据列表, 用于update, size和data的size一一对应
 
     public String getDestination() {
         return destination;

+ 11 - 4
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/EtlResult.java

@@ -2,14 +2,21 @@ package com.alibaba.otter.canal.client.adapter.support;
 
 import java.io.Serializable;
 
+/**
+ * ETL的结果对象
+ *
+ * @author rewerma @ 2018-10-20
+ * @version 1.0.0
+ */
 public class EtlResult implements Serializable {
+
     private static final long serialVersionUID = 4250522736289866505L;
 
-    private boolean succeeded = false;
+    private boolean           succeeded        = false;
 
-    private String resultMessage;
+    private String            resultMessage;
 
-    private String errorMessage;
+    private String            errorMessage;
 
     public boolean getSucceeded() {
         return succeeded;
@@ -34,4 +41,4 @@ public class EtlResult implements Serializable {
     public void setErrorMessage(String errorMessage) {
         this.errorMessage = errorMessage;
     }
-}
+}

+ 28 - 39
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java

@@ -1,22 +1,11 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStreamReader;
+import java.io.*;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
@@ -27,12 +16,13 @@ import org.slf4j.LoggerFactory;
 /**
  * SPI 类加载器
  *
- * @author machengyuan 2018-8-19 下午11:30:49
+ * @author rewerma 2018-8-19 下午11:30:49
  * @version 1.0.0
  */
 public class ExtensionLoader<T> {
 
-    private static final Logger                                      logger                     = LoggerFactory.getLogger(ExtensionLoader.class);
+    private static final Logger                                      logger                     = LoggerFactory
+        .getLogger(ExtensionLoader.class);
 
     private static final String                                      SERVICES_DIRECTORY         = "META-INF/services/";
 
@@ -40,7 +30,8 @@ public class ExtensionLoader<T> {
 
     private static final String                                      DEFAULT_CLASSLOADER_POLICY = "internal";
 
-    private static final Pattern                                     NAME_SEPARATOR             = Pattern.compile("\\s*[,]+\\s*");
+    private static final Pattern                                     NAME_SEPARATOR             = Pattern
+        .compile("\\s*[,]+\\s*");
 
     private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS          = new ConcurrentHashMap<>();
 
@@ -271,7 +262,8 @@ public class ExtensionLoader<T> {
             return instance;
         } catch (Throwable t) {
             throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
-                                            + ")  could not be instantiated: " + t.getMessage(), t);
+                                            + ")  could not be instantiated: " + t.getMessage(),
+                t);
         }
     }
 
@@ -279,8 +271,8 @@ public class ExtensionLoader<T> {
         if (type == null) throw new IllegalArgumentException("Extension type == null");
         if (name == null) throw new IllegalArgumentException("Extension name == null");
         Class<?> clazz = getExtensionClasses().get(name);
-        if (clazz == null) throw new IllegalStateException("No such extension \"" + name + "\" for " + type.getName()
-                                                           + "!");
+        if (clazz == null)
+            throw new IllegalStateException("No such extension \"" + name + "\" for " + type.getName() + "!");
         return clazz;
     }
 
@@ -342,8 +334,8 @@ public class ExtensionLoader<T> {
         logger.info("extension classpath dir: " + dir);
         File externalLibDir = new File(dir);
         if (!externalLibDir.exists()) {
-            externalLibDir = new File(File.separator + this.getJarDirectoryPath() + File.separator + "canal_client"
-                                      + File.separator + "lib");
+            externalLibDir = new File(
+                File.separator + this.getJarDirectoryPath() + File.separator + "canal_client" + File.separator + "lib");
         }
         if (externalLibDir.exists()) {
             File[] files = externalLibDir.listFiles(new FilenameFilter() {
@@ -495,12 +487,10 @@ public class ExtensionLoader<T> {
                                             // Class.forName(line, true,
                                             // classLoader);
                                             if (!type.isAssignableFrom(clazz)) {
-                                                throw new IllegalStateException("Error when load extension class(interface: "
-                                                                                + type
-                                                                                + ", class line: "
-                                                                                + clazz.getName()
-                                                                                + "), class "
-                                                                                + clazz.getName()
+                                                throw new IllegalStateException(
+                                                    "Error when load extension class(interface: " + type
+                                                                                + ", class line: " + clazz.getName()
+                                                                                + "), class " + clazz.getName()
                                                                                 + "is not subtype of interface.");
                                             } else {
                                                 try {
@@ -518,9 +508,9 @@ public class ExtensionLoader<T> {
                                                                 extensionClasses.put(n, clazz);
                                                             } else if (c != clazz) {
                                                                 cachedNames.remove(clazz);
-                                                                throw new IllegalStateException("Duplicate extension "
-                                                                                                + type.getName()
-                                                                                                + " name " + n + " on "
+                                                                throw new IllegalStateException(
+                                                                    "Duplicate extension " + type.getName() + " name "
+                                                                                                + n + " on "
                                                                                                 + c.getName() + " and "
                                                                                                 + clazz.getName());
                                                             }
@@ -530,12 +520,9 @@ public class ExtensionLoader<T> {
                                             }
                                         }
                                     } catch (Throwable t) {
-                                        IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: "
-                                                                                            + type
-                                                                                            + ", class line: "
-                                                                                            + line
-                                                                                            + ") in "
-                                                                                            + url
+                                        IllegalStateException e = new IllegalStateException(
+                                            "Failed to load extension class(interface: " + type + ", class line: "
+                                                                                            + line + ") in " + url
                                                                                             + ", cause: "
                                                                                             + t.getMessage(),
                                             t);
@@ -550,13 +537,15 @@ public class ExtensionLoader<T> {
                         }
                     } catch (Throwable t) {
                         logger.error("Exception when load extension class(interface: " + type + ", class file: " + url
-                                     + ") in " + url, t);
+                                     + ") in " + url,
+                            t);
                     }
                 } // end of while urls
             }
         } catch (Throwable t) {
-            logger.error("Exception when load extension class(interface: " + type + ", description file: " + fileName
-                         + ").", t);
+            logger.error(
+                "Exception when load extension class(interface: " + type + ", description file: " + fileName + ").",
+                t);
         }
     }
 

+ 2 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/JdbcTypeUtil.java

@@ -11,7 +11,7 @@ import org.slf4j.LoggerFactory;
 /**
  * 类型转换工具类
  *
- * @author machengyuan 2018-8-19 下午06:14:23
+ * @author rewerma 2018-8-19 下午06:14:23
  * @version 1.0.0
  */
 public class JdbcTypeUtil {
@@ -30,7 +30,7 @@ public class JdbcTypeUtil {
         switch (jdbcType) {
             case Types.BIT:
             case Types.BOOLEAN:
-//                return Boolean.class;
+                // return Boolean.class;
             case Types.TINYINT:
                 return Byte.TYPE;
             case Types.SMALLINT:

+ 1 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -9,7 +9,7 @@ import com.alibaba.otter.canal.protocol.Message;
 /**
  * Message对象解析工具类
  *
- * @author machengyuan 2018-8-19 下午06:14:23
+ * @author rewerma 2018-8-19 下午06:14:23
  * @version 1.0.0
  */
 public class MessageUtil {

+ 1 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/OuterAdapterConfig.java

@@ -5,7 +5,7 @@ import java.util.Map;
 /**
  * 外部适配器配置信息类
  *
- * @author machengyuan 2018-8-18 下午10:15:12
+ * @author rewerma 2018-8-18 下午10:15:12
  * @version 1.0.0
  */
 public class OuterAdapterConfig {

+ 7 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Result.java

@@ -3,9 +3,15 @@ package com.alibaba.otter.canal.client.adapter.support;
 import java.io.Serializable;
 import java.util.Date;
 
+/**
+ * 用于rest的结果返回对象
+ *
+ * @author rewerma @ 2018-10-20
+ * @version 1.0.0
+ */
 public class Result implements Serializable {
 
-    public Integer code    = 20000;
+    public Integer code = 20000;
     public Object  data;
     public String  message;
     public Date    sysTime;

+ 6 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/SPI.java

@@ -6,6 +6,12 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
+/**
+ * SPI装载器注解
+ *
+ * @author rewerma @ 2018-10-20
+ * @version 1.0.0
+ */
 @Documented
 @Retention(RetentionPolicy.RUNTIME)
 @Target({ ElementType.TYPE })

+ 22 - 25
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java

@@ -1,21 +1,18 @@
 package com.alibaba.otter.canal.client.adapter.hbase.config;
 
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.*;
 
 /**
  * HBase表映射配置
  *
- * @author machengyuan 2018-8-21 下午06:45:49
+ * @author rewerma 2018-8-21 下午06:45:49
  * @version 1.0.0
  */
 public class MappingConfig {
-    private String dataSourceKey;
 
-    private HbaseOrm hbaseOrm;
+    private String   dataSourceKey; // 数据源key
+
+    private HbaseOrm hbaseOrm;      // hbase映射配置
 
     public String getDataSourceKey() {
         return dataSourceKey;
@@ -130,7 +127,7 @@ public class MappingConfig {
     }
 
     public enum Mode {
-        STRING("STRING"), NATIVE("NATIVE"), PHOENIX("PHOENIX");
+                      STRING("STRING"), NATIVE("NATIVE"), PHOENIX("PHOENIX");
 
         private String type;
 
@@ -145,23 +142,23 @@ public class MappingConfig {
 
     public static class HbaseOrm {
 
-        private Mode                    mode               = Mode.STRING;
-        private String                  destination;
-        private String                  database;
-        private String                  table;
-        private String                  hbaseTable;
-        private String                  family             = "CF";
-        private boolean                 uppercaseQualifier = true;
-        private boolean                 autoCreateTable    = false;                // 同步时HBase中表不存在的情况下自动建表
-        private String                  rowKey;                                    // 指定复合主键为rowKey
-        private Map<String, String>     columns;
-        private ColumnItem              rowKeyColumn;
-        private String                  etlCondition;
-
-        private Map<String, ColumnItem> columnItems        = new LinkedHashMap<>();
-        private Set<String>             families           = new LinkedHashSet<>();
+        private Mode                    mode               = Mode.STRING;           // hbase默认转换格式
+        private String                  destination;                                // canal实例或MQ的topic
+        private String                  database;                                   // 数据库名或schema名
+        private String                  table;                                      // 表面名
+        private String                  hbaseTable;                                 // hbase表名
+        private String                  family             = "CF";                  // 默认统一column family
+        private boolean                 uppercaseQualifier = true;                  // 是否转大写
+        private boolean                 autoCreateTable    = false;                 // 同步时HBase中表不存在的情况下自动建表
+        private String                  rowKey;                                     // 指定复合主键为rowKey
+        private Map<String, String>     columns;                                    // 字段映射
+        private ColumnItem              rowKeyColumn;                               // rowKey字段
+        private String                  etlCondition;                               // etl条件sql
+
+        private Map<String, ColumnItem> columnItems        = new LinkedHashMap<>(); // 转换后的字段映射列表
+        private Set<String>             families           = new LinkedHashSet<>(); // column family列表
         private int                     readBatch          = 5000;
-        private int                     commitBatch        = 5000;
+        private int                     commitBatch        = 5000;                  // etl等批量提交大小
 
         public Mode getMode() {
             return mode;

+ 1 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java

@@ -18,7 +18,7 @@ import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
 /**
  * HBase表映射配置加载器
  *
- * @author machengyuan 2018-8-21 下午06:45:49
+ * @author rewerma 2018-8-21 下午06:45:49
  * @version 1.0.0
  */
 public class MappingConfigLoader {

+ 32 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java

@@ -24,6 +24,12 @@ import com.alibaba.otter.canal.client.adapter.support.EtlResult;
 import com.alibaba.otter.canal.client.adapter.support.JdbcTypeUtil;
 import com.google.common.base.Joiner;
 
+/**
+ * HBase ETL 操作业务类
+ *
+ * @author rewerma @ 2018-10-20
+ * @version 1.0.0
+ */
 public class HbaseEtlService {
 
     private static Logger logger = LoggerFactory.getLogger(HbaseEtlService.class);
@@ -66,6 +72,12 @@ public class HbaseEtlService {
         }
     }
 
+    /**
+     * 建表
+     * 
+     * @param hbaseTemplate
+     * @param config
+     */
     public static void createTable(HbaseTemplate hbaseTemplate, MappingConfig config) {
         try {
             // 判断hbase表是否存在,不存在则建表
@@ -79,6 +91,15 @@ public class HbaseEtlService {
         }
     }
 
+    /**
+     * 导入数据
+     * 
+     * @param ds 数据源
+     * @param hbaseTemplate hbaseTemplate
+     * @param config 配置
+     * @param params 筛选条件
+     * @return 导入结果
+     */
     public static EtlResult importData(DataSource ds, HbaseTemplate hbaseTemplate, MappingConfig config,
                                        List<String> params) {
         EtlResult etlResult = new EtlResult();
@@ -209,6 +230,17 @@ public class HbaseEtlService {
         return etlResult;
     }
 
+    /**
+     * 执行导入
+     * 
+     * @param ds
+     * @param sql
+     * @param hbaseOrm
+     * @param hbaseTemplate
+     * @param successCount
+     * @param errMsg
+     * @return
+     */
     private static boolean executeSqlImport(DataSource ds, String sql, MappingConfig.HbaseOrm hbaseOrm,
                                             HbaseTemplate hbaseTemplate, AtomicLong successCount, List<String> errMsg) {
         try {

+ 1 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java

@@ -13,7 +13,7 @@ import com.alibaba.otter.canal.client.adapter.support.Dml;
 /**
  * HBase同步操作业务
  *
- * @author machengyuan 2018-8-21 下午06:45:49
+ * @author rewerma 2018-8-21 下午06:45:49
  * @version 1.0.0
  */
 public class HbaseSyncService {

+ 6 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java

@@ -3,6 +3,12 @@ package com.alibaba.otter.canal.adapter.launcher;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.builder.SpringApplicationBuilder;
 
+/**
+ * 启动入口
+ *
+ * @author rewerma @ 2018-10-20
+ * @version 1.0.0
+ */
 @SpringBootApplication
 public class CanalAdapterApplication {
 

+ 6 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/EtlLock.java

@@ -14,6 +14,12 @@ import org.springframework.stereotype.Component;
 
 import com.alibaba.otter.canal.adapter.launcher.config.CuratorClient;
 
+/**
+ * Etl 同步锁
+ *
+ * @author rewerma @ 2018-10-20
+ * @version 1.0.0
+ */
 @Component
 public class EtlLock {
 

+ 9 - 4
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java

@@ -19,16 +19,22 @@ import com.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig;
 import com.alibaba.otter.canal.adapter.launcher.config.CuratorClient;
 import com.alibaba.otter.canal.common.utils.BooleanMutex;
 
+/**
+ * 同步开关
+ *
+ * @author rewerma @ 2018-10-20
+ * @version 1.0.0
+ */
 @Component
 public class SyncSwitch {
 
     private static final String                    SYN_SWITCH_ZK_NODE = "/sync-switch/";
 
-    private static final Map<String, BooleanMutex> LOCAL_LOCK        = new ConcurrentHashMap<>();
+    private static final Map<String, BooleanMutex> LOCAL_LOCK         = new ConcurrentHashMap<>();
 
-    private static final Map<String, BooleanMutex> DISTRIBUTED_LOCK  = new ConcurrentHashMap<>();
+    private static final Map<String, BooleanMutex> DISTRIBUTED_LOCK   = new ConcurrentHashMap<>();
 
-    private static Mode                            mode              = Mode.LOCAL;
+    private static Mode                            mode               = Mode.LOCAL;
 
     @Resource
     private AdapterCanalConfig                     adapterCanalConfig;
@@ -204,5 +210,4 @@ public class SyncSwitch {
         }
     }
 
-
 }

+ 6 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java

@@ -9,6 +9,12 @@ import org.springframework.stereotype.Component;
 
 import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 
+/**
+ * canal 的相关配置类
+ *
+ * @author rewerma @ 2018-10-20
+ * @version 1.0.0
+ */
 @Component
 @ConfigurationProperties(prefix = "canal.conf")
 public class AdapterCanalConfig extends CanalClientConfig {

+ 7 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfig.java

@@ -13,6 +13,12 @@ import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 
+/**
+ * 适配器数据源及配置文件列表配置类
+ *
+ * @author rewerma @ 2018-10-20
+ * @version 1.0.0
+ */
 @Component
 @ConfigurationProperties(prefix = "adapter.conf")
 public class AdapterConfig {
@@ -55,7 +61,7 @@ public class AdapterConfig {
                 try {
                     ds.init();
                 } catch (SQLException e) {
-                    logger.error("#Failed to initial datasource: " + datasourceConfig.getUrl(), e);
+                    logger.error("ERROR ## failed to initial datasource: " + datasourceConfig.getUrl(), e);
                 }
                 DatasourceConfig.DATA_SOURCES.put(entry.getKey(), ds);
             }

+ 6 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/CuratorClient.java

@@ -8,6 +8,12 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.springframework.stereotype.Component;
 
+/**
+ * curator 配置类
+ *
+ * @author rewerma @ 2018-10-20
+ * @version 1.0.0
+ */
 @Component
 public class CuratorClient {
 

+ 6 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/SpringContext.java

@@ -5,6 +5,12 @@ import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContextAware;
 import org.springframework.stereotype.Component;
 
+/**
+ * spring util配置类
+ *
+ * @author rewerma @ 2018-10-20
+ * @version 1.0.0
+ */
 @Component
 public class SpringContext implements ApplicationContextAware {
 

+ 1 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -21,7 +21,7 @@ import com.alibaba.otter.canal.protocol.Message;
 /**
  * 适配器工作线程抽象类
  *
- * @author machengyuan 2018-8-19 下午11:30:49
+ * @author rewerma 2018-8-19 下午11:30:49
  * @version 1.0.0
  */
 public abstract class AbstractCanalAdapterWorker {

+ 1 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -17,7 +17,7 @@ import com.alibaba.otter.canal.protocol.Message;
 /**
  * kafka对应的client适配器工作线程
  *
- * @author machengyuan 2018-8-19 下午11:30:49
+ * @author rewerma 2018-8-19 下午11:30:49
  * @version 1.0.0
  */
 public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {

+ 1 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -16,7 +16,7 @@ import com.alibaba.otter.canal.protocol.Message;
 /**
  * kafka对应的client适配器工作线程
  *
- * @author machengyuan 2018-8-19 下午11:30:49
+ * @author rewerma 2018-8-19 下午11:30:49
  * @version 1.0.0
  */
 public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {

+ 6 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java

@@ -15,6 +15,12 @@ import com.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig;
 import com.alibaba.otter.canal.adapter.launcher.config.AdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 
+/**
+ * 适配器启动业务类
+ *
+ * @author rewerma @ 2018-10-20
+ * @version 1.0.0
+ */
 @Component
 public class CanalAdapterService {
 

+ 1 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -15,7 +15,7 @@ import com.alibaba.otter.canal.protocol.Message;
 /**
  * 原生canal-server对应的client适配器工作线程
  *
- * @author machengyuan 2018-8-19 下午11:30:49
+ * @author rewrema 2018-8-19 下午11:30:49
  * @version 1.0.0
  */
 public class CanalAdapterWorker extends AbstractCanalAdapterWorker {

+ 6 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java

@@ -17,6 +17,12 @@ import com.alibaba.otter.canal.client.adapter.support.EtlResult;
 import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
 import com.alibaba.otter.canal.client.adapter.support.Result;
 
+/**
+ * 适配器操作Rest
+ *
+ * @author rewerma @ 2018-10-20
+ * @version 1.0.0
+ */
 @RestController
 public class CommonRest {