Browse Source

Merge branch 'master' into feature/mcy

mcy 6 years ago
parent
commit
572ea9f27a
19 changed files with 499 additions and 278 deletions
  1. 10 143
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java
  2. 2 2
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java
  3. 4 3
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java
  4. 19 1
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java
  5. 1 1
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java
  6. 55 12
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java
  7. 39 7
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java
  8. 7 7
      client-adapter/hbase/src/main/resources/hbase/mytest_person2.yml
  9. 69 81
      client-adapter/launcher/pom.xml
  10. 62 0
      client-adapter/launcher/src/main/assembly/dev.xml
  11. 47 10
      client-adapter/launcher/src/main/assembly/release.xml
  12. 5 0
      client-adapter/launcher/src/main/bin/restart.sh
  13. 22 0
      client-adapter/launcher/src/main/bin/startup.bat
  14. 68 0
      client-adapter/launcher/src/main/bin/startup.sh
  15. 65 0
      client-adapter/launcher/src/main/bin/stop.sh
  16. 20 8
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java
  17. 1 1
      client-adapter/launcher/src/main/resources/application.yml
  18. 1 0
      client/pom.xml
  19. 2 2
      example/src/main/java/com/alibaba/otter/canal/example/db/CanalConnectorClient.java

+ 10 - 143
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java

@@ -1,6 +1,9 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -83,47 +86,6 @@ public class ExtensionLoader<T> {
         this.classLoaderPolicy = classLoaderPolicy;
     }
 
-    public String getExtensionName(T extensionInstance) {
-        return getExtensionName(extensionInstance.getClass());
-    }
-
-    public String getExtensionName(Class<?> extensionClass) {
-        return cachedNames.get(extensionClass);
-    }
-
-    public ConcurrentHashMap<String, IllegalStateException> getExceptions() {
-        return exceptions;
-    }
-
-    /**
-     * 返回扩展点实例,如果没有指定的扩展点或是还没加载(即实例化)则返回<code>null</code>注意:此方法不会触发扩展点的加载
-     * <p/>
-     * 一般应该调用{@link #getExtension(String)}方法获得扩展,这个方法会触发扩展点加载
-     *
-     * @see #getExtension(String)
-     */
-    @SuppressWarnings("unchecked")
-    public T getLoadedExtension(String name) {
-        if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null");
-        Holder<Object> holder = cachedInstances.get(name);
-        if (holder == null) {
-            cachedInstances.putIfAbsent(name, new Holder<>());
-            holder = cachedInstances.get(name);
-        }
-        return (T) holder.get();
-    }
-
-    /**
-     * 返回已经加载的扩展点的名字
-     * <p/>
-     * 一般应该调用{@link #getSupportedExtensions()}方法获得扩展,这个方法会返回所有的扩展点
-     *
-     * @see #getSupportedExtensions()
-     */
-    public Set<String> getLoadedExtensions() {
-        return Collections.unmodifiableSet(new TreeSet<>(cachedInstances.keySet()));
-    }
-
     /**
      * 返回指定名字的扩展
      *
@@ -165,87 +127,6 @@ public class ExtensionLoader<T> {
         return getExtension(cachedDefaultName);
     }
 
-    public boolean hasExtension(String name) {
-        if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null");
-        try {
-            return getExtensionClass(name) != null;
-        } catch (Throwable t) {
-            return false;
-        }
-    }
-
-    public Set<String> getSupportedExtensions() {
-        Map<String, Class<?>> clazzes = getExtensionClasses();
-        return Collections.unmodifiableSet(new TreeSet<String>(clazzes.keySet()));
-    }
-
-    /**
-     * 返回缺省的扩展点名,如果没有设置缺省则返回<code>null</code>
-     */
-    public String getDefaultExtensionName() {
-        getExtensionClasses();
-        return cachedDefaultName;
-    }
-
-    /**
-     * 编程方式添加新扩展点
-     *
-     * @param name 扩展点名
-     * @param clazz 扩展点类
-     * @throws IllegalStateException 要添加扩展点名已经存在
-     */
-    public void addExtension(String name, Class<?> clazz) {
-        getExtensionClasses(); // load classes
-
-        if (!type.isAssignableFrom(clazz)) {
-            throw new IllegalStateException("Input type " + clazz + "not implement Extension " + type);
-        }
-        if (clazz.isInterface()) {
-            throw new IllegalStateException("Input type " + clazz + "can not be interface!");
-        }
-
-        if (name == null || "".equals(name)) {
-            throw new IllegalStateException("Extension name is blank (Extension " + type + ")!");
-        }
-        if (cachedClasses.get().containsKey(name)) {
-            throw new IllegalStateException("Extension name " + name + " already existed(Extension " + type + ")!");
-        }
-
-        cachedNames.put(clazz, name);
-        cachedClasses.get().put(name, clazz);
-    }
-
-    /**
-     * 编程方式添加替换已有扩展点
-     *
-     * @param name 扩展点名
-     * @param clazz 扩展点类
-     * @throws IllegalStateException 要添加扩展点名已经存在
-     * @deprecated 不推荐应用使用,一般只在测试时可以使用
-     */
-    @Deprecated
-    public void replaceExtension(String name, Class<?> clazz) {
-        getExtensionClasses(); // load classes
-
-        if (!type.isAssignableFrom(clazz)) {
-            throw new IllegalStateException("Input type " + clazz + "not implement Extension " + type);
-        }
-        if (clazz.isInterface()) {
-            throw new IllegalStateException("Input type " + clazz + "can not be interface!");
-        }
-
-        if (name == null || "".equals(name)) {
-            throw new IllegalStateException("Extension name is blank (Extension " + type + ")!");
-        }
-        if (!cachedClasses.get().containsKey(name)) {
-            throw new IllegalStateException("Extension name " + name + " not existed(Extension " + type + ")!");
-        }
-
-        cachedNames.put(clazz, name);
-        cachedClasses.get().put(name, clazz);
-        cachedInstances.remove(name);
-    }
-
     @SuppressWarnings("unchecked")
     private T createExtension(String name) {
         Class<?> clazz = getExtensionClasses().get(name);
@@ -267,15 +148,6 @@ public class ExtensionLoader<T> {
         }
     }
 
-    private Class<?> getExtensionClass(String name) {
-        if (type == null) throw new IllegalArgumentException("Extension type == null");
-        if (name == null) throw new IllegalArgumentException("Extension name == null");
-        Class<?> clazz = getExtensionClasses().get(name);
-        if (clazz == null)
-            throw new IllegalStateException("No such extension \"" + name + "\" for " + type.getName() + "!");
-        return clazz;
-    }
-
     private Map<String, Class<?>> getExtensionClasses() {
         Map<String, Class<?>> classes = cachedClasses.get();
         if (classes == null) {
@@ -334,23 +206,18 @@ public class ExtensionLoader<T> {
 
         // 1. lib folder,customized extension classLoader (jar_dir/lib)
         String dir = File.separator + this.getJarDirectoryPath() + File.separator + "lib";
-        logger.info("extension classpath dir: " + dir);
+
         File externalLibDir = new File(dir);
         if (!externalLibDir.exists()) {
-            externalLibDir = new File(
-                File.separator + this.getJarDirectoryPath() + File.separator + "canal_client" + File.separator + "lib");
+            externalLibDir = new File(File.separator + this.getJarDirectoryPath() + File.separator + "canal-adapter"
+                                      + File.separator + "lib");
         }
+        logger.info("extension classpath dir: " + externalLibDir.getAbsolutePath());
         if (externalLibDir.exists()) {
-            File[] files = externalLibDir.listFiles(new FilenameFilter() {
-
-                @Override
-                public boolean accept(File dir, String name) {
-                    return name.endsWith(".jar");
-                }
-            });
+            File[] files = externalLibDir.listFiles((dir1, name) -> name.endsWith(".jar"));
             if (files != null) {
                 for (File f : files) {
-                    URL url = null;
+                    URL url;
                     try {
                         url = f.toURI().toURL();
                     } catch (MalformedURLException e) {

+ 2 - 2
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java

@@ -99,11 +99,11 @@ public class ESSyncConfigLoader {
         InputStream in = null;
         try {
             // 先取本地文件,再取类路径
-            File configFile = new File("config/" + config);
+            File configFile = new File("../config/" + config);
             if (configFile.exists()) {
                 in = new FileInputStream(configFile);
             } else {
-                in = ESSyncConfigLoader.class.getClassLoader().getResourceAsStream(config);
+                    in = ESSyncConfigLoader.class.getClassLoader().getResourceAsStream(config);
             }
             if (in == null) {
                 throw new RuntimeException("Config file: " + config + " not found.");

+ 4 - 3
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -9,6 +9,7 @@ import javax.sql.DataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.fastjson.JSON;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
@@ -38,9 +39,6 @@ public class ESSyncService {
     }
 
     public void sync(Dml dml) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("DML: {}", dml.toString());
-        }
         long begin = System.currentTimeMillis();
         String database = dml.getDatabase();
         String table = dml.getTable();
@@ -74,6 +72,9 @@ public class ESSyncService {
                     esSyncConfigs.size(),
                     dml.getDestination());
             }
+            if (logger.isDebugEnabled()) {
+                logger.debug("DML: {}", JSON.toJSONString(dml));
+            }
         }
     }
 

+ 19 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java

@@ -76,6 +76,7 @@ public class MappingConfig {
     public static class ColumnItem {
 
         private boolean isRowKey = false;
+        private Integer rowKeyLen;
         private String  column;
         private String  family;
         private String  qualifier;
@@ -89,6 +90,14 @@ public class MappingConfig {
             isRowKey = rowKey;
         }
 
+        public Integer getRowKeyLen() {
+            return rowKeyLen;
+        }
+
+        public void setRowKeyLen(Integer rowKeyLen) {
+            this.rowKeyLen = rowKeyLen;
+        }
+
         public String getColumn() {
             return column;
         }
@@ -264,7 +273,16 @@ public class MappingConfig {
                     ColumnItem columnItem = new ColumnItem();
                     columnItem.setColumn(columnField.getKey());
                     columnItem.setType(type);
-                    if ("rowKey".equalsIgnoreCase(field)) {
+                    if (field != null && field.toUpperCase().startsWith("ROWKEY")) {
+                        int idx = field.toUpperCase().indexOf("LEN:");
+                        if (idx > -1) {
+                            String len = field.substring(idx + 4);
+                            try {
+                                columnItem.setRowKeyLen(Integer.parseInt(len));
+                            } catch (Exception e) {
+                                // ignore
+                            }
+                        }
                         columnItem.setRowKey(true);
                         rowKeyColumn = columnItem;
                     } else {

+ 1 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java

@@ -124,7 +124,7 @@ public class MappingConfigLoader {
         InputStream in = null;
         try {
             // 先取本地文件,再取类路径
-            File configFile = new File("config/" + config);
+            File configFile = new File("../config/" + config);
             if (configFile.exists()) {
                 in = new FileInputStream(configFile);
             } else {

+ 55 - 12
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java

@@ -199,8 +199,8 @@ public class HbaseEtlService {
                     } else {
                         sqlFinal = sql + " LIMIT " + offset + "," + cnt;
                     }
-                    Future<Boolean> future = executor
-                        .submit(() -> executeSqlImport(ds, sqlFinal, hbaseMapping, hbaseTemplate, successCount, errMsg));
+                    Future<Boolean> future = executor.submit(
+                        () -> executeSqlImport(ds, sqlFinal, hbaseMapping, hbaseTemplate, successCount, errMsg));
                     futures.add(future);
                 }
 
@@ -213,8 +213,8 @@ public class HbaseEtlService {
                 executeSqlImport(ds, sql, hbaseMapping, hbaseTemplate, successCount, errMsg);
             }
 
-            logger.info(
-                hbaseMapping.getHbaseTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
+            logger.info(hbaseMapping.getHbaseTable() + " etl completed in: "
+                        + (System.currentTimeMillis() - start) / 1000 + "s!");
 
             etlResult.setResultMessage("导入HBase表 " + hbaseMapping.getHbaseTable() + " 数据:" + successCount.get() + " 条");
         } catch (Exception e) {
@@ -332,18 +332,49 @@ public class HbaseEtlService {
 
                                     byte[] valBytes = Bytes.toBytes(val.toString());
                                     if (columnItem.isRowKey()) {
-                                        row.setRowKey(valBytes);
+                                        if (columnItem.getRowKeyLen() != null) {
+                                            valBytes = Bytes.toBytes(limitLenNum(columnItem.getRowKeyLen(), val));
+                                            row.setRowKey(valBytes);
+                                        } else {
+                                            row.setRowKey(valBytes);
+                                        }
                                     } else {
                                         row.addCell(columnItem.getFamily(), columnItem.getQualifier(), valBytes);
                                     }
                                 } else {
-                                    PhType phType = PhType.getType(columnItem.getType());
-                                    if (columnItem.isRowKey()) {
-                                        row.setRowKey(PhTypeUtil.toBytes(val, phType));
-                                    } else {
-                                        row.addCell(columnItem.getFamily(),
-                                            columnItem.getQualifier(),
-                                            PhTypeUtil.toBytes(val, phType));
+                                    if (MappingConfig.Mode.STRING == hbaseMapping.getMode()) {
+                                        byte[] valBytes = Bytes.toBytes(val.toString());
+                                        if (columnItem.isRowKey()) {
+                                            if (columnItem.getRowKeyLen() != null) {
+                                                valBytes = Bytes.toBytes(limitLenNum(columnItem.getRowKeyLen(), val));
+                                            }
+                                            row.setRowKey(valBytes);
+                                        } else {
+                                            row.addCell(columnItem.getFamily(), columnItem.getQualifier(), valBytes);
+                                        }
+                                    } else if (MappingConfig.Mode.NATIVE == hbaseMapping.getMode()) {
+                                        Type type = Type.getType(columnItem.getType());
+                                        if (columnItem.isRowKey()) {
+                                            if (columnItem.getRowKeyLen() != null) {
+                                                String v = limitLenNum(columnItem.getRowKeyLen(), val);
+                                                row.setRowKey(Bytes.toBytes(v));
+                                            } else {
+                                                row.setRowKey(TypeUtil.toBytes(val, type));
+                                            }
+                                        } else {
+                                            row.addCell(columnItem.getFamily(),
+                                                columnItem.getQualifier(),
+                                                TypeUtil.toBytes(val, type));
+                                        }
+                                    } else if (MappingConfig.Mode.PHOENIX == hbaseMapping.getMode()) {
+                                        PhType phType = PhType.getType(columnItem.getType());
+                                        if (columnItem.isRowKey()) {
+                                            row.setRowKey(PhTypeUtil.toBytes(val, phType));
+                                        } else {
+                                            row.addCell(columnItem.getFamily(),
+                                                columnItem.getQualifier(),
+                                                PhTypeUtil.toBytes(val, phType));
+                                        }
                                     }
                                 }
                             }
@@ -382,4 +413,16 @@ public class HbaseEtlService {
             return false;
         }
     }
+
+    private static String limitLenNum(int len, Object val) {
+        if (val == null) {
+            return null;
+        }
+        if (val instanceof Number) {
+            return String.format("%0" + len + "d", (Number) ((Number) val).longValue());
+        } else if (val instanceof String) {
+            return String.format("%0" + len + "d", Long.parseLong((String) val));
+        }
+        return null;
+    }
 }

+ 39 - 7
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java

@@ -6,6 +6,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.fastjson.JSON;
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.hbase.support.*;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
@@ -38,7 +39,7 @@ public class HbaseSyncService {
                     delete(config, dml);
                 }
                 if (logger.isDebugEnabled()) {
-                    logger.debug("DML: {}", dml.toString());
+                    logger.debug("DML: {}", JSON.toJSONString(dml));
                 }
             }
         } catch (Exception e) {
@@ -131,7 +132,21 @@ public class HbaseSyncService {
                     }
                 } else {
                     if (columnItem.isRowKey()) {
-                        // row.put("rowKey", bytes);
+                        if (columnItem.getRowKeyLen() != null && entry.getValue() != null) {
+                            if (entry.getValue() instanceof Number) {
+                                String v = String.format("%0" + columnItem.getRowKeyLen() + "d",
+                                    ((Number) entry.getValue()).longValue());
+                                bytes = Bytes.toBytes(v);
+                            } else {
+                                try {
+                                    String v = String.format("%0" + columnItem.getRowKeyLen() + "d",
+                                        Integer.parseInt((String) entry.getValue()));
+                                    bytes = Bytes.toBytes(v);
+                                } catch (Exception e) {
+                                    // ignore
+                                }
+                            }
+                        }
                         hRow.setRowKey(bytes);
                     } else {
                         hRow.addCell(columnItem.getFamily(), columnItem.getQualifier(), bytes);
@@ -145,8 +160,8 @@ public class HbaseSyncService {
     /**
      * 更新操作
      * 
-     * @param config
-     * @param dml
+     * @param config 配置对象
+     * @param dml dml对象
      */
     private void update(MappingConfig config, Dml dml) {
         List<Map<String, Object>> data = dml.getData();
@@ -191,7 +206,7 @@ public class HbaseSyncService {
                 Map<String, Object> rowKey = data.get(0);
                 rowKeyBytes = typeConvert(null, hbaseMapping, rowKey.values().iterator().next());
             } else {
-                rowKeyBytes = typeConvert(rowKeyColumn, hbaseMapping, r.get(rowKeyColumn.getColumn()));
+                rowKeyBytes = getRowKeyBytes(hbaseMapping, rowKeyColumn, r);
             }
             if (rowKeyBytes == null) throw new RuntimeException("rowKey值为空");
 
@@ -276,8 +291,7 @@ public class HbaseSyncService {
                 Map<String, Object> rowKey = data.get(0);
                 rowKeyBytes = typeConvert(null, hbaseMapping, rowKey.values().iterator().next());
             } else {
-                Object val = r.get(rowKeyColumn.getColumn());
-                rowKeyBytes = typeConvert(rowKeyColumn, hbaseMapping, val);
+                rowKeyBytes = getRowKeyBytes(hbaseMapping, rowKeyColumn, r);
             }
             if (rowKeyBytes == null) throw new RuntimeException("rowKey值为空");
             rowKeys.add(rowKeyBytes);
@@ -423,4 +437,22 @@ public class HbaseSyncService {
         return rowKeyValue.toString();
     }
 
+    private static byte[] getRowKeyBytes(MappingConfig.HbaseMapping hbaseMapping, MappingConfig.ColumnItem rowKeyColumn,
+                                         Map<String, Object> rowData) {
+        Object val = rowData.get(rowKeyColumn.getColumn());
+        String v = null;
+        if (rowKeyColumn.getRowKeyLen() != null) {
+            if (val instanceof Number) {
+                v = String.format("%0" + rowKeyColumn.getRowKeyLen() + "d", (Number) ((Number) val).longValue());
+            } else if (val instanceof String) {
+                v = String.format("%0" + rowKeyColumn.getRowKeyLen() + "d", Long.parseLong((String) val));
+            }
+        }
+        if (v != null) {
+            return Bytes.toBytes(v);
+        } else {
+            return typeConvert(rowKeyColumn, hbaseMapping, val);
+        }
+    }
+
 }

+ 7 - 7
client-adapter/hbase/src/main/resources/hbase/mytest_person2.yml

@@ -1,7 +1,7 @@
 dataSourceKey: defaultDS
 destination: example
 hbaseMapping:
-  mode: PHOENIX  #NATIVE   #STRING
+  mode: STRING  #NATIVE   #PHOENIX
   database: mytest  # 数据库名
   table: person2     # 数据库表名
   hbaseTable: MYTEST.PERSON2   # HBase表名
@@ -11,14 +11,14 @@ hbaseMapping:
   #rowKey: id,type  # 复合字段rowKey不能和columns中的rowKey重复
   columns:
     # 数据库字段:HBase对应字段
-    id: ROWKEY$UNSIGNED_LONG
+    id: ROWKEY LEN:15
     name: NAME
     email: EMAIL
-    type: $DECIMAL
-    c_time: C_TIME$UNSIGNED_TIMESTAMP
-    birthday: BIRTHDAY$DATE
-  excludeColumns:
-    - lat   # 忽略字段
+    type:
+    c_time: C_TIME
+    birthday: BIRTHDAY
+#  excludeColumns:
+#    - lat   # 忽略字段
 
 # -- NATIVE类型
 # $DEFAULT

+ 69 - 81
client-adapter/launcher/pom.xml

@@ -47,7 +47,7 @@
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-configuration-processor</artifactId>
-            <optional>true</optional>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.curator</groupId>
@@ -79,7 +79,7 @@
                 </exclusion>
             </exclusions>
             <classifier>jar-with-dependencies</classifier>
-            <optional>true</optional>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.alibaba.otter</groupId>
@@ -92,7 +92,7 @@
                 </exclusion>
             </exclusions>
             <classifier>jar-with-dependencies</classifier>
-            <optional>true</optional>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.alibaba.otter</groupId>
@@ -105,94 +105,100 @@
                 </exclusion>
             </exclusions>
             <classifier>jar-with-dependencies</classifier>
-            <optional>true</optional>
+            <scope>provided</scope>
         </dependency>
     </dependencies>
 
     <build>
-        <finalName>canal-adapter-launcher</finalName>
         <plugins>
             <plugin>
-                <groupId>org.springframework.boot</groupId>
-                <artifactId>spring-boot-maven-plugin</artifactId>
-                <version>2.0.1.RELEASE</version>
-                <configuration>
-                    <excludes>
-                        <exclude>
-                            <groupId>com.alibaba.otter</groupId>
-                            <artifactId>client-adapter.logger</artifactId>
-                        </exclude>
-                        <exclude>
-                            <groupId>com.alibaba.otter</groupId>
-                            <artifactId>client-adapter.hbase</artifactId>
-                        </exclude>
-                        <exclude>
-                            <groupId>com.alibaba.otter</groupId>
-                            <artifactId>client-adapter.elasticsearch</artifactId>
-                        </exclude>
-                    </excludes>
-                </configuration>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>repackage</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <artifactId>maven-antrun-plugin</artifactId>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.10</version>
                 <executions>
                     <execution>
+                        <id>copy-dependencies-to-canal-client-service</id>
                         <phase>package</phase>
                         <goals>
-                            <goal>run</goal>
+                            <goal>copy-dependencies</goal>
                         </goals>
                         <configuration>
-                            <tasks>
-                                <copy todir="${project.basedir}/target/config" overwrite="true">
-                                    <fileset dir="${project.basedir}/src/main/resources" erroronmissingdir="true">
-                                        <include name="*.yml"/>
-                                    </fileset>
-                                </copy>
-                                <copy todir="${project.basedir}/target/config/hbase" overwrite="true">
-                                    <fileset dir="${project.basedir}/../hbase/src/main/resources/hbase"
-                                             erroronmissingdir="true">
-                                        <include name="*.yml"/>
-                                    </fileset>
-                                </copy>
-                                <copy todir="${project.basedir}/target/config/es" overwrite="true">
-                                    <fileset dir="${project.basedir}/../elasticsearch/src/main/resources/es" erroronmissingdir="true">
-                                        <include name="*.yml" />
-                                    </fileset>
-                                </copy>
-                            </tasks>
+                            <includeClassifiers>jar-with-dependencies</includeClassifiers>
+                            <outputDirectory>${project.basedir}/target/canal-adapter/lib</outputDirectory>
                         </configuration>
                     </execution>
                 </executions>
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <version>2.10</version>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.2.1</version>
                 <executions>
                     <execution>
-                        <id>copy-dependencies-to-canal-client-service</id>
-                        <phase>package</phase>
+                        <id>assemble</id>
                         <goals>
-                            <goal>copy-dependencies</goal>
+                            <goal>single</goal>
                         </goals>
-                        <configuration>
-                            <includeClassifiers>jar-with-dependencies</includeClassifiers>
-                            <outputDirectory>${project.basedir}/target/lib</outputDirectory>
-                        </configuration>
+                        <phase>package</phase>
                     </execution>
                 </executions>
+                <configuration>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <attach>false</attach>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <classpathPrefix>lib/</classpathPrefix>
+                            <mainClass>com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication</mainClass>
+                        </manifest>
+                    </archive>
+                    <excludes>
+                        <exclude>**/*.properties</exclude>
+                        <exclude>**/*.xml</exclude>
+                        <exclude>**/*.yml</exclude>
+                        <exclude>static/**</exclude>
+                        <exclude>templates/**</exclude>
+                    </excludes>
+                </configuration>
             </plugin>
         </plugins>
     </build>
 
     <profiles>
+        <profile>
+            <id>dev</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+                <property>
+                    <name>env</name>
+                    <value>!release</value>
+                </property>
+            </activation>
+
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <configuration>
+                            <descriptors>
+                                <descriptor>${basedir}/src/main/assembly/dev.xml</descriptor>
+                            </descriptors>
+                            <finalName>canal-adapter</finalName>
+                            <outputDirectory>${project.build.directory}</outputDirectory>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+
+        </profile>
+
         <profile>
             <id>release</id>
             <activation>
@@ -207,32 +213,14 @@
                     <plugin>
                         <artifactId>maven-assembly-plugin</artifactId>
                         <configuration>
-                            <appendAssemblyId>false</appendAssemblyId>
                             <descriptors>
                                 <descriptor>${basedir}/src/main/assembly/release.xml</descriptor>
                             </descriptors>
-                            <finalName>${project.artifactId}-${project.version}</finalName>
+                            <finalName>canal.adapter-${project.version}</finalName>
                             <outputDirectory>${project.basedir}/../../target</outputDirectory>
                         </configuration>
-                        <executions>
-                            <execution>
-                                <id>make-assembly</id>
-                                <phase>package</phase>
-                                <goals>
-                                    <goal>single</goal>
-                                </goals>
-                            </execution>
-                        </executions>
                     </plugin>
                 </plugins>
-                <resources>
-                    <resource>
-                        <directory>src/main/resources</directory>
-                        <excludes>
-                            <exclude>application.yml</exclude>
-                        </excludes>
-                    </resource>
-                </resources>
             </build>
         </profile>
     </profiles>

+ 62 - 0
client-adapter/launcher/src/main/assembly/dev.xml

@@ -0,0 +1,62 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+	<id>dist</id>
+	<formats>
+		<format>dir</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>.</directory>
+			<outputDirectory>/</outputDirectory>
+			<includes>
+				<include>README*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/bin</directory>
+			<outputDirectory>bin</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+			<fileMode>0755</fileMode>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/resources</directory>
+			<outputDirectory>/config</outputDirectory>
+			<includes>
+				<include>**/*</include>
+
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>../elasticsearch/src/main/resources/es</directory>
+			<outputDirectory>/config/es</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+        <fileSet>
+            <directory>../hbase/src/main/resources/hbase</directory>
+            <outputDirectory>/config/hbase</outputDirectory>
+            <includes>
+                <include>**/*</include>
+            </includes>
+        </fileSet>
+		<fileSet>
+			<directory>target</directory>
+			<outputDirectory>logs</outputDirectory>
+			<excludes>
+				<exclude>**/*</exclude>
+			</excludes>
+		</fileSet>
+	</fileSets>
+	<dependencySets>
+		<dependencySet>
+			<outputDirectory>lib</outputDirectory>
+			<excludes>
+				<exclude>junit:junit</exclude>
+			</excludes>
+		</dependencySet>
+	</dependencySets>
+</assembly>

+ 47 - 10
client-adapter/launcher/src/main/assembly/release.xml

@@ -1,34 +1,71 @@
 <assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
-    <id>canal-adapter</id>
+    <id>dist</id>
     <formats>
         <format>tar.gz</format>
     </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
     <fileSets>
         <fileSet>
-            <directory>${project.build.directory}</directory>
+            <directory>.</directory>
             <outputDirectory>/</outputDirectory>
             <includes>
-                <include>*.jar</include>
+                <include>README*</include>
             </includes>
         </fileSet>
+        <fileSet>
+            <directory>./src/main/bin</directory>
+            <outputDirectory>bin</outputDirectory>
+            <includes>
+                <include>**/*</include>
+            </includes>
+            <fileMode>0755</fileMode>
+        </fileSet>
+        <fileSet>
+            <directory>./src/main/resources</directory>
+            <outputDirectory>/config</outputDirectory>
+            <includes>
+                <include>**/*</include>
 
+            </includes>
+        </fileSet>
         <fileSet>
-            <directory>${project.build.directory}/lib</directory>
-            <outputDirectory>/lib/</outputDirectory>
+            <directory>../elasticsearch/src/main/resources/es</directory>
+            <outputDirectory>/config/es</outputDirectory>
+            <includes>
+                <include>**/*</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>../hbase/src/main/resources/hbase</directory>
+            <outputDirectory>/config/hbase</outputDirectory>
             <includes>
-                <include>*.jar</include>
+                <include>**/*</include>
             </includes>
         </fileSet>
+        <fileSet>
+            <directory>target</directory>
+            <outputDirectory>logs</outputDirectory>
+            <excludes>
+                <exclude>**/*</exclude>
+            </excludes>
+        </fileSet>
 
         <fileSet>
-            <directory>${project.build.directory}/config</directory>
-            <outputDirectory>/config/</outputDirectory>
+            <directory>${project.basedir}/target/canal-adapter/lib</directory>
+            <outputDirectory>/lib/</outputDirectory>
             <includes>
-                <include>**</include>
+                <include>*-jar-with-dependencies.jar</include>
             </includes>
         </fileSet>
     </fileSets>
-    <baseDirectory>/</baseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>lib</outputDirectory>
+            <excludes>
+                <exclude>junit:junit</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
 </assembly>

+ 5 - 0
client-adapter/launcher/src/main/bin/restart.sh

@@ -0,0 +1,5 @@
+#!/bin/bash
+
+sh stop.sh
+
+sh startup.sh

+ 22 - 0
client-adapter/launcher/src/main/bin/startup.bat

@@ -0,0 +1,22 @@
+@echo off
+@if not "%ECHO%" == ""  echo %ECHO%
+@if "%OS%" == "Windows_NT"  setlocal
+
+set ENV_PATH=.\
+if "%OS%" == "Windows_NT" set ENV_PATH=%~dp0%
+
+set conf_dir=%ENV_PATH%\..\config
+
+set CLASSPATH=%conf_dir%
+set CLASSPATH=%conf_dir%\..\lib\*;%CLASSPATH%
+
+set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m
+set JAVA_OPTS_EXT= -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dapplication.codeset=UTF-8 -Dfile.encoding=UTF-8
+set ADAPTER_OPTS= -DappName=canal-adapter
+
+set JAVA_OPTS= %JAVA_MEM_OPTS% %JAVA_OPTS_EXT% %ADAPTER_OPTS%
+
+set CMD_STR= java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication
+echo start cmd : %CMD_STR%
+
+java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication

+ 68 - 0
client-adapter/launcher/src/main/bin/startup.sh

@@ -0,0 +1,68 @@
+#!/bin/bash
+
+current_path=`pwd`
+case "`uname`" in
+    Linux)
+		bin_abs_path=$(readlink -f $(dirname $0))
+		;;
+	*)
+		bin_abs_path=`cd $(dirname $0); pwd`
+		;;
+esac
+base=${bin_abs_path}/..
+#canal_conf=$base/config/canal.properties
+#logback_configurationFile=$base/conf/logback.xml
+export LANG=en_US.UTF-8
+export BASE=$base
+
+if [ -f $base/bin/adapter.pid ] ; then
+	echo "found adapter.pid , Please run stop.sh first ,then startup.sh" 2>&2
+    exit 1
+fi
+
+if [ ! -d $base/logs ] ; then
+	mkdir -p $base/logs
+fi
+
+## set java path
+if [ -z "$JAVA" ] ; then
+  JAVA=$(which java)
+fi
+
+ALIBABA_JAVA="/usr/alibaba/java/bin/java"
+TAOBAO_JAVA="/opt/taobao/java/bin/java"
+if [ -z "$JAVA" ]; then
+  if [ -f $ALIBABA_JAVA ] ; then
+  	JAVA=$ALIBABA_JAVA
+  elif [ -f $TAOBAO_JAVA ] ; then
+  	JAVA=$TAOBAO_JAVA
+  else
+  	echo "Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH." 2>&2
+    exit 1
+  fi
+fi
+
+str=`file -L $JAVA | grep 64-bit`
+if [ -n "$str" ]; then
+	JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
+else
+	JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
+fi
+
+JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
+ADAPTER_OPTS="-DappName=canal-adapter"
+
+for i in $base/lib/*;
+    do CLASSPATH=$i:"$CLASSPATH";
+done
+CLASSPATH="$base/config:$CLASSPATH";
+
+echo "cd to $bin_abs_path for workaround relative path"
+cd $bin_abs_path
+
+echo CLASSPATH :$CLASSPATH
+$JAVA $JAVA_OPTS $ADAPTER_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication 1>>$base/logs/adapter.log 2>&1 &
+echo $! > $base/bin/adapter.pid
+
+echo "cd to $current_path for continue"
+cd $current_path

+ 65 - 0
client-adapter/launcher/src/main/bin/stop.sh

@@ -0,0 +1,65 @@
+#!/bin/bash
+
+cygwin=false;
+linux=false;
+case "`uname`" in
+    CYGWIN*)
+        cygwin=true
+        ;;
+    Linux*)
+    	linux=true
+    	;;
+esac
+
+get_pid() {	
+	STR=$1
+	PID=$2
+    if $cygwin; then
+        JAVA_CMD="$JAVA_HOME\bin\java"
+        JAVA_CMD=`cygpath --path --unix $JAVA_CMD`
+        JAVA_PID=`ps |grep $JAVA_CMD |awk '{print $1}'`
+    else
+    	if $linux; then
+	        if [ ! -z "$PID" ]; then
+	        	JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
+		    else 
+		        JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep -v grep|awk '{print $2}'`
+	        fi
+	    else
+	    	if [ ! -z "$PID" ]; then
+	        	JAVA_PID=`ps aux |grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
+		    else 
+		        JAVA_PID=`ps aux |grep "$STR"|grep -v grep|awk '{print $2}'`
+	        fi
+	    fi
+    fi
+    echo $JAVA_PID;
+}
+
+base=`dirname $0`/..
+pidfile=$base/bin/adapter.pid
+if [ ! -f "$pidfile" ];then
+	echo "canal-adapter is not running. exists"
+	exit
+fi
+
+pid=`cat $pidfile`
+if [ "$pid" == "" ] ; then
+	pid=`get_pid "appName=canal-adapter"`
+fi
+
+echo -e "`hostname`: stopping canal $pid ... "
+kill $pid
+
+LOOPS=0
+while (true); 
+do 
+	gpid=`get_pid "appName=canal-adapter" "$pid"`
+    if [ "$gpid" == "" ] ; then
+    	echo "Oook! cost:$LOOPS"
+    	`rm $pidfile`
+    	break;
+    fi
+    let LOOPS=LOOPS+1
+    sleep 1
+done

+ 20 - 8
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java

@@ -56,8 +56,11 @@ public class CommonRest {
     @PostMapping("/etl/{type}/{task}")
     public EtlResult etl(@PathVariable String type, @PathVariable String task,
                          @RequestParam(name = "params", required = false) String params) {
+        OuterAdapter adapter = loader.getExtension(type);
+        String destination = adapter.getDestination(task);
+        String lockKey = destination == null ? task : destination;
 
-        boolean locked = etlLock.tryLock(ETL_LOCK_ZK_NODE + type + "-" + task);
+        boolean locked = etlLock.tryLock(ETL_LOCK_ZK_NODE + type + "-" + lockKey);
         if (!locked) {
             EtlResult result = new EtlResult();
             result.setSucceeded(false);
@@ -65,12 +68,19 @@ public class CommonRest {
             return result;
         }
         try {
-            OuterAdapter adapter = loader.getExtension(type);
-            String destination = adapter.getDestination(task);
-            Boolean oriSwithcStatus = null;
+
+            Boolean oriSwitchStatus;
             if (destination != null) {
-                oriSwithcStatus = syncSwitch.status(destination);
-                syncSwitch.off(destination);
+                oriSwitchStatus = syncSwitch.status(destination);
+                if (oriSwitchStatus != null && oriSwitchStatus) {
+                    syncSwitch.off(destination);
+                }
+            } else {
+                // task可能为destination,直接锁task
+                oriSwitchStatus = syncSwitch.status(task);
+                if (oriSwitchStatus != null && oriSwitchStatus) {
+                    syncSwitch.off(task);
+                }
             }
             try {
                 List<String> paramArr = null;
@@ -80,12 +90,14 @@ public class CommonRest {
                 }
                 return adapter.etl(task, paramArr);
             } finally {
-                if (destination != null && oriSwithcStatus != null && oriSwithcStatus) {
+                if (destination != null && oriSwitchStatus != null && oriSwitchStatus) {
                     syncSwitch.on(destination);
+                } else if (destination == null && oriSwitchStatus != null && oriSwitchStatus) {
+                    syncSwitch.on(task);
                 }
             }
         } finally {
-            etlLock.unlock(ETL_LOCK_ZK_NODE + type + "-" + task);
+            etlLock.unlock(ETL_LOCK_ZK_NODE + type + "-" + lockKey);
         }
     }
 

+ 1 - 1
client-adapter/launcher/src/main/resources/application.yml

@@ -3,7 +3,7 @@ server:
 logging:
   level:
     com.alibaba.otter.canal.client.adapter.hbase: DEBUG
-    com.alibaba.otter.canal.client.adapter.es: TRACE
+    com.alibaba.otter.canal.client.adapter.es: DEBUG
 spring:
   jackson:
     date-format: yyyy-MM-dd HH:mm:ss

+ 1 - 0
client/pom.xml

@@ -149,6 +149,7 @@
 							<exclude>commons-logging:commons-logging</exclude>
 							<exclude>aopalliance:aopalliance</exclude>
 							<exclude>com.google.protobuf:protobuf-java</exclude>
+							<exclude>com.google.code.findbugs:jsr305</exclude>
 							<exclude>io.netty:*</exclude>
 							<exclude>junit:junit</exclude>
 						</excludes>

+ 2 - 2
example/src/main/java/com/alibaba/otter/canal/example/db/CanalConnectorClient.java

@@ -359,7 +359,7 @@ public abstract class CanalConnectorClient extends AbstractCanalLifeCycle implem
                 logger.warn("parse delete index event, table: {}, sql: {}", table, sql);
                 return;
             default:
-                logger.warn("parse unknown event: {}, table: {}, sql: {}", new String[]{eventType.name(), table, sql});
+                logger.warn("parse unknown event: {}, table: {}, sql: {}", eventType.name(), table, sql);
                 break;
         }
     }
@@ -375,7 +375,7 @@ public abstract class CanalConnectorClient extends AbstractCanalLifeCycle implem
     public void whenOthers(CanalEntry.Header header, String sql) {
         String schema = header.getSchemaName();
         String table = header.getTableName();
-        logger.error("ignore event,schema: {},table: {},SQL: {}", new String[]{schema, table, sql});
+        logger.error("ignore event,schema: {},table: {},SQL: {}", schema, table, sql);
     }
 
     public enum ExceptionStrategy {