
Adapter code cleanup and optimization

mcy 6 years ago
parent commit 6b3bf31404
35 changed files with 771 additions and 765 deletions
  1. client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/config/YmlConfigBinder.java (+1, -1)
  2. client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/config/bind/RelaxedConversionService.java (+1, -1)
  3. client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/config/common/YamlProcessor.java (+1, -0)
  4. client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java (+29, -23)
  5. client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/JdbcTypeUtil.java (+2, -0)
  6. client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Result.java (+4, -4)
  7. client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Util.java (+15, -53)
  8. client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java (+1, -1)
  9. client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java (+0, -2)
  10. client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SchemaItem.java (+3, -2)
  11. client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/monitor/ESConfigMonitor.java (+1, -2)
  12. client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java (+25, -6)
  13. client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java (+6, -5)
  14. client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java (+7, -49)
  15. client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java (+2, -1)
  16. client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/TestConstant.java (+1, -1)
  17. client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java (+18, -8)
  18. client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java (+3, -3)
  19. client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/PhTypeUtil.java (+1, -3)
  20. client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/EtlLock.java (+1, -1)
  21. client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java (+4, -4)
  22. client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java (+16, -5)
  23. client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java (+17, -7)
  24. client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java (+15, -23)
  25. client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java (+21, -6)
  26. client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java (+1, -1)
  27. client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/ApplicationConfigMonitor.java (+4, -2)
  28. client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/DbRemoteConfigLoader.java (+8, -3)
  29. client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/RemoteAdapterMonitorImpl.java (+9, -4)
  30. client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java (+17, -17)
  31. client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java (+11, -11)
  32. client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java (+15, -4)
  33. client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java (+507, -507)
  34. client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java (+3, -4)
  35. client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/TestConstant.java (+1, -1)

+ 1 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/config/YmlConfigBinder.java

@@ -84,7 +84,7 @@ public class YmlConfigBinder {
         try {
             byte[] contentBytes;
             if (charset == null) {
-                contentBytes = content.getBytes();
+                contentBytes = content.getBytes("UTF-8");
             } else {
                 contentBytes = content.getBytes(charset);
             }

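Note: this hunk replaces the platform-default `content.getBytes()` with an explicit "UTF-8" encoding so the binder behaves the same on every JVM. A minimal sketch of the same idea (class and method names here are illustrative, not part of the commit) uses StandardCharsets.UTF_8, which also avoids the checked UnsupportedEncodingException that the string-named overload declares:

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;

    public class EncodingSketch {
        // Resolve the configured charset, falling back to UTF-8 instead of the
        // platform default so the byte form is identical on every machine.
        static byte[] toBytes(String content, String charsetName) {
            Charset charset = charsetName == null ? StandardCharsets.UTF_8 : Charset.forName(charsetName);
            return content.getBytes(charset); // Charset overload throws no checked exception
        }
    }
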
+ 1 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/config/bind/RelaxedConversionService.java

@@ -89,7 +89,7 @@ class RelaxedConversionService implements ConversionService {
             return new RelaxedConversionService.StringToEnumIgnoringCaseConverterFactory.StringToEnum(enumType);
         }
 
-        private class StringToEnum<T extends Enum> implements Converter<String, T> {
+        private static class StringToEnum<T extends Enum> implements Converter<String, T> {
 
             private final Class<T> enumType;
 

+ 1 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/config/common/YamlProcessor.java

@@ -191,6 +191,7 @@ public abstract class YamlProcessor {
     }
 
     private boolean process(Map<String, Object> map, MatchCallback callback) {
+        @SuppressWarnings("serial")
         Properties properties = new Properties() {
 
             @Override

+ 29 - 23
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java

@@ -6,6 +6,7 @@ import java.io.InputStreamReader;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Enumeration;
@@ -27,7 +28,8 @@ import org.slf4j.LoggerFactory;
  */
 public class ExtensionLoader<T> {
 
-    private static final Logger                                      logger                     = LoggerFactory.getLogger(ExtensionLoader.class);
+    private static final Logger                                      logger                     = LoggerFactory
+        .getLogger(ExtensionLoader.class);
 
     private static final String                                      SERVICES_DIRECTORY         = "META-INF/services/";
 
@@ -35,7 +37,8 @@ public class ExtensionLoader<T> {
 
     private static final String                                      DEFAULT_CLASSLOADER_POLICY = "internal";
 
-    private static final Pattern                                     NAME_SEPARATOR             = Pattern.compile("\\s*[,]+\\s*");
+    private static final Pattern                                     NAME_SEPARATOR             = Pattern
+        .compile("\\s*[,]+\\s*");
 
     private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS          = new ConcurrentHashMap<>();
 
@@ -171,7 +174,8 @@ public class ExtensionLoader<T> {
             return instance;
         } catch (Throwable t) {
             throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
-                                            + ")  could not be instantiated: " + t.getMessage(), t);
+                                            + ")  could not be instantiated: " + t.getMessage(),
+                t);
         }
     }
 
@@ -191,7 +195,8 @@ public class ExtensionLoader<T> {
             return instance;
         } catch (Throwable t) {
             throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
-                                            + ")  could not be instantiated: " + t.getMessage(), t);
+                                            + ")  could not be instantiated: " + t.getMessage(),
+                t);
         }
     }
 
@@ -233,7 +238,11 @@ public class ExtensionLoader<T> {
                                          // button.
             jarPath = jarPath.replaceAll("/classes/.*", "/classes/");
         }
-        return Paths.get(jarPath).getParent().toString(); // Paths - from java 8
+        Path path = Paths.get(jarPath).getParent(); // Paths - from java 8
+        if (path != null) {
+            return path.toString();
+        }
+        return null;
     }
 
     private Map<String, Class<?>> loadExtensionClasses() {
@@ -330,12 +339,10 @@ public class ExtensionLoader<T> {
                                             // Class.forName(line, true,
                                             // classLoader);
                                             if (!type.isAssignableFrom(clazz)) {
-                                                throw new IllegalStateException("Error when load extension class(interface: "
-                                                                                + type
-                                                                                + ", class line: "
-                                                                                + clazz.getName()
-                                                                                + "), class "
-                                                                                + clazz.getName()
+                                                throw new IllegalStateException(
+                                                    "Error when load extension class(interface: " + type
+                                                                                + ", class line: " + clazz.getName()
+                                                                                + "), class " + clazz.getName()
                                                                                 + "is not subtype of interface.");
                                             } else {
                                                 try {
@@ -353,9 +360,9 @@ public class ExtensionLoader<T> {
                                                                 extensionClasses.put(n, clazz);
                                                             } else if (c != clazz) {
                                                                 cachedNames.remove(clazz);
-                                                                throw new IllegalStateException("Duplicate extension "
-                                                                                                + type.getName()
-                                                                                                + " name " + n + " on "
+                                                                throw new IllegalStateException(
+                                                                    "Duplicate extension " + type.getName() + " name "
+                                                                                                + n + " on "
                                                                                                 + c.getName() + " and "
                                                                                                 + clazz.getName());
                                                             }
@@ -365,12 +372,9 @@ public class ExtensionLoader<T> {
                                             }
                                         }
                                     } catch (Throwable t) {
-                                        IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: "
-                                                                                            + type
-                                                                                            + ", class line: "
-                                                                                            + line
-                                                                                            + ") in "
-                                                                                            + url
+                                        IllegalStateException e = new IllegalStateException(
+                                            "Failed to load extension class(interface: " + type + ", class line: "
+                                                                                            + line + ") in " + url
                                                                                             + ", cause: "
                                                                                             + t.getMessage(),
                                             t);
@@ -385,13 +389,15 @@ public class ExtensionLoader<T> {
                         }
                     } catch (Throwable t) {
                         logger.error("Exception when load extension class(interface: " + type + ", class file: " + url
-                                     + ") in " + url, t);
+                                     + ") in " + url,
+                            t);
                     }
                 } // end of while urls
             }
         } catch (Throwable t) {
-            logger.error("Exception when load extension class(interface: " + type + ", description file: " + fileName
-                         + ").", t);
+            logger.error(
+                "Exception when load extension class(interface: " + type + ", description file: " + fileName + ").",
+                t);
         }
     }
 

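Note: Paths.get(jarPath).getParent() returns null when the path has no parent component (for example a bare file name), so the added null check closes a latent NullPointerException. A small standalone sketch of the guarded call (not taken from the commit):

    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class ParentPathSketch {
        // Returns the parent directory of the given path, or null when the path
        // is a single name element and therefore has no parent.
        static String parentOf(String jarPath) {
            Path parent = Paths.get(jarPath).getParent();
            return parent != null ? parent.toString() : null;
        }

        public static void main(String[] args) {
            System.out.println(parentOf("/opt/canal/plugin/adapter.jar")); // /opt/canal/plugin
            System.out.println(parentOf("adapter.jar"));                   // null
        }
    }
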
+ 2 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/JdbcTypeUtil.java

@@ -155,9 +155,11 @@ public class JdbcTypeUtil {
                     } else {
                         res = null;
                     }
+                    break;
                 case Types.CLOB:
                 default:
                     res = value;
+                    break;
             }
             return res;
         } catch (Exception e) {

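Note: without the added `break`, the preceding case would fall through into the Types.CLOB/default branch and silently overwrite `res`. A tiny self-contained example (unrelated to the canal code, names made up) of why the terminating break matters:

    public class FallThroughSketch {
        static String classify(int code) {
            String res;
            switch (code) {
                case 1:
                    res = "one";
                    break; // without this break, execution would continue into the
                           // default branch and overwrite res with "other"
                default:
                    res = "other";
                    break;
            }
            return res;
        }

        public static void main(String[] args) {
            System.out.println(classify(1)); // prints "one" only because of the break
        }
    }
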
+ 4 - 4
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Result.java

@@ -12,10 +12,10 @@ import java.util.Date;
 public class Result implements Serializable {
 
     private static final long serialVersionUID = -3276409502352405716L;
-    public Integer code = 20000;
-    public Object  data;
-    public String  message;
-    public Date    sysTime;
+    private Integer           code             = 20000;
+    private Object            data;
+    private String            message;
+    private Date              sysTime;
 
     public static Result createSuccess(String message) {
         Result result = new Result();

+ 15 - 53
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Util.java

@@ -2,7 +2,10 @@ package com.alibaba.otter.canal.client.adapter.support;
 
 import java.io.File;
 import java.net.URL;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.time.*;
 import java.time.format.DateTimeFormatter;
 import java.util.Date;
@@ -29,38 +32,16 @@ public class Util {
     /**
      * 通过DS执行sql
      */
-    public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) throws SQLException {
-        Connection conn = null;
-        Statement stmt = null;
-        ResultSet rs = null;
-        try {
-            conn = ds.getConnection();
-            stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) {
+        try (Connection conn = ds.getConnection();
+                Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);) {
             stmt.setFetchSize(Integer.MIN_VALUE);
-            rs = stmt.executeQuery(sql);
-            return fun.apply(rs);
-        } finally {
-            if (rs != null) {
-                try {
-                    rs.close();
-                } catch (SQLException e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-            if (stmt != null) {
-                try {
-                    stmt.close();
-                } catch (SQLException e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-            if (conn != null) {
-                try {
-                    conn.close();
-                } catch (SQLException e) {
-                    logger.error(e.getMessage(), e);
-                }
+            try (ResultSet rs = stmt.executeQuery(sql);) {
+                return fun.apply(rs);
             }
+        } catch (Exception e) {
+            logger.error("sqlRs has error, sql: {} ", sql);
+            throw new RuntimeException(e);
         }
     }
 
@@ -72,29 +53,10 @@ public class Util {
      * @param consumer 回调方法
      */
     public static void sqlRS(Connection conn, String sql, Consumer<ResultSet> consumer) {
-        Statement stmt = null;
-        ResultSet rs = null;
-        try {
-            stmt = conn.createStatement();
-            rs = stmt.executeQuery(sql);
+        try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
             consumer.accept(rs);
         } catch (SQLException e) {
             logger.error(e.getMessage(), e);
-        } finally {
-            if (rs != null) {
-                try {
-                    rs.close();
-                } catch (SQLException e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-            if (stmt != null) {
-                try {
-                    stmt.close();
-                } catch (SQLException e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
         }
     }
 
@@ -144,7 +106,7 @@ public class Util {
         return column;
     }
 
-    public static String timeZone; // 当前时区
+    public final static String  timeZone;    // 当前时区
     private static DateTimeZone dateTimeZone;
 
     static {
@@ -265,7 +227,7 @@ public class Util {
                 LocalDateTime localDateTime = LocalDateTime.of(localDate, localTime);
                 return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error(e.getMessage(), e);
         }
 

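Note: the rewritten Util.sqlRS wraps connection, statement and result set in try-with-resources, keeps the streaming fetch hint (setFetchSize(Integer.MIN_VALUE) for the MySQL driver), and rethrows failures as a RuntimeException instead of handling closes in a long finally block. A hedged sketch of the callback style it exposes; the method body mirrors the diff, while the countRows caller and its names are illustrative only:

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.function.Function;

    import javax.sql.DataSource;

    public class SqlRsSketch {
        // All three JDBC resources are closed automatically, in reverse order,
        // even when the callback throws.
        static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) {
            try (Connection conn = ds.getConnection();
                 Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
                stmt.setFetchSize(Integer.MIN_VALUE); // MySQL: stream rows instead of buffering the full result
                try (ResultSet rs = stmt.executeQuery(sql)) {
                    return fun.apply(rs);
                }
            } catch (Exception e) {
                throw new RuntimeException("query failed: " + sql, e);
            }
        }

        // Typical caller: map or count rows inside the callback, as the ETL services do.
        static long countRows(DataSource ds, String table) {
            return (Long) sqlRS(ds, "SELECT COUNT(1) FROM " + table, rs -> {
                try {
                    return rs.next() ? rs.getLong(1) : 0L;
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }
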
+ 1 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -134,7 +134,7 @@ public class ESAdapter implements OuterAdapter {
 
             esConfigMonitor = new ESConfigMonitor();
             esConfigMonitor.init(this, envProperties);
-        } catch (Exception e) {
+        } catch (Throwable e) {
             throw new RuntimeException(e);
         }
     }

+ 0 - 2
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java

@@ -5,8 +5,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang.StringUtils;
-
 /**
  * ES 映射配置
  *

+ 3 - 2
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SchemaItem.java

@@ -278,12 +278,13 @@ public class SchemaItem {
             if (relationSelectFieldItems == null) {
                 synchronized (SchemaItem.class) {
                     if (relationSelectFieldItems == null) {
-                        relationSelectFieldItems = new ArrayList<>();
+                        List<FieldItem> relationSelectFieldItemsTmp = new ArrayList<>();
                         for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
                             if (fieldItem.getOwners().contains(getAlias())) {
-                                relationSelectFieldItems.add(fieldItem);
+                                relationSelectFieldItemsTmp.add(fieldItem);
                             }
                         }
+                        relationSelectFieldItems = relationSelectFieldItemsTmp;
                     }
                 }
             }

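Note: building the result into a local relationSelectFieldItemsTmp and assigning the field only once it is fully populated is the usual fix for double-checked lazy initialization: a thread reading the field outside the lock can no longer observe a non-null but half-filled list. (For full safety under the Java memory model the field would also need to be volatile; whether it is here is not visible in the hunk.) A generic sketch of the pattern with hypothetical names:

    import java.util.ArrayList;
    import java.util.List;

    public class LazyListHolder {
        private volatile List<String> items; // volatile so the fully built list publishes safely

        List<String> getItems() {
            if (items == null) {
                synchronized (this) {
                    if (items == null) {
                        List<String> tmp = new ArrayList<>();
                        tmp.add("a");  // populate the temporary list first...
                        tmp.add("b");
                        items = tmp;   // ...then publish it with a single write
                    }
                }
            }
            return items;
        }
    }
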
+ 1 - 2
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/monitor/ESConfigMonitor.java

@@ -7,16 +7,15 @@ import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
 import org.apache.commons.io.monitor.FileAlterationObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
 
 import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
 import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;

+ 25 - 6
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java

@@ -1,10 +1,16 @@
 package com.alibaba.otter.canal.client.adapter.es.service;
 
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -29,10 +35,10 @@ import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
-import com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 import com.google.common.base.Joiner;
 
 /**
@@ -99,7 +105,7 @@ public class ESEtlService {
 
             // 获取总数
             String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
-            long cnt = (Long) ESSyncUtil.sqlRS(dataSource, countSql, rs -> {
+            long cnt = (Long) Util.sqlRS(dataSource, countSql, rs -> {
                 Long count = null;
                 try {
                     if (rs.next()) {
@@ -115,7 +121,20 @@ public class ESEtlService {
             if (cnt >= 10000) {
                 int threadCount = 3; // 从配置读取默认为3
                 long perThreadCnt = cnt / threadCount;
-                ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+                ExecutorService executor = new ThreadPoolExecutor(threadCount,
+                    threadCount,
+                    5000L,
+                    TimeUnit.MILLISECONDS,
+                    new SynchronousQueue<>(),
+                    (r, exe) -> {
+                        if (!exe.isShutdown()) {
+                            try {
+                                exe.getQueue().put(r);
+                            } catch (InterruptedException e1) {
+                                // ignore
+                            }
+                        }
+                    });
                 List<Future<Boolean>> futures = new ArrayList<>(threadCount);
                 for (int i = 0; i < threadCount; i++) {
                     long offset = i * perThreadCnt;
@@ -175,7 +194,7 @@ public class ESEtlService {
     private boolean executeSqlImport(DataSource ds, String sql, ESMapping mapping, AtomicLong impCount,
                                      List<String> errMsg) {
         try {
-            ESSyncUtil.sqlRS(ds, sql, rs -> {
+            Util.sqlRS(ds, sql, rs -> {
                 int count = 0;
                 try {
                     BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();

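Note: several files in this commit replace Executors.newFixedThreadPool(n) with an explicit ThreadPoolExecutor over a SynchronousQueue whose rejection handler re-puts the task into the queue. The net effect is that a submission blocks until a worker thread is free, instead of piling work onto an unbounded backlog queue. A standalone sketch of that pattern (the helper name is made up for illustration):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class BlockingPools {
        // Fixed-size pool with no task backlog: when all workers are busy the
        // rejection handler blocks the submitting thread on queue.put(r).
        static ExecutorService newBlockingFixedPool(int threads) {
            return new ThreadPoolExecutor(threads,
                threads,
                5000L,
                TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),
                (r, exe) -> {
                    if (!exe.isShutdown()) {
                        try {
                            exe.getQueue().put(r); // hand the task to an idle worker, blocking if needed
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
        }
    }
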
+ 6 - 5
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -7,6 +7,7 @@ import java.util.Map;
 
 import javax.sql.DataSource;
 
+import com.alibaba.otter.canal.client.adapter.support.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -464,7 +465,7 @@ public class ESSyncService {
                 mapping.get_index(),
                 sql.replace("\n", " "));
         }
-        ESSyncUtil.sqlRS(ds, sql, rs -> {
+        Util.sqlRS(ds, sql, rs -> {
             try {
                 while (rs.next()) {
                     Map<String, Object> esFieldData = new LinkedHashMap<>();
@@ -500,7 +501,7 @@ public class ESSyncService {
                 mapping.get_index(),
                 sql.replace("\n", " "));
         }
-        ESSyncUtil.sqlRS(ds, sql, rs -> {
+        Util.sqlRS(ds, sql, rs -> {
             try {
                 Map<String, Object> esFieldData = null;
                 if (mapping.getPk() != null) {
@@ -601,7 +602,7 @@ public class ESSyncService {
                 mapping.get_index(),
                 sql.toString().replace("\n", " "));
         }
-        ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
+        Util.sqlRS(ds, sql.toString(), rs -> {
             try {
                 while (rs.next()) {
                     Map<String, Object> esFieldData = new LinkedHashMap<>();
@@ -693,7 +694,7 @@ public class ESSyncService {
                 mapping.get_index(),
                 sql.toString().replace("\n", " "));
         }
-        ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
+        Util.sqlRS(ds, sql.toString(), rs -> {
             try {
                 while (rs.next()) {
                     Map<String, Object> esFieldData = new LinkedHashMap<>();
@@ -812,7 +813,7 @@ public class ESSyncService {
                 mapping.get_index(),
                 sql.replace("\n", " "));
         }
-        ESSyncUtil.sqlRS(ds, sql, rs -> {
+        Util.sqlRS(ds, sql, rs -> {
             try {
                 while (rs.next()) {
                     Map<String, Object> esFieldData = new LinkedHashMap<>();

+ 7 - 49
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java

@@ -3,12 +3,9 @@ package com.alibaba.otter.canal.client.adapter.es.support;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
-import java.sql.*;
+import java.sql.Blob;
+import java.sql.SQLException;
 import java.util.*;
-import java.util.Date;
-import java.util.function.Function;
-
-import javax.sql.DataSource;
 
 import org.apache.commons.codec.binary.Base64;
 import org.joda.time.DateTime;
@@ -234,8 +231,11 @@ public class ESSyncUtil {
     private static byte[] blobToBytes(Blob blob) {
         try (InputStream is = blob.getBinaryStream()) {
             byte[] b = new byte[(int) blob.length()];
-            is.read(b);
-            return b;
+            if (is.read(b) != -1) {
+                return b;
+            } else {
+                return new byte[0];
+            }
         } catch (IOException | SQLException e) {
             logger.error(e.getMessage());
             return null;
@@ -297,46 +297,4 @@ public class ESSyncUtil {
             sql.append(owner).append(".").append(columnName).append("=").append(value).append("  AND ");
         }
     }
-
-    /**
-     * 执行查询sql
-     */
-    public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) {
-        Connection conn = null;
-        Statement smt = null;
-        ResultSet rs = null;
-        try {
-            conn = ds.getConnection();
-            smt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-            smt.setFetchSize(Integer.MIN_VALUE);
-            rs = smt.executeQuery(sql);
-
-            return fun.apply(rs);
-        } catch (SQLException e) {
-            logger.error("sqlRs has error, sql: {} ", sql);
-            throw new RuntimeException(e);
-        } finally {
-            if (rs != null) {
-                try {
-                    rs.close();
-                } catch (SQLException e) {
-                    logger.error("error to close result set");
-                }
-            }
-            if (smt != null) {
-                try {
-                    smt.close();
-                } catch (SQLException e) {
-                    logger.error("error to close statement");
-                }
-            }
-            if (conn != null) {
-                try {
-                    conn.close();
-                } catch (SQLException e) {
-                    logger.error("error to close db connection");
-                }
-            }
-        }
-    }
 }

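Note: the added check only guards against read() returning -1 on an empty stream; a single InputStream.read(b) call is still not guaranteed to fill the whole buffer. A more defensive sketch (a common alternative, not what the commit does) loops until the buffer is full or the stream ends:

    import java.io.IOException;
    import java.io.InputStream;
    import java.sql.Blob;
    import java.sql.SQLException;

    public class BlobSketch {
        // Reads the entire Blob into a byte array, looping because a single
        // read() call may return fewer bytes than requested.
        static byte[] blobToBytes(Blob blob) throws IOException, SQLException {
            byte[] b = new byte[(int) blob.length()];
            try (InputStream is = blob.getBinaryStream()) {
                int off = 0;
                while (off < b.length) {
                    int n = is.read(b, off, b.length - off);
                    if (n == -1) {
                        break; // stream ended early; remaining bytes stay zero-filled
                    }
                    off += n;
                }
            }
            return b;
        }
    }
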
+ 2 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -33,6 +33,7 @@ import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 
 /**
  * ES 操作模板
@@ -137,7 +138,7 @@ public class ESTemplate {
             (fieldName, value) -> sql.append("_v.").append(fieldName).append("=").append(value).append(" AND "));
         int len = sql.length();
         sql.delete(len - 4, len);
-        Integer syncCount = (Integer) ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
+        Integer syncCount = (Integer) Util.sqlRS(ds, sql.toString(), rs -> {
             int count = 0;
             try {
                 while (rs.next()) {

+ 1 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/TestConstant.java

@@ -13,7 +13,7 @@ public class TestConstant {
     public final static String    esHosts      = "127.0.0.1:9300";
     public final static String    clusterName  = "elasticsearch";
 
-    public static DruidDataSource dataSource;
+    public final static DruidDataSource dataSource;
 
     static {
         dataSource = new DruidDataSource();

+ 18 - 8
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java

@@ -6,9 +6,7 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -40,10 +38,9 @@ public class HbaseEtlService {
 
     private static Logger logger = LoggerFactory.getLogger(HbaseEtlService.class);
 
-
     /**
      * 建表
-     * 
+     *
      * @param hbaseTemplate
      * @param config
      */
@@ -62,7 +59,7 @@ public class HbaseEtlService {
 
     /**
      * 导入数据
-     * 
+     *
      * @param ds 数据源
      * @param hbaseTemplate hbaseTemplate
      * @param config 配置
@@ -154,7 +151,20 @@ public class HbaseEtlService {
             if (cnt >= 10000) {
                 int threadCount = 3;
                 long perThreadCnt = cnt / threadCount;
-                ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+                ExecutorService executor = new ThreadPoolExecutor(threadCount,
+                    threadCount,
+                    5000L,
+                    TimeUnit.MILLISECONDS,
+                    new SynchronousQueue<>(),
+                    (r, exe) -> {
+                        if (!exe.isShutdown()) {
+                            try {
+                                exe.getQueue().put(r);
+                            } catch (InterruptedException e1) {
+                                // ignore
+                            }
+                        }
+                    });
                 List<Future<Boolean>> futures = new ArrayList<>(threadCount);
                 for (int i = 0; i < threadCount; i++) {
                     long offset = i * perThreadCnt;
@@ -201,7 +211,7 @@ public class HbaseEtlService {
 
     /**
      * 执行导入
-     * 
+     *
      * @param ds
      * @param sql
      * @param hbaseMapping

+ 3 - 3
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java

@@ -20,9 +20,9 @@ import com.alibaba.otter.canal.client.adapter.support.Dml;
  */
 public class HbaseSyncService {
 
-    private Logger        logger = LoggerFactory.getLogger(this.getClass());
+    private static Logger logger = LoggerFactory.getLogger(HbaseSyncService.class);
 
-    private HbaseTemplate hbaseTemplate;                                    // HBase操作模板
+    private HbaseTemplate hbaseTemplate;                                           // HBase操作模板
 
     public HbaseSyncService(HbaseTemplate hbaseTemplate){
         this.hbaseTemplate = hbaseTemplate;
@@ -140,7 +140,7 @@ public class HbaseSyncService {
                                         Integer.parseInt((String) entry.getValue()));
                                     bytes = Bytes.toBytes(v);
                                 } catch (Exception e) {
-                                    // ignore
+                                    logger.error(e.getMessage(), e);
                                 }
                             }
                         }

+ 1 - 3
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/PhTypeUtil.java

@@ -7,12 +7,10 @@ import java.math.RoundingMode;
 import java.sql.Timestamp;
 import java.util.Date;
 
-import com.alibaba.otter.canal.client.adapter.support.Util;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.joda.time.DateTime;
 
+import com.alibaba.otter.canal.client.adapter.support.Util;
 import com.google.common.math.LongMath;
-import org.joda.time.DateTimeZone;
 
 /**
  * Phoenix类型转换工具类

+ 1 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/EtlLock.java

@@ -27,7 +27,7 @@ public class EtlLock {
 
     private static final Map<String, InterProcessMutex> DISTRIBUTED_LOCK = new ConcurrentHashMap<>();
 
-    private static Mode                                 mode             = Mode.LOCAL;
+    private Mode                                        mode             = Mode.LOCAL;
 
     @Resource
     private CuratorClient                               curatorClient;

+ 4 - 4
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java

@@ -34,7 +34,7 @@ public class SyncSwitch {
 
     private static final Map<String, BooleanMutex> DISTRIBUTED_LOCK   = new ConcurrentHashMap<>();
 
-    private static Mode                            mode               = Mode.LOCAL;
+    private Mode                                   mode               = Mode.LOCAL;
 
     @Resource
     private AdapterCanalConfig                     adapterCanalConfig;
@@ -165,20 +165,20 @@ public class SyncSwitch {
         }
     }
 
-    public Boolean status(String destination) {
+    public boolean status(String destination) {
         if (mode == Mode.LOCAL) {
             BooleanMutex mutex = LOCAL_LOCK.get(destination);
             if (mutex != null) {
                 return mutex.state();
             } else {
-                return null;
+                return false;
             }
         } else {
             BooleanMutex mutex = DISTRIBUTED_LOCK.get(destination);
             if (mutex != null) {
                 return mutex.state();
             } else {
-                return null;
+                return false;
             }
         }
     }

+ 16 - 5
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -2,10 +2,7 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +40,21 @@ public abstract class AbstractCanalAdapterWorker {
 
     public AbstractCanalAdapterWorker(List<List<OuterAdapter>> canalOuterAdapters){
         this.canalOuterAdapters = canalOuterAdapters;
-        this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
+        int adaptersSize = canalOuterAdapters.size();
+        this.groupInnerExecutorService = new ThreadPoolExecutor(adaptersSize,
+            adaptersSize,
+            5000L,
+            TimeUnit.MILLISECONDS,
+            new SynchronousQueue<>(),
+            (r, exe) -> {
+                if (!exe.isShutdown()) {
+                    try {
+                        exe.getQueue().put(r);
+                    } catch (InterruptedException e1) {
+                        // ignore
+                    }
+                }
+            });
         syncSwitch = (SyncSwitch) SpringContext.getBean(SyncSwitch.class);
     }
 

+ 17 - 7
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -1,9 +1,7 @@
 package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 import org.apache.kafka.common.errors.WakeupException;
 
@@ -44,12 +42,22 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
     protected void process() {
         while (!running) {
             try {
-                Thread.sleep(1000);
+                Thread.sleep(500);
             } catch (InterruptedException e) {
                 // ignore
             }
         }
-        ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
+        ExecutorService workerExecutor = new ThreadPoolExecutor(1, 1,
+                5000L, TimeUnit.MILLISECONDS,
+                new SynchronousQueue<>(), (r, exe) -> {
+            if (!exe.isShutdown()) {
+                try {
+                    exe.getQueue().put(r);
+                } catch (InterruptedException e1) {
+                    //ignore
+                }
+            }
+        });
         int retry = canalClientConfig.getRetries() == null
                     || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
@@ -63,8 +71,8 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                 connector.subscribe();
                 logger.info("=============> Subscribe topic: {} succeed <=============", this.topic);
                 while (running) {
-                    Boolean status = syncSwitch.status(canalDestination);
-                    if (status != null && !status) {
+                    boolean status = syncSwitch.status(canalDestination);
+                    if (!status) {
                         connector.disconnect();
                         break;
                     }
@@ -85,6 +93,8 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
             }
         }
 
+        workerExecutor.shutdown();
+
         try {
             connector.unsubscribe();
         } catch (WakeupException e) {

+ 15 - 23
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -5,7 +5,7 @@ import java.net.SocketAddress;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -176,40 +176,32 @@ public class CanalAdapterLoader {
     public void destroy() {
         if (!canalWorkers.isEmpty()) {
             ExecutorService stopExecutorService = Executors.newFixedThreadPool(canalWorkers.size());
-            List<Future<Boolean>> futures = new ArrayList<>();
             for (CanalAdapterWorker canalAdapterWorker : canalWorkers.values()) {
-                futures.add(stopExecutorService.submit(() -> {
-                    canalAdapterWorker.stop();
-                    return true;
-                }));
+                stopExecutorService.execute(canalAdapterWorker::stop);
             }
-            futures.forEach(future -> {
-                try {
-                    future.get();
-                } catch (Exception e) {
+            stopExecutorService.shutdown();
+            try {
+                while (!stopExecutorService.awaitTermination(1, TimeUnit.SECONDS)) {
                     // ignore
                 }
-            });
-            stopExecutorService.shutdown();
+            } catch (InterruptedException e) {
+                // ignore
+            }
         }
 
         if (!canalMQWorker.isEmpty()) {
             ExecutorService stopMQWorkerService = Executors.newFixedThreadPool(canalMQWorker.size());
-            List<Future<Boolean>> futures = new ArrayList<>();
             for (AbstractCanalAdapterWorker canalAdapterMQWorker : canalMQWorker.values()) {
-                futures.add(stopMQWorkerService.submit(() -> {
-                    canalAdapterMQWorker.stop();
-                    return true;
-                }));
+                stopMQWorkerService.execute(canalAdapterMQWorker::stop);
             }
-            futures.forEach(future -> {
-                try {
-                    future.get();
-                } catch (Exception e) {
+            stopMQWorkerService.shutdown();
+            try {
+                while (!stopMQWorkerService.awaitTermination(1, TimeUnit.SECONDS)) {
                     // ignore
                 }
-            });
-            stopMQWorkerService.shutdown();
+            } catch (InterruptedException e) {
+                // ignore
+            }
         }
         logger.info("All canal adapters destroyed");
     }

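Note: the destroy logic now fires each worker's stop() through the pool, calls shutdown(), and spins on awaitTermination until everything has finished, which is simpler than collecting Futures only to block on them one by one. A generic sketch of that shutdown idiom (names are illustrative; it assumes at least one stop hook is passed):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ShutdownSketch {
        // Runs all stop hooks concurrently and waits until every one has completed.
        static void stopAll(Runnable... stopHooks) {
            ExecutorService pool = Executors.newFixedThreadPool(stopHooks.length);
            for (Runnable hook : stopHooks) {
                pool.execute(hook);
            }
            pool.shutdown(); // accept no new tasks; already submitted hooks keep running
            try {
                while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
                    // still waiting for slow workers to stop
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
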
+ 21 - 6
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -1,8 +1,7 @@
 package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
 
 import org.apache.kafka.common.errors.WakeupException;
 
@@ -49,8 +48,22 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
             }
         }
 
-        ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
-        int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
+        ExecutorService workerExecutor = new ThreadPoolExecutor(1,
+            1,
+            5000L,
+            TimeUnit.MILLISECONDS,
+            new SynchronousQueue<>(),
+            (r, exe) -> {
+                if (!exe.isShutdown()) {
+                    try {
+                        exe.getQueue().put(r);
+                    } catch (InterruptedException e1) {
+                        // ignore
+                    }
+                }
+            });
+        int retry = canalClientConfig.getRetries() == null
+                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
 
         while (running) {
@@ -62,8 +75,8 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
                 connector.subscribe();
                 logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
                 while (running) {
-                    Boolean status = syncSwitch.status(canalDestination);
-                    if (status != null && !status) {
+                    boolean status = syncSwitch.status(canalDestination);
+                    if (!status) {
                         connector.disconnect();
                         break;
                     }
@@ -84,6 +97,8 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
             }
         }
 
+        workerExecutor.shutdown();
+
         try {
             connector.unsubscribe();
         } catch (WakeupException e) {

+ 1 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -140,7 +140,7 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
                     }
                 }
 
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 logger.error("process error!", e);
             } finally {
                 connector.disconnect();

+ 4 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/ApplicationConfigMonitor.java

@@ -1,7 +1,9 @@
 package com.alibaba.otter.canal.adapter.launcher.monitor;
 
 import java.io.File;
-import java.io.FileReader;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 import javax.annotation.PostConstruct;
@@ -68,7 +70,7 @@ public class ApplicationConfigMonitor {
             super.onFileChange(file);
             try {
                 // 检查yml格式
-                new Yaml().loadAs(new FileReader(file), Map.class);
+                new Yaml().loadAs(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8), Map.class);
 
                 canalAdapterService.destroy();
 

+ 8 - 3
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/DbRemoteConfigLoader.java

@@ -1,6 +1,8 @@
 package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
 
-import java.io.FileWriter;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -35,7 +37,7 @@ public class DbRemoteConfigLoader implements RemoteConfigLoader {
 
     private DruidDataSource          dataSource;
 
-    private static volatile long     currentConfigTimestamp = 0;
+    private volatile long            currentConfigTimestamp = 0;
     private Map<String, ConfigItem>  remoteAdapterConfigs   = new MapMaker().makeMap();
 
     private ScheduledExecutorService executor               = Executors.newScheduledThreadPool(2,
@@ -115,7 +117,10 @@ public class DbRemoteConfigLoader implements RemoteConfigLoader {
      * @param content 文件内容
      */
     private void overrideLocalCanalConfig(String content) {
-        try (FileWriter writer = new FileWriter(CommonUtils.getConfPath() + "application.yml")) {
+
+        try (OutputStreamWriter writer = new OutputStreamWriter(
+            new FileOutputStream(CommonUtils.getConfPath() + "application.yml"),
+            StandardCharsets.UTF_8)) {
             writer.write(content);
             writer.flush();
         } catch (Exception e) {

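Note: switching from FileWriter to an OutputStreamWriter over a FileOutputStream pins the output encoding to UTF-8 instead of the platform default. An equivalent, shorter sketch using java.nio.file.Files (shown only as an alternative, not the commit's code; the class name is made up):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class ConfigWriteSketch {
        // Overwrites the target file with the given content, always encoded as UTF-8.
        static void overrideConfig(String path, String content) throws IOException {
            Files.write(Paths.get(path), content.getBytes(StandardCharsets.UTF_8));
        }
    }
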
+ 9 - 4
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/RemoteAdapterMonitorImpl.java

@@ -1,11 +1,14 @@
 package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
 
-import com.alibaba.otter.canal.common.utils.CommonUtils;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileWriter;
+import com.alibaba.otter.canal.common.utils.CommonUtils;
 
 /**
  * 远程配置监听器实现
@@ -35,7 +38,9 @@ public class RemoteAdapterMonitorImpl implements RemoteAdapterMonitor {
             }
         }
         String name = configItem.getName();
-        try (FileWriter writer = new FileWriter(confPath + category + "/" + configItem.getName())) {
+        try (OutputStreamWriter writer = new OutputStreamWriter(
+            new FileOutputStream(confPath + category + "/" + configItem.getName()),
+            StandardCharsets.UTF_8)) {
             writer.write(configItem.getContent());
             writer.flush();
             logger.info("## Loaded remote adapter config: {}/{}", category, name);

+ 17 - 17
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java

@@ -69,16 +69,16 @@ public class CommonRest {
         }
         try {
 
-            Boolean oriSwitchStatus;
+            boolean oriSwitchStatus;
             if (destination != null) {
                 oriSwitchStatus = syncSwitch.status(destination);
-                if (oriSwitchStatus != null && oriSwitchStatus) {
+                if (oriSwitchStatus) {
                     syncSwitch.off(destination);
                 }
             } else {
                 // task可能为destination,直接锁task
                 oriSwitchStatus = syncSwitch.status(task);
-                if (oriSwitchStatus != null && oriSwitchStatus) {
+                if (oriSwitchStatus) {
                     syncSwitch.off(task);
                 }
             }
@@ -89,9 +89,9 @@ public class CommonRest {
                 }
                 return adapter.etl(task, paramArray);
             } finally {
-                if (destination != null && oriSwitchStatus != null && oriSwitchStatus) {
+                if (destination != null && oriSwitchStatus) {
                     syncSwitch.on(destination);
-                } else if (destination == null && oriSwitchStatus != null && oriSwitchStatus) {
+                } else if (destination == null && oriSwitchStatus) {
                     syncSwitch.on(task);
                 }
             }
@@ -102,7 +102,7 @@ public class CommonRest {
 
     /**
      * ETL curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST
-     * 
+     *
      * @param type 类型 hbase, es
      * @param task 任务名对应配置文件名 mytest_person2.yml
      * @param params etl where条件参数, 为空全部导入
@@ -129,7 +129,7 @@ public class CommonRest {
 
     /**
      * 统计总数 curl http://127.0.0.1:8081/count/hbase/mytest_person2.yml
-     * 
+     *
      * @param type 类型 hbase, es
      * @param task 任务名对应配置文件名 mytest_person2.yml
      * @return
@@ -148,11 +148,11 @@ public class CommonRest {
         Set<String> destinations = adapterCanalConfig.DESTINATIONS;
         for (String destination : destinations) {
             Map<String, String> resMap = new LinkedHashMap<>();
-            Boolean status = syncSwitch.status(destination);
-            String resStatus = "none";
-            if (status != null && status) {
+            boolean status = syncSwitch.status(destination);
+            String resStatus;
+            if (status) {
                 resStatus = "on";
-            } else if (status != null && !status) {
+            } else {
                 resStatus = "off";
             }
             resMap.put("destination", destination);
@@ -164,7 +164,7 @@ public class CommonRest {
 
     /**
      * 实例同步开关 curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT
-     * 
+     *
      * @param destination 实例名称
      * @param status 开关状态: off on
      * @return
@@ -189,17 +189,17 @@ public class CommonRest {
 
     /**
      * 获取实例开关状态 curl http://127.0.0.1:8081/syncSwitch/example
-     * 
+     *
      * @param destination 实例名称
      * @return
      */
     @GetMapping("/syncSwitch/{destination}")
     public Map<String, String> etl(@PathVariable String destination) {
-        Boolean status = syncSwitch.status(destination);
-        String resStatus = "none";
-        if (status != null && status) {
+        boolean status = syncSwitch.status(destination);
+        String resStatus;
+        if (status) {
             resStatus = "on";
-        } else if (status != null && !status) {
+        } else {
             resStatus = "off";
         }
         Map<String, String> res = new LinkedHashMap<>();

+ 11 - 11
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

@@ -1,10 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.rdb.config;
 
-import org.apache.commons.lang.StringUtils;
-
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
+
 /**
  * RDB表映射配置
  *
@@ -21,7 +21,7 @@ public class MappingConfig {
 
     private String    outerAdapterKey; // 对应适配器的key
 
-    private Boolean   concurrent;      // 是否并行同步
+    private boolean   concurrent = false;      // 是否并行同步
 
     private DbMapping dbMapping;       // db映射配置
 
@@ -49,11 +49,11 @@ public class MappingConfig {
         this.outerAdapterKey = outerAdapterKey;
     }
 
-    public Boolean getConcurrent() {
-        return concurrent == null ? false : concurrent;
+    public boolean getConcurrent() {
+        return concurrent;
     }
 
-    public void setConcurrent(Boolean concurrent) {
+    public void setConcurrent(boolean concurrent) {
         this.concurrent = concurrent;
     }
 
@@ -87,11 +87,11 @@ public class MappingConfig {
 
     public static class DbMapping {
 
-        private Boolean             mirrorDb    = false;                 // 是否镜像库
+        private boolean             mirrorDb    = false;                 // 是否镜像库
         private String              database;                            // 数据库名或schema名
         private String              table;                               // 表名
         private Map<String, String> targetPk    = new LinkedHashMap<>(); // 目标表主键字段
-        private Boolean             mapAll      = false;                 // 映射所有字段
+        private boolean             mapAll      = false;                 // 映射所有字段
         private String              targetDb;                            // 目标库名
         private String              targetTable;                         // 目标表名
         private Map<String, String> targetColumns;                       // 目标表字段映射
@@ -103,11 +103,11 @@ public class MappingConfig {
 
         private Map<String, String> allMapColumns;
 
-        public Boolean getMirrorDb() {
-            return mirrorDb == null ? false : mirrorDb;
+        public boolean getMirrorDb() {
+            return mirrorDb;
         }
 
-        public void setMirrorDb(Boolean mirrorDb) {
+        public void setMirrorDb(boolean mirrorDb) {
             this.mirrorDb = mirrorDb;
         }
 

+ 15 - 4
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -5,9 +5,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -78,7 +76,20 @@ public class RdbEtlService {
             if (cnt >= 10000) {
                 int threadCount = 3;
                 long perThreadCnt = cnt / threadCount;
-                ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+                ExecutorService executor = new ThreadPoolExecutor(threadCount,
+                    threadCount,
+                    5000L,
+                    TimeUnit.MILLISECONDS,
+                    new SynchronousQueue<>(),
+                    (r, exe) -> {
+                        if (!exe.isShutdown()) {
+                            try {
+                                exe.getQueue().put(r);
+                            } catch (InterruptedException e1) {
+                                // ignore
+                            }
+                        }
+                    });
                 List<Future<Boolean>> futures = new ArrayList<>(threadCount);
                 for (int i = 0; i < threadCount; i++) {
                     long offset = i * perThreadCnt;

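The hunk above replaces Executors.newFixedThreadPool with a ThreadPoolExecutor backed by a SynchronousQueue and a rejection handler that puts the rejected task back onto the queue. Because the queue has no capacity, the submitting thread blocks until one of the three workers is free, which gives the ETL loop back-pressure instead of an unbounded task backlog. A self-contained sketch of the same caller-blocks pattern, assuming nothing beyond java.util.concurrent; CallerBlocksPoolExample and the sleep-based workload are illustrative:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class CallerBlocksPoolExample {

        public static void main(String[] args) throws InterruptedException {
            int threadCount = 3;
            // Same construction as in RdbEtlService: when all workers are busy the
            // rejection handler re-queues the task, blocking the submitter until a
            // worker takes the hand-off from the SynchronousQueue.
            ExecutorService executor = new ThreadPoolExecutor(threadCount,
                threadCount,
                5000L,
                TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),
                (r, exe) -> {
                    if (!exe.isShutdown()) {
                        try {
                            exe.getQueue().put(r);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });

            for (int i = 0; i < 10; i++) {
                final int slice = i;
                executor.submit(() -> {
                    Thread.sleep(100); // stand-in for one ETL slice
                    return slice;
                });
            }
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.MINUTES);
        }
    }

Restoring the interrupt flag in the handler, as in the sketch, is a small hardening over the hunk's bare "// ignore".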
+ 507 - 507
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -1,507 +1,507 @@
-package com.alibaba.otter.canal.client.adapter.rdb.service;
-
-import java.sql.Connection;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.function.Function;
-
-import javax.sql.DataSource;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
-import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
-import com.alibaba.otter.canal.client.adapter.rdb.support.BatchExecutor;
-import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml;
-import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.Util;
-
-/**
- * RDB同步操作业务
- *
- * @author rewerma 2018-11-7 下午06:45:49
- * @version 1.0.0
- */
-public class RdbSyncService {
-
-    private static final Logger               logger  = LoggerFactory.getLogger(RdbSyncService.class);
-
-    // 源库表字段类型缓存: instance.schema.table -> <columnName, jdbcType>
-    private Map<String, Map<String, Integer>> columnsTypeCache;
-
-    private int                               threads = 3;
-    private boolean                           skipDupException;
-
-    private List<SyncItem>[]                  dmlsPartition;
-    private BatchExecutor[]                   batchExecutors;
-    private ExecutorService[]                 executorThreads;
-
-    public List<SyncItem>[] getDmlsPartition() {
-        return dmlsPartition;
-    }
-
-    public Map<String, Map<String, Integer>> getColumnsTypeCache() {
-        return columnsTypeCache;
-    }
-
-    public RdbSyncService(DataSource dataSource, Integer threads, boolean skipDupException){
-        this(dataSource, threads, new ConcurrentHashMap<>(), skipDupException);
-    }
-
-    @SuppressWarnings("unchecked")
-    public RdbSyncService(DataSource dataSource, Integer threads, Map<String, Map<String, Integer>> columnsTypeCache,
-                          boolean skipDupException){
-        this.columnsTypeCache = columnsTypeCache;
-        this.skipDupException = skipDupException;
-        try {
-            if (threads != null) {
-                this.threads = threads;
-            }
-            this.dmlsPartition = new List[this.threads];
-            this.batchExecutors = new BatchExecutor[this.threads];
-            this.executorThreads = new ExecutorService[this.threads];
-            for (int i = 0; i < this.threads; i++) {
-                dmlsPartition[i] = new ArrayList<>();
-                batchExecutors[i] = new BatchExecutor(dataSource);
-                executorThreads[i] = Executors.newSingleThreadExecutor();
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * 批量同步回调
-     *
-     * @param dmls 批量 DML
-     * @param function 回调方法
-     */
-    public void sync(List<Dml> dmls, Function<Dml, Boolean> function) {
-        try {
-            boolean toExecute = false;
-            for (Dml dml : dmls) {
-                if (!toExecute) {
-                    toExecute = function.apply(dml);
-                } else {
-                    function.apply(dml);
-                }
-            }
-            if (toExecute) {
-                List<Future<Boolean>> futures = new ArrayList<>();
-                for (int i = 0; i < threads; i++) {
-                    int j = i;
-                    futures.add(executorThreads[i].submit(() -> {
-                        try {
-                            dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j],
-                                syncItem.config,
-                                syncItem.singleDml));
-                            dmlsPartition[j].clear();
-                            batchExecutors[j].commit();
-                            return true;
-                        } catch (Throwable e) {
-                            batchExecutors[j].rollback();
-                            throw new RuntimeException(e);
-                        }
-                    }));
-                }
-
-                futures.forEach(future -> {
-                    try {
-                        future.get();
-                    } catch (ExecutionException | InterruptedException e) {
-                        throw new RuntimeException(e);
-                    }
-                });
-            }
-        } finally {
-            for (BatchExecutor batchExecutor : batchExecutors) {
-                if (batchExecutor != null) {
-                    batchExecutor.close();
-                }
-            }
-        }
-    }
-
-    /**
-     * 批量同步
-     *
-     * @param mappingConfig 配置集合
-     * @param dmls 批量 DML
-     */
-    public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls, Properties envProperties) {
-        sync(dmls, dml -> {
-            if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
-                // DDL
-            columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
-            return false;
-        } else {
-            // DML
-            String destination = StringUtils.trimToEmpty(dml.getDestination());
-            String groupId = StringUtils.trimToEmpty(dml.getGroupId());
-            String database = dml.getDatabase();
-            String table = dml.getTable();
-            Map<String, MappingConfig> configMap;
-            if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
-                configMap = mappingConfig.get(destination + "-" + groupId + "_" + database + "-" + table);
-            } else {
-                configMap = mappingConfig.get(destination + "_" + database + "-" + table);
-            }
-
-            if (configMap == null) {
-                return false;
-            }
-
-            if (configMap.values().isEmpty()) {
-                return false;
-            }
-
-            for (MappingConfig config : configMap.values()) {
-                if (config.getConcurrent()) {
-                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                    singleDmls.forEach(singleDml -> {
-                        int hash = pkHash(config.getDbMapping(), singleDml.getData());
-                        SyncItem syncItem = new SyncItem(config, singleDml);
-                        dmlsPartition[hash].add(syncItem);
-                    });
-                } else {
-                    int hash = 0;
-                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                    singleDmls.forEach(singleDml -> {
-                        SyncItem syncItem = new SyncItem(config, singleDml);
-                        dmlsPartition[hash].add(syncItem);
-                    });
-                }
-            }
-            return true;
-        }
-    }   );
-    }
-
-    /**
-     * 单条 dml 同步
-     *
-     * @param batchExecutor 批量事务执行器
-     * @param config 对应配置对象
-     * @param dml DML
-     */
-    public void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
-        if (config != null) {
-            try {
-                String type = dml.getType();
-                if (type != null && type.equalsIgnoreCase("INSERT")) {
-                    insert(batchExecutor, config, dml);
-                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
-                    update(batchExecutor, config, dml);
-                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
-                    delete(batchExecutor, config, dml);
-                } else if (type != null && type.equalsIgnoreCase("TRUNCATE")) {
-                    truncate(batchExecutor, config);
-                }
-                if (logger.isDebugEnabled()) {
-                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
-                }
-            } catch (SQLException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    /**
-     * 插入操作
-     *
-     * @param config 配置项
-     * @param dml DML数据
-     */
-    private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
-        Map<String, Object> data = dml.getData();
-        if (data == null || data.isEmpty()) {
-            return;
-        }
-
-        DbMapping dbMapping = config.getDbMapping();
-
-        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
-
-        StringBuilder insertSql = new StringBuilder();
-        insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
-
-        columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
-        int len = insertSql.length();
-        insertSql.delete(len - 1, len).append(") VALUES (");
-        int mapLen = columnsMap.size();
-        for (int i = 0; i < mapLen; i++) {
-            insertSql.append("?,");
-        }
-        len = insertSql.length();
-        insertSql.delete(len - 1, len).append(")");
-
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
-
-        List<Map<String, ?>> values = new ArrayList<>();
-        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
-            String targetColumnName = entry.getKey();
-            String srcColumnName = entry.getValue();
-            if (srcColumnName == null) {
-                srcColumnName = Util.cleanColumn(targetColumnName);
-            }
-
-            Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
-            if (type == null) {
-                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
-            }
-            Object value = data.get(srcColumnName);
-            BatchExecutor.setValue(values, type, value);
-        }
-
-        try {
-            batchExecutor.execute(insertSql.toString(), values);
-        } catch (SQLException e) {
-            if (skipDupException
-                && (e.getMessage().contains("Duplicate entry") || e.getMessage().startsWith("ORA-00001: 违反唯一约束条件"))) {
-                // ignore
-                // TODO 增加更多关系数据库的主键冲突的错误码
-            } else {
-                throw e;
-            }
-        }
-        if (logger.isTraceEnabled()) {
-            logger.trace("Insert into target table, sql: {}", insertSql);
-        }
-
-    }
-
-    /**
-     * 更新操作
-     *
-     * @param config 配置项
-     * @param dml DML数据
-     */
-    private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
-        Map<String, Object> data = dml.getData();
-        if (data == null || data.isEmpty()) {
-            return;
-        }
-
-        Map<String, Object> old = dml.getOld();
-        if (old == null || old.isEmpty()) {
-            return;
-        }
-
-        DbMapping dbMapping = config.getDbMapping();
-
-        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
-
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
-
-        StringBuilder updateSql = new StringBuilder();
-        updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping)).append(" SET ");
-        List<Map<String, ?>> values = new ArrayList<>();
-        boolean hasMatched = false;
-        for (String srcColumnName : old.keySet()) {
-            List<String> targetColumnNames = new ArrayList<>();
-            columnsMap.forEach((targetColumn, srcColumn) -> {
-                if (srcColumnName.equalsIgnoreCase(srcColumn)) {
-                    targetColumnNames.add(targetColumn);
-                }
-            });
-            if (!targetColumnNames.isEmpty()) {
-                hasMatched = true;
-                for (String targetColumnName : targetColumnNames) {
-                    updateSql.append(targetColumnName).append("=?, ");
-                    Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
-                    if (type == null) {
-                        throw new RuntimeException("Target column: " + targetColumnName + " not matched");
-                    }
-                    BatchExecutor.setValue(values, type, data.get(srcColumnName));
-                }
-            }
-        }
-        if (!hasMatched) {
-            logger.warn("Did not matched any columns to update ");
-            return;
-        }
-        int len = updateSql.length();
-        updateSql.delete(len - 2, len).append(" WHERE ");
-
-        // 拼接主键
-        appendCondition(dbMapping, updateSql, ctype, values, data, old);
-        batchExecutor.execute(updateSql.toString(), values);
-        if (logger.isTraceEnabled()) {
-            logger.trace("Update target table, sql: {}", updateSql);
-        }
-    }
-
-    /**
-     * 删除操作
-     *
-     * @param config
-     * @param dml
-     */
-    private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
-        Map<String, Object> data = dml.getData();
-        if (data == null || data.isEmpty()) {
-            return;
-        }
-
-        DbMapping dbMapping = config.getDbMapping();
-
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
-
-        StringBuilder sql = new StringBuilder();
-        sql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");
-
-        List<Map<String, ?>> values = new ArrayList<>();
-        // 拼接主键
-        appendCondition(dbMapping, sql, ctype, values, data);
-        batchExecutor.execute(sql.toString(), values);
-        if (logger.isTraceEnabled()) {
-            logger.trace("Delete from target table, sql: {}", sql);
-        }
-    }
-
-    /**
-     * truncate操作
-     *
-     * @param config
-     */
-    private void truncate(BatchExecutor batchExecutor, MappingConfig config) throws SQLException {
-        DbMapping dbMapping = config.getDbMapping();
-        StringBuilder sql = new StringBuilder();
-        sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping));
-        batchExecutor.execute(sql.toString(), new ArrayList<>());
-        if (logger.isTraceEnabled()) {
-            logger.trace("Truncate target table, sql: {}", sql);
-        }
-    }
-
-    /**
-     * 获取目标字段类型
-     *
-     * @param conn sql connection
-     * @param config 映射配置
-     * @return 字段sqlType
-     */
-    private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig config) {
-        DbMapping dbMapping = config.getDbMapping();
-        String cacheKey = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
-        Map<String, Integer> columnType = columnsTypeCache.get(cacheKey);
-        if (columnType == null) {
-            synchronized (RdbSyncService.class) {
-                columnType = columnsTypeCache.get(cacheKey);
-                if (columnType == null) {
-                    columnType = new LinkedHashMap<>();
-                    final Map<String, Integer> columnTypeTmp = columnType;
-                    String sql = "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE 1=2";
-                    Util.sqlRS(conn, sql, rs -> {
-                        try {
-                            ResultSetMetaData rsd = rs.getMetaData();
-                            int columnCount = rsd.getColumnCount();
-                            for (int i = 1; i <= columnCount; i++) {
-                                columnTypeTmp.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
-                            }
-                            columnsTypeCache.put(cacheKey, columnTypeTmp);
-                        } catch (SQLException e) {
-                            logger.error(e.getMessage(), e);
-                        }
-                    });
-                }
-            }
-        }
-        return columnType;
-    }
-
-    /**
-     * 拼接主键 where条件
-     */
-    private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
-                                 List<Map<String, ?>> values, Map<String, Object> d) {
-        appendCondition(dbMapping, sql, ctype, values, d, null);
-    }
-
-    private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
-                                 List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
-        // 拼接主键
-        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
-            String targetColumnName = entry.getKey();
-            String srcColumnName = entry.getValue();
-            if (srcColumnName == null) {
-                srcColumnName = Util.cleanColumn(targetColumnName);
-            }
-            sql.append(targetColumnName).append("=? AND ");
-            Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
-            if (type == null) {
-                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
-            }
-            // 如果有修改主键的情况
-            if (o != null && o.containsKey(srcColumnName)) {
-                BatchExecutor.setValue(values, type, o.get(srcColumnName));
-            } else {
-                BatchExecutor.setValue(values, type, d.get(srcColumnName));
-            }
-        }
-        int len = sql.length();
-        sql.delete(len - 4, len);
-    }
-
-    public static class SyncItem {
-
-        private MappingConfig config;
-        private SingleDml     singleDml;
-
-        public SyncItem(MappingConfig config, SingleDml singleDml){
-            this.config = config;
-            this.singleDml = singleDml;
-        }
-    }
-
-    /**
-     * 取主键hash
-     */
-    public int pkHash(DbMapping dbMapping, Map<String, Object> d) {
-        return pkHash(dbMapping, d, null);
-    }
-
-    public int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o) {
-        int hash = 0;
-        // 取主键
-        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
-            String targetColumnName = entry.getKey();
-            String srcColumnName = entry.getValue();
-            if (srcColumnName == null) {
-                srcColumnName = Util.cleanColumn(targetColumnName);
-            }
-            Object value = null;
-            if (o != null && o.containsKey(srcColumnName)) {
-                value = o.get(srcColumnName);
-            } else if (d != null) {
-                value = d.get(srcColumnName);
-            }
-            if (value != null) {
-                hash += value.hashCode();
-            }
-        }
-        hash = Math.abs(hash) % threads;
-        return Math.abs(hash);
-    }
-
-    public void close() {
-        for (int i = 0; i < threads; i++) {
-            executorThreads[i].shutdown();
-        }
-    }
-}
+package com.alibaba.otter.canal.client.adapter.rdb.service;
+
+import java.sql.Connection;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
+import com.alibaba.otter.canal.client.adapter.rdb.support.BatchExecutor;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+
+/**
+ * RDB同步操作业务
+ *
+ * @author rewerma 2018-11-7 下午06:45:49
+ * @version 1.0.0
+ */
+public class RdbSyncService {
+
+    private static final Logger               logger  = LoggerFactory.getLogger(RdbSyncService.class);
+
+    // 源库表字段类型缓存: instance.schema.table -> <columnName, jdbcType>
+    private Map<String, Map<String, Integer>> columnsTypeCache;
+
+    private int                               threads = 3;
+    private boolean                           skipDupException;
+
+    private List<SyncItem>[]                  dmlsPartition;
+    private BatchExecutor[]                   batchExecutors;
+    private ExecutorService[]                 executorThreads;
+
+    public List<SyncItem>[] getDmlsPartition() {
+        return dmlsPartition;
+    }
+
+    public Map<String, Map<String, Integer>> getColumnsTypeCache() {
+        return columnsTypeCache;
+    }
+
+    public RdbSyncService(DataSource dataSource, Integer threads, boolean skipDupException){
+        this(dataSource, threads, new ConcurrentHashMap<>(), skipDupException);
+    }
+
+    @SuppressWarnings("unchecked")
+    public RdbSyncService(DataSource dataSource, Integer threads, Map<String, Map<String, Integer>> columnsTypeCache,
+                          boolean skipDupException){
+        this.columnsTypeCache = columnsTypeCache;
+        this.skipDupException = skipDupException;
+        try {
+            if (threads != null) {
+                this.threads = threads;
+            }
+            this.dmlsPartition = new List[this.threads];
+            this.batchExecutors = new BatchExecutor[this.threads];
+            this.executorThreads = new ExecutorService[this.threads];
+            for (int i = 0; i < this.threads; i++) {
+                dmlsPartition[i] = new ArrayList<>();
+                batchExecutors[i] = new BatchExecutor(dataSource);
+                executorThreads[i] = Executors.newSingleThreadExecutor();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * 批量同步回调
+     *
+     * @param dmls 批量 DML
+     * @param function 回调方法
+     */
+    public void sync(List<Dml> dmls, Function<Dml, Boolean> function) {
+        try {
+            boolean toExecute = false;
+            for (Dml dml : dmls) {
+                if (!toExecute) {
+                    toExecute = function.apply(dml);
+                } else {
+                    function.apply(dml);
+                }
+            }
+            if (toExecute) {
+                List<Future<Boolean>> futures = new ArrayList<>();
+                for (int i = 0; i < threads; i++) {
+                    int j = i;
+                    futures.add(executorThreads[i].submit(() -> {
+                        try {
+                            dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j],
+                                syncItem.config,
+                                syncItem.singleDml));
+                            dmlsPartition[j].clear();
+                            batchExecutors[j].commit();
+                            return true;
+                        } catch (Throwable e) {
+                            batchExecutors[j].rollback();
+                            throw new RuntimeException(e);
+                        }
+                    }));
+                }
+
+                futures.forEach(future -> {
+                    try {
+                        future.get();
+                    } catch (ExecutionException | InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            }
+        } finally {
+            for (BatchExecutor batchExecutor : batchExecutors) {
+                if (batchExecutor != null) {
+                    batchExecutor.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * 批量同步
+     *
+     * @param mappingConfig 配置集合
+     * @param dmls 批量 DML
+     */
+    public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls, Properties envProperties) {
+        sync(dmls, dml -> {
+            if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
+                // DDL
+                columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
+                return false;
+            } else {
+                // DML
+                String destination = StringUtils.trimToEmpty(dml.getDestination());
+                String groupId = StringUtils.trimToEmpty(dml.getGroupId());
+                String database = dml.getDatabase();
+                String table = dml.getTable();
+                Map<String, MappingConfig> configMap;
+                if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                    configMap = mappingConfig.get(destination + "-" + groupId + "_" + database + "-" + table);
+                } else {
+                    configMap = mappingConfig.get(destination + "_" + database + "-" + table);
+                }
+
+                if (configMap == null) {
+                    return false;
+                }
+
+                if (configMap.values().isEmpty()) {
+                    return false;
+                }
+
+                for (MappingConfig config : configMap.values()) {
+                    if (config.getConcurrent()) {
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            int hash = pkHash(config.getDbMapping(), singleDml.getData());
+                            SyncItem syncItem = new SyncItem(config, singleDml);
+                            dmlsPartition[hash].add(syncItem);
+                        });
+                    } else {
+                        int hash = 0;
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            SyncItem syncItem = new SyncItem(config, singleDml);
+                            dmlsPartition[hash].add(syncItem);
+                        });
+                    }
+                }
+                return true;
+            }
+        });
+    }
+
+    /**
+     * 单条 dml 同步
+     *
+     * @param batchExecutor 批量事务执行器
+     * @param config 对应配置对象
+     * @param dml DML
+     */
+    public void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+        if (config != null) {
+            try {
+                String type = dml.getType();
+                if (type != null && type.equalsIgnoreCase("INSERT")) {
+                    insert(batchExecutor, config, dml);
+                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                    update(batchExecutor, config, dml);
+                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                    delete(batchExecutor, config, dml);
+                } else if (type != null && type.equalsIgnoreCase("TRUNCATE")) {
+                    truncate(batchExecutor, config);
+                }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+                }
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * 插入操作
+     *
+     * @param config 配置项
+     * @param dml DML数据
+     */
+    private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
+        Map<String, Object> data = dml.getData();
+        if (data == null || data.isEmpty()) {
+            return;
+        }
+
+        DbMapping dbMapping = config.getDbMapping();
+
+        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
+
+        StringBuilder insertSql = new StringBuilder();
+        insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
+
+        columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
+        int len = insertSql.length();
+        insertSql.delete(len - 1, len).append(") VALUES (");
+        int mapLen = columnsMap.size();
+        for (int i = 0; i < mapLen; i++) {
+            insertSql.append("?,");
+        }
+        len = insertSql.length();
+        insertSql.delete(len - 1, len).append(")");
+
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+
+        List<Map<String, ?>> values = new ArrayList<>();
+        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = Util.cleanColumn(targetColumnName);
+            }
+
+            Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+            if (type == null) {
+                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
+            }
+            Object value = data.get(srcColumnName);
+            BatchExecutor.setValue(values, type, value);
+        }
+
+        try {
+            batchExecutor.execute(insertSql.toString(), values);
+        } catch (SQLException e) {
+            if (skipDupException
+                && (e.getMessage().contains("Duplicate entry") || e.getMessage().startsWith("ORA-00001: 违反唯一约束条件"))) {
+                // ignore
+                // TODO 增加更多关系数据库的主键冲突的错误码
+            } else {
+                throw e;
+            }
+        }
+        if (logger.isTraceEnabled()) {
+            logger.trace("Insert into target table, sql: {}", insertSql);
+        }
+
+    }
+
+    /**
+     * 更新操作
+     *
+     * @param config 配置项
+     * @param dml DML数据
+     */
+    private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
+        Map<String, Object> data = dml.getData();
+        if (data == null || data.isEmpty()) {
+            return;
+        }
+
+        Map<String, Object> old = dml.getOld();
+        if (old == null || old.isEmpty()) {
+            return;
+        }
+
+        DbMapping dbMapping = config.getDbMapping();
+
+        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
+
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+
+        StringBuilder updateSql = new StringBuilder();
+        updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping)).append(" SET ");
+        List<Map<String, ?>> values = new ArrayList<>();
+        boolean hasMatched = false;
+        for (String srcColumnName : old.keySet()) {
+            List<String> targetColumnNames = new ArrayList<>();
+            columnsMap.forEach((targetColumn, srcColumn) -> {
+                if (srcColumnName.equalsIgnoreCase(srcColumn)) {
+                    targetColumnNames.add(targetColumn);
+                }
+            });
+            if (!targetColumnNames.isEmpty()) {
+                hasMatched = true;
+                for (String targetColumnName : targetColumnNames) {
+                    updateSql.append(targetColumnName).append("=?, ");
+                    Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+                    if (type == null) {
+                        throw new RuntimeException("Target column: " + targetColumnName + " not matched");
+                    }
+                    BatchExecutor.setValue(values, type, data.get(srcColumnName));
+                }
+            }
+        }
+        if (!hasMatched) {
+            logger.warn("Did not match any columns to update");
+            return;
+        }
+        int len = updateSql.length();
+        updateSql.delete(len - 2, len).append(" WHERE ");
+
+        // 拼接主键
+        appendCondition(dbMapping, updateSql, ctype, values, data, old);
+        batchExecutor.execute(updateSql.toString(), values);
+        if (logger.isTraceEnabled()) {
+            logger.trace("Update target table, sql: {}", updateSql);
+        }
+    }
+
+    /**
+     * 删除操作
+     *
+     * @param config
+     * @param dml
+     */
+    private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
+        Map<String, Object> data = dml.getData();
+        if (data == null || data.isEmpty()) {
+            return;
+        }
+
+        DbMapping dbMapping = config.getDbMapping();
+
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+
+        StringBuilder sql = new StringBuilder();
+        sql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");
+
+        List<Map<String, ?>> values = new ArrayList<>();
+        // 拼接主键
+        appendCondition(dbMapping, sql, ctype, values, data);
+        batchExecutor.execute(sql.toString(), values);
+        if (logger.isTraceEnabled()) {
+            logger.trace("Delete from target table, sql: {}", sql);
+        }
+    }
+
+    /**
+     * truncate操作
+     *
+     * @param config
+     */
+    private void truncate(BatchExecutor batchExecutor, MappingConfig config) throws SQLException {
+        DbMapping dbMapping = config.getDbMapping();
+        StringBuilder sql = new StringBuilder();
+        sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping));
+        batchExecutor.execute(sql.toString(), new ArrayList<>());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Truncate target table, sql: {}", sql);
+        }
+    }
+
+    /**
+     * 获取目标字段类型
+     *
+     * @param conn sql connection
+     * @param config 映射配置
+     * @return 字段sqlType
+     */
+    private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig config) {
+        DbMapping dbMapping = config.getDbMapping();
+        String cacheKey = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
+        Map<String, Integer> columnType = columnsTypeCache.get(cacheKey);
+        if (columnType == null) {
+            synchronized (RdbSyncService.class) {
+                columnType = columnsTypeCache.get(cacheKey);
+                if (columnType == null) {
+                    columnType = new LinkedHashMap<>();
+                    final Map<String, Integer> columnTypeTmp = columnType;
+                    String sql = "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE 1=2";
+                    Util.sqlRS(conn, sql, rs -> {
+                        try {
+                            ResultSetMetaData rsd = rs.getMetaData();
+                            int columnCount = rsd.getColumnCount();
+                            for (int i = 1; i <= columnCount; i++) {
+                                columnTypeTmp.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
+                            }
+                            columnsTypeCache.put(cacheKey, columnTypeTmp);
+                        } catch (SQLException e) {
+                            logger.error(e.getMessage(), e);
+                        }
+                    });
+                }
+            }
+        }
+        return columnType;
+    }
+
+    /**
+     * 拼接主键 where条件
+     */
+    private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
+                                 List<Map<String, ?>> values, Map<String, Object> d) {
+        appendCondition(dbMapping, sql, ctype, values, d, null);
+    }
+
+    private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
+                                 List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
+        // 拼接主键
+        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = Util.cleanColumn(targetColumnName);
+            }
+            sql.append(targetColumnName).append("=? AND ");
+            Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+            if (type == null) {
+                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
+            }
+            // 如果有修改主键的情况
+            if (o != null && o.containsKey(srcColumnName)) {
+                BatchExecutor.setValue(values, type, o.get(srcColumnName));
+            } else {
+                BatchExecutor.setValue(values, type, d.get(srcColumnName));
+            }
+        }
+        int len = sql.length();
+        sql.delete(len - 4, len);
+    }
+
+    public static class SyncItem {
+
+        private MappingConfig config;
+        private SingleDml     singleDml;
+
+        public SyncItem(MappingConfig config, SingleDml singleDml){
+            this.config = config;
+            this.singleDml = singleDml;
+        }
+    }
+
+    /**
+     * 取主键hash
+     */
+    public int pkHash(DbMapping dbMapping, Map<String, Object> d) {
+        return pkHash(dbMapping, d, null);
+    }
+
+    public int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o) {
+        int hash = 0;
+        // 取主键
+        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = Util.cleanColumn(targetColumnName);
+            }
+            Object value = null;
+            if (o != null && o.containsKey(srcColumnName)) {
+                value = o.get(srcColumnName);
+            } else if (d != null) {
+                value = d.get(srcColumnName);
+            }
+            if (value != null) {
+                hash += value.hashCode();
+            }
+        }
+        hash = Math.abs(hash) % threads;
+        return Math.abs(hash);
+    }
+
+    public void close() {
+        for (int i = 0; i < threads; i++) {
+            executorThreads[i].shutdown();
+        }
+    }
+}

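As the file above shows, RdbSyncService parallelises by primary key: when a mapping is marked concurrent, every single-row DML is routed to dmlsPartition[pkHash(...)], and each partition is drained by its own single-thread executor, so changes to one row are replayed in order while different rows proceed in parallel; non-concurrent mappings all land in partition 0. A small standalone sketch of the partitioning rule; PkPartitionExample is illustrative, and the real method takes its key columns from DbMapping.getTargetPk():

    import java.util.Collections;
    import java.util.Map;

    public class PkPartitionExample {

        // Simplified form of RdbSyncService.pkHash: sum the hash codes of the
        // primary-key values and fold the sum into the range [0, threads).
        static int partition(Map<String, Object> pkValues, int threads) {
            int hash = 0;
            for (Object value : pkValues.values()) {
                if (value != null) {
                    hash += value.hashCode();
                }
            }
            // The outer abs mirrors the double Math.abs in the original method and
            // guards the Integer.MIN_VALUE corner case.
            return Math.abs(Math.abs(hash) % threads);
        }

        public static void main(String[] args) {
            int threads = 3;
            // The same primary key always maps to the same partition, so its
            // single-thread executor sees the two events in arrival order.
            System.out.println(partition(Collections.singletonMap("id", (Object) 42L), threads));
            System.out.println(partition(Collections.singletonMap("id", (Object) 42L), threads));
            System.out.println(partition(Collections.singletonMap("id", (Object) 43L), threads));
        }
    }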
+ 3 - 4
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java

@@ -92,17 +92,16 @@ public class DBTest {
 
     private String clob2Str(Clob clob) {
         String content = "";
-        try {
-            Reader is = clob.getCharacterStream();
-            BufferedReader buff = new BufferedReader(is);
+        try (Reader is = clob.getCharacterStream(); BufferedReader buff = new BufferedReader(is)) {
             String line = buff.readLine();
-            StringBuffer sb = new StringBuffer();
+            StringBuilder sb = new StringBuilder();
             while (line != null) {
                 sb.append(line);
                 line = buff.readLine();
             }
             content = sb.toString();
         } catch (Exception e) {
+            e.printStackTrace();
         }
         return content;
     }

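The rewritten clob2Str closes both readers through try-with-resources and no longer swallows exceptions silently. One caveat carried over from the original: BufferedReader.readLine() strips line terminators, so the concatenated result loses the CLOB's line breaks. Where that matters, the CLOB can be read in a single call, roughly as below; clobToString is a hypothetical helper, not part of the test:

    import java.sql.Clob;
    import java.sql.SQLException;

    public class ClobReadExample {

        // Reads the whole CLOB at once; Clob positions are 1-based.
        static String clobToString(Clob clob) throws SQLException {
            return clob == null ? "" : clob.getSubString(1, (int) clob.length());
        }
    }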
+ 1 - 1
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/TestConstant.java

@@ -10,7 +10,7 @@ public class TestConstant {
     public final static String    jdbcUser     = "root";
     public final static String    jdbcPassword = "121212";
 
-    public static DruidDataSource dataSource;
+    public final static DruidDataSource dataSource;
 
     static {
         dataSource = new DruidDataSource();