1
0
mcy 6 жил өмнө
parent
commit
2a2977eca7

+ 8 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -35,6 +35,14 @@ public class ESAdapter implements OuterAdapter {
 
     private ESSyncService   esSyncService;
 
+    public TransportClient getTransportClient() {
+        return transportClient;
+    }
+
+    public ESSyncService getEsSyncService() {
+        return esSyncService;
+    }
+
     @Override
     public void init(OuterAdapterConfig configuration) {
         try {

+ 56 - 71
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -1,6 +1,6 @@
 package com.alibaba.otter.canal.client.adapter.es.service;
 
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -14,10 +14,8 @@ import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
-import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
 import com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
-import com.alibaba.otter.canal.client.adapter.es.support.Util;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 
@@ -32,32 +30,38 @@ public class ESSyncService {
     }
 
     public void sync(Dml dml) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("DML: {}", JSON.toJSONString(dml));
+        }
         long begin = System.currentTimeMillis();
         String database = dml.getDatabase();
         String table = dml.getTable();
         List<ESSyncConfig> esSyncConfigs = ESSyncConfigLoader.getDbTableEsSyncConfig().get(database + "-" + table);
         if (esSyncConfigs != null) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Destination: {}, database:{}, table:{}, type:{}, effect index count: {}",
+            if (logger.isTraceEnabled()) {
+                logger.trace("Destination: {}, database:{}, table:{}, type:{}, effect index count: {}",
                     dml.getDestination(),
                     dml.getDatabase(),
                     dml.getTable(),
                     dml.getType(),
                     esSyncConfigs.size());
-                logger.debug(JSON.toJSONString(dml));
             }
 
             for (ESSyncConfig config : esSyncConfigs) {
-                logger.info("Prepared to sync index: {}, destination: {}",
-                    config.getEsMapping().get_index(),
-                    dml.getDestination());
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Prepared to sync index: {}, destination: {}",
+                        config.getEsMapping().get_index(),
+                        dml.getDestination());
+                }
                 this.sync(config, dml);
-                logger.info("Sync completed: {}, destination: {}",
-                    config.getEsMapping().get_index(),
-                    dml.getDestination());
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Sync completed: {}, destination: {}",
+                        config.getEsMapping().get_index(),
+                        dml.getDestination());
+                }
             }
-            if (logger.isDebugEnabled()) {
-                logger.debug("Sync elapsed time: {}, effect index count:{}, destination: {}",
+            if (logger.isTraceEnabled()) {
+                logger.trace("Sync elapsed time: {}, effect index count:{}, destination: {}",
                     (System.currentTimeMillis() - begin),
                     esSyncConfigs.size(),
                     dml.getDestination());
@@ -78,8 +82,8 @@ public class ESSyncService {
                 // delete(config, dml);
             }
 
-            if (logger.isDebugEnabled()) {
-                logger.debug("Sync elapsed time: {},destination: {}, es index: {}",
+            if (logger.isTraceEnabled()) {
+                logger.trace("Sync elapsed time: {},destination: {}, es index: {}",
                     (System.currentTimeMillis() - begin),
                     dml.getDestination(),
                     config.getEsMapping().get_index());
@@ -104,39 +108,11 @@ public class ESSyncService {
 
             // ------是否单表 & 所有字段都为简单字段------
             if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
-                Map<String, Object> esFieldData = new HashMap<>();
-
-                // ------通过dml对象插入ES------
-                // 获取所有字段
-                for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
-                    // 如果是主键字段则不插入
-                    if (fieldItem.getFieldName().equals(mapping.get_id())) {
-                        continue;
-                    }
-                    Object value = esTemplate.getDataValue(mapping,
-                        data,
-                        Util.cleanColumn(fieldItem.getColumnItems().iterator().next().getColumnName()));
-                    String fieldName = Util.cleanColumn(fieldItem.getFieldName());
-                    if (!mapping.getSkips().contains(fieldName)) {
-                        esFieldData.put(fieldName, esTemplate.convertType(mapping, fieldName, value));
-                    }
-                }
+                Map<String, Object> esFieldData = new LinkedHashMap<>();
+                Object idVal = esTemplate.getESDataFromDmlData(mapping, data, esFieldData);
 
-                // 取出id值
-                Object idVal;
-                if (mapping.get_id() != null) {
-                    idVal = esTemplate.getDataValue(mapping,
-                        data,
-                        Util.cleanColumn(
-                            schemaItem.getSelectFields().get(mapping.get_id()).getColumn().getColumnName()));
-                } else {
-                    idVal = esTemplate.getDataValue(mapping,
-                        data,
-                        Util.cleanColumn(
-                            schemaItem.getSelectFields().get(mapping.getPk()).getColumn().getColumnName()));
-                }
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Single table insert ot es index, destination:{}, table: {}, index: {}, id: {}",
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Single table insert ot es index, destination:{}, table: {}, index: {}, id: {}",
                         config.getDestination(),
                         dml.getTable(),
                         mapping.get_index(),
@@ -150,35 +126,44 @@ public class ESSyncService {
                         mapping.get_index(),
                         idVal);
                 }
-                return; // 单表插入完成返回
+                continue; // 单表插入完成
             }
 
-            // ------是否主表------
+            // ------是否主表 查询sql来插入------
             if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
                 String sql = mapping.getSql();
-                String condition = ESSyncUtil.pkConditaionSql(mapping, data);
+                String condition = ESSyncUtil.pkConditionSql(mapping, data);
                 sql = ESSyncUtil.appendCondition(sql, condition);
                 DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
-//                ESSyncUtil.sqlRS(ds, sql, rs -> {
-//                    try {
-//                        while (rs.next()) {
-//                            Map<String, Object> esFieldData = new HashMap<>();
-//
-//                            Object idVal = ESSyncUtil.persistEsByRs(transportClient, config, rs, mainTable, esFieldData);
-//
-//                            if (logger.isDebugEnabled()) {
-//                                logger.debug("主表insert, 通过执行sql查询相应记录,并insert到Es, id: {}, esFieldValue: {}", idVal, esFieldData);
-//                            }
-//                            boolean result = ESSyncUtil.persist(transportClient, config, idVal, esFieldData, OpType.INSERT);
-//                            if (!result) {
-//                                logger.error("主表insert,通过执行sql查询响应记录,并insert到es, es插入失败,table: {}, index: {}", dml.getTable(), config.getEsSyn().getIndex());
-//                            }
-//                        }
-//                    } catch (Exception e) {
-//                        throw new RuntimeException(e);
-//                    }
-//                    return 0;
-//                });
+                ESSyncUtil.sqlRS(ds, sql, rs -> {
+                    try {
+                        while (rs.next()) {
+                            Map<String, Object> esFieldData = new LinkedHashMap<>();
+                            Object idVal = esTemplate.getESDataFromRS(mapping, rs, esFieldData);
+
+                            if (logger.isTraceEnabled()) {
+                                logger.trace(
+                                    "Single table insert ot es index by query sql, destination:{}, table: {}, index: {}, id: {}",
+                                    config.getDestination(),
+                                    dml.getTable(),
+                                    mapping.get_index(),
+                                    idVal);
+                            }
+                            boolean result = esTemplate.insert(config, idVal, esFieldData);
+                            if (!result) {
+                                logger.error(
+                                    "Single table insert to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
+                                    config.getDestination(),
+                                    dml.getTable(),
+                                    mapping.get_index(),
+                                    idVal);
+                            }
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                    return 0;
+                });
             }
         }
     }

+ 3 - 3
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java

@@ -208,7 +208,7 @@ public class ESSyncUtil {
      * @param data
      * @return
      */
-    public static String pkConditaionSql(ESMapping mapping, Map<String, Object> data) {
+    public static String pkConditionSql(ESMapping mapping, Map<String, Object> data) {
         Set<ColumnItem> idColumns = new LinkedHashSet<>();
         SchemaItem schemaItem = mapping.getSchemaItem();
 
@@ -252,7 +252,7 @@ public class ESSyncUtil {
     /**
      * 执行查询sql
      */
-    public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) throws SQLException {
+    public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun)  {
         Connection conn = null;
         Statement smt = null;
         ResultSet rs = null;
@@ -265,7 +265,7 @@ public class ESSyncUtil {
             return fun.apply(rs);
         } catch (SQLException e) {
             logger.error("sqlRs has error, sql: {} ", sql);
-            throw e;
+            throw new RuntimeException(e);
         } finally {
             if (rs != null) {
                 try {

+ 140 - 29
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.client.adapter.es.support;
 
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -31,6 +33,8 @@ import org.springframework.util.CollectionUtils;
 
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
 
 public class ESTemplate {
 
@@ -177,8 +181,8 @@ public class ESTemplate {
             }
         });
         String scriptLine = sb.toString();
-        if (logger.isDebugEnabled()) {
-            logger.debug(scriptLine);
+        if (logger.isTraceEnabled()) {
+            logger.trace(scriptLine);
         }
 
         UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(transportClient);
@@ -188,8 +192,8 @@ public class ESTemplate {
             .script(new Script(ScriptType.INLINE, "painless", scriptLine, Collections.emptyMap()));
 
         BulkByScrollResponse response = updateByQuery.get();
-        if (logger.isDebugEnabled()) {
-            logger.debug("updateByQuery response: {}", response.getStatus());
+        if (logger.isTraceEnabled()) {
+            logger.trace("updateByQuery response: {}", response.getStatus());
         }
         if (!CollectionUtils.isEmpty(response.getSearchFailures())) {
             logger.error("script update_for_search has search error: " + response.getBulkFailures());
@@ -273,21 +277,122 @@ public class ESTemplate {
         return true;
     }
 
-    public Object getDataValue(ESMapping config, Map<String, Object> data, String fieldName) {
-        String esType = getEsType(config, fieldName);
-        Object value = data.get(fieldName);
-        if (value instanceof Byte) {
-            if ("boolean".equals(esType)) {
-                value = ((Byte) value).intValue() != 0;
+    // public Object getRSDataValue(ESMapping mapping, ResultSet rs, String
+    // fieldName) throws SQLException {
+    // String esType = getEsType(mapping, fieldName);
+    // Object value = rs.getObject(fieldName);
+    // if (value instanceof Boolean) {
+    // // 判断es类型
+    // if (!"boolean".equals(esType)) {
+    // value = rs.getByte(fieldName);
+    // }
+    // }
+    // return value;
+    // }
+    //
+    // public Object getDataValue(ESMapping mapping, Map<String, Object> data,
+    // String fieldName) {
+    // String esType = getEsType(mapping, fieldName);
+    // Object value = data.get(fieldName);
+    // if (value instanceof Byte) {
+    // if ("boolean".equals(esType)) {
+    // value = ((Byte) value).intValue() != 0;
+    // }
+    // }
+    // return value;
+    // }
+
+    // public Object convertType(ESMapping mapping, String fieldName, Object val) {
+    // // 从mapping中获取类型来转换
+    // String esType = getEsType(mapping, fieldName);
+    // return ESSyncUtil.typeConvert(val, esType);
+    // }
+
+    public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet,
+                                  Map<String, Object> esFieldData) throws SQLException {
+        SchemaItem schemaItem = mapping.getSchemaItem();
+        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
+        Object resultIdVal = null;
+        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+            String fieldName = Util.cleanColumn(fieldItem.getColumnItems().iterator().next().getColumnName());
+            String esType = getEsType(mapping, fieldName);
+
+            Object value = resultSet.getObject(fieldName);
+            if (value instanceof Boolean) {
+                // 判断es类型
+                if (!"boolean".equals(esType)) {
+                    value = resultSet.getByte(fieldName);
+                }
+            }
+
+            if (fieldItem.getFieldName().equals(idFieldName)) {
+                resultIdVal = ESSyncUtil.typeConvert(value, esType);
+            }
+
+            if (!fieldItem.getFieldName().equals(mapping.get_id()) && !mapping.getSkips().contains(fieldName)) {
+                esFieldData.put(fieldName, ESSyncUtil.typeConvert(value, esType));
             }
         }
-        return value;
+        return resultIdVal;
     }
 
-    public Object convertType(ESMapping config, String fieldName, Object val) {
-        // 从mapping中获取类型来转换
-        String esType = getEsType(config, fieldName);
-        return ESSyncUtil.typeConvert(val, esType);
+    // public Object getIdValFromDmlData(ESMapping mapping, Map<String, Object>
+    // dmlData) {
+    // SchemaItem schemaItem = mapping.getSchemaItem();
+    // String idFieldName = mapping.get_id() == null ? mapping.getPk() :
+    // mapping.get_id();
+    //
+    // for (SchemaItem.FieldItem fieldItem : schemaItem.getSelectFields().values())
+    // {
+    // if (fieldItem.getFieldName().equals(idFieldName)) {
+    // String fieldName =
+    // Util.cleanColumn(fieldItem.getColumnItems().iterator().next().getColumnName());
+    // String esType = getEsType(mapping, fieldName);
+    // Object value = dmlData.get(fieldName);
+    // if (value instanceof Byte) {
+    // if ("boolean".equals(esType)) {
+    // value = ((Byte) value).intValue() != 0;
+    // }
+    // }
+    //
+    // return ESSyncUtil.typeConvert(value, esType);
+    // }
+    // }
+    // return null;
+    // }
+
+    /**
+     * 将dml的data转换为es的data
+     *
+     * @param mapping 配置mapping
+     * @param dmlData dml data
+     * @param esFieldData es data
+     * @return 返回 id 值
+     */
+    public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData,
+                                       Map<String, Object> esFieldData) {
+        SchemaItem schemaItem = mapping.getSchemaItem();
+        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
+        Object resultIdVal = null;
+        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+            String fieldName = Util.cleanColumn(fieldItem.getColumnItems().iterator().next().getColumnName());
+            String esType = getEsType(mapping, fieldName);
+            Object value = dmlData.get(fieldName);
+            if (value instanceof Byte) {
+                if ("boolean".equals(esType)) {
+                    value = ((Byte) value).intValue() != 0;
+                }
+            }
+
+            if (fieldItem.getFieldName().equals(idFieldName)) {
+                resultIdVal = ESSyncUtil.typeConvert(value, esType);
+            }
+
+            if (!fieldItem.getFieldName().equals(mapping.get_id()) && !mapping.getSkips().contains(fieldName)) {
+                esFieldData.put(fieldName, ESSyncUtil.typeConvert(value, esType));
+            }
+        }
+        return resultIdVal;
     }
 
     /**
@@ -295,9 +400,16 @@ public class ESTemplate {
      */
     private static ConcurrentMap<String, Map<String, String>> esFieldTypes = new ConcurrentHashMap<>();
 
+    /**
+     * 获取es mapping中的属性类型
+     *
+     * @param mapping mapping配置
+     * @param fieldName 属性名
+     * @return 类型
+     */
     @SuppressWarnings("unchecked")
-    public String getEsType(ESMapping config, String fieldName) {
-        String key = config.get_index() + "-" + config.get_type();
+    private String getEsType(ESMapping mapping, String fieldName) {
+        String key = mapping.get_index() + "-" + mapping.get_type();
         Map<String, String> fieldType = esFieldTypes.get(key);
         if (fieldType == null) {
             ImmutableOpenMap<String, MappingMetaData> mappings;
@@ -310,29 +422,28 @@ public class ESTemplate {
                     .getState()
                     .getMetaData()
                     .getIndices()
-                    .get(config.get_index())
+                    .get(mapping.get_index())
                     .getMappings();
             } catch (NullPointerException e) {
-                throw new IllegalArgumentException("Not found the mapping info of index: " + config.get_index());
+                throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
             }
-            MappingMetaData mappingMetaData = mappings.get(config.get_type());
+            MappingMetaData mappingMetaData = mappings.get(mapping.get_type());
             if (mappingMetaData == null) {
-                throw new IllegalArgumentException("Not found the mapping info of index: " + config.get_index());
+                throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
             }
 
-            Map<String, String> fieldTypeTmp = new LinkedHashMap<>();
+            fieldType = new LinkedHashMap<>();
 
             Map<String, Object> sourceMap = mappingMetaData.getSourceAsMap();
-            Map<String, Object> mapping = (Map<String, Object>) sourceMap.get("properties");
-            mapping.forEach((k, v) -> {
-                Map<String, Object> value = (Map<String, Object>) v;
+            Map<String, Object> esMapping = (Map<String, Object>) sourceMap.get("properties");
+            for (Map.Entry<String, Object> entry : esMapping.entrySet()) {
+                Map<String, Object> value = (Map<String, Object>) entry.getValue();
                 if (value.containsKey("properties")) {
-                    fieldTypeTmp.put(k, "object");
+                    fieldType.put(entry.getKey(), "object");
                 } else {
-                    fieldTypeTmp.put(k, (String) value.get("type"));
+                    fieldType.put(entry.getKey(), (String) value.get("type"));
                 }
-            });
-            fieldType = fieldTypeTmp;
+            }
             esFieldTypes.put(key, fieldType);
         }
 

+ 2 - 1
client-adapter/elasticsearch/src/main/resources/es/mytest_user.yml

@@ -9,6 +9,7 @@ esMapping:
 #        left join role b on a.role_id=b.id
 #        left join (select user_id, group_concat(label,',') as labels from user_label
 #        group by user_id) c on c.user_id=a.id"
-  sql: "select id, name, c_time from user "
+#  sql: "select id, name, c_time from user "
+  sql: "select id, concat(name, '_') as name, c_time from user "
   commitBatch: 3000
   etlCondition: "where a.c_time>='{0}' or b.c_time>='{0}' or c.c_time>='{0}'"

+ 1 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/ConfigLoadTest.java

@@ -18,7 +18,7 @@ public class ConfigLoadTest {
     public void before() {
         AdapterConfigs.put("es", "mytest_user.yml");
         // 加载数据源连接池
-        DatasourceConfig.DATA_SOURCES.put("defaultDS", DataSourceConstant.dataSource);
+        DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
     }
 
     @Test

+ 0 - 54
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/SyncTest.java

@@ -1,54 +0,0 @@
-package com.alibaba.otter.canal.client.adapter.es.test;
-
-import java.util.*;
-
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
-import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
-import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
-
-public class SyncTest {
-
-    private ESAdapter esAdapter;
-
-    @Before
-    public void init() {
-        AdapterConfigs.put("es", "mytest_user.yml");
-        DatasourceConfig.DATA_SOURCES.put("defaultDS", DataSourceConstant.dataSource);
-
-        OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
-        outerAdapterConfig.setName("es");
-        outerAdapterConfig.setHosts("127.0.0.1:9300");
-        Map<String, String> properties = new HashMap<>();
-        properties.put("cluster.name", "elasticsearch");
-        outerAdapterConfig.setProperties(properties);
-
-        esAdapter = new ESAdapter();
-        esAdapter.init(outerAdapterConfig);
-
-    }
-
-    @Test
-    public void insertTest01() {
-        Dml dml = new Dml();
-        dml.setDestination("example");
-        dml.setTs(new Date().getTime());
-        dml.setType("INSERT");
-        dml.setDatabase("mytest");
-        dml.setTable("user");
-        List<Map<String, Object>> dataList = new ArrayList<>();
-        Map<String, Object> data = new LinkedHashMap<>();
-        dataList.add(data);
-        data.put("id", 1L);
-        data.put("name", "Eric");
-        data.put("c_time", new Date());
-
-        dml.setData(dataList);
-
-        esAdapter.sync(dml);
-    }
-}

+ 15 - 6
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/DataSourceConstant.java → client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/TestConstant.java

@@ -4,18 +4,26 @@ import java.sql.SQLException;
 
 import com.alibaba.druid.pool.DruidDataSource;
 
-public class DataSourceConstant {
+public class TestConstant {
+
+    public final static String    jdbcUrl      = "jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true";
+    public final static String    jdbcUser     = "root";
+    public final static String    jdbcPassword = "121212";
+
+    public final static String    esHosts      = "127.0.0.1:9300";
+    public final static String    clusterNmae  = "elasticsearch";
+
+    public static DruidDataSource dataSource;
 
-    static DruidDataSource dataSource;
     static {
         dataSource = new DruidDataSource();
         dataSource.setDriverClassName("com.mysql.jdbc.Driver");
-        dataSource.setUrl("jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true");
-        dataSource.setUsername("root");
-        dataSource.setPassword("121212");
+        dataSource.setUrl(jdbcUrl);
+        dataSource.setUsername(jdbcUser);
+        dataSource.setPassword(jdbcPassword);
         dataSource.setInitialSize(1);
         dataSource.setMinIdle(1);
-        dataSource.setMaxActive(3);
+        dataSource.setMaxActive(1);
         dataSource.setMaxWait(60000);
         dataSource.setTimeBetweenEvictionRunsMillis(60000);
         dataSource.setMinEvictableIdleTimeMillis(300000);
@@ -28,4 +36,5 @@ public class DataSourceConstant {
             e.printStackTrace();
         }
     }
+
 }

+ 97 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncTest.java

@@ -0,0 +1,97 @@
+package com.alibaba.otter.canal.client.adapter.es.test.sync;
+
+import java.util.*;
+
+import org.elasticsearch.action.get.GetResponse;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
+import com.alibaba.otter.canal.client.adapter.es.test.TestConstant;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+
+public class UserSyncTest {
+
+    private ESAdapter esAdapter;
+
+    @Before
+    public void init() {
+        AdapterConfigs.put("es", "mytest_user.yml");
+        DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
+
+        OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
+        outerAdapterConfig.setName("es");
+        outerAdapterConfig.setHosts(TestConstant.esHosts);
+        Map<String, String> properties = new HashMap<>();
+        properties.put("cluster.name", TestConstant.clusterNmae);
+        outerAdapterConfig.setProperties(properties);
+
+        esAdapter = new ESAdapter();
+        esAdapter.init(outerAdapterConfig);
+
+    }
+
+    /**
+     * 单表插入
+     */
+    @Test
+    public void insertTest01() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("INSERT");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric");
+        data.put("c_time", new Date());
+
+        dml.setData(dataList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("Eric", response.getSource().get("name"));
+    }
+
+    /**
+     * 主表带函数插入, 数据库里内容必须和单测一致
+     */
+    @Test
+    public void insertTest02() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("INSERT");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric");
+        data.put("c_time", new Date());
+
+        dml.setData(dataList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("Eric_", response.getSource().get("name"));
+    }
+
+    @After
+    public void after() {
+        esAdapter.destroy();
+        DatasourceConfig.DATA_SOURCES.values().forEach(DruidDataSource::close);
+    }
+}

+ 13 - 0
client-adapter/elasticsearch/src/test/resources/logback-test.xml

@@ -0,0 +1,13 @@
+<configuration scan="true" scanPeriod=" 5 seconds">
+	<jmxConfigurator />
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+			</pattern>
+		</encoder>
+	</appender>
+
+	<root level="TRACE">
+		<appender-ref ref="STDOUT"/>
+	</root>
+</configuration>

+ 2 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -150,6 +150,8 @@ public abstract class AbstractCanalAdapterWorker {
             } catch (Throwable e) {
                 if (i == retry - 1) {
                     connector.ack();
+                } else {
+                    connector.rollback();
                 }
 
                 logger.error(e.getMessage(), e);