Browse Source

从表插入, 主表为简单字段

mcy 6 years ago
parent
commit
b2b44555ca
13 changed files with 447 additions and 264 deletions
  1. 13 49
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SchemaItem.java
  2. 114 11
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java
  3. 2 2
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java
  4. 15 69
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java
  5. 27 0
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/Common.java
  6. 57 0
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOneTest.java
  7. 57 0
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinSubTest.java
  8. 59 0
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncJoinOneTest.java
  9. 75 0
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncSingleTest.java
  10. 0 133
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncTest.java
  11. 10 0
      client-adapter/elasticsearch/src/test/resources/es/mytest_user_join_one.yml
  12. 10 0
      client-adapter/elasticsearch/src/test/resources/es/mytest_user_join_sub.yml
  13. 8 0
      client-adapter/elasticsearch/src/test/resources/es/mytest_user_single.yml

+ 13 - 49
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SchemaItem.java

@@ -23,7 +23,6 @@ public class SchemaItem {
         this.isAllFieldsSimple();
         aliasTableItems.values().forEach(tableItem -> {
             tableItem.getRelationTableFields();
-            // tableItem.getRelationKeyFieldItems();
             tableItem.getRelationSelectFieldItems();
         });
     }
@@ -76,19 +75,19 @@ public class SchemaItem {
                     getSelectFields()
                         .forEach((fieldName, fieldItem) -> fieldItem.getColumnItems().forEach(columnItem -> {
                             TableItem tableItem = getAliasTableItems().get(columnItem.getOwner());
-                            if (!tableItem.isSubQuery()) {
-                                List<FieldItem> fieldItems = columnFields.computeIfAbsent(
-                                    columnItem.getOwner() + "." + columnItem.getColumnName(),
-                                    k -> new ArrayList<>());
-                                fieldItems.add(fieldItem);
-                            } else {
-                                tableItem.getSubQueryFields().forEach(subQueryField -> {
-                                    List<FieldItem> fieldItems = columnFields.computeIfAbsent(
-                                        columnItem.getOwner() + "." + subQueryField.getColumn().getColumnName(),
-                                        k -> new ArrayList<>());
-                                    fieldItems.add(fieldItem);
-                                });
-                            }
+                            // if (!tableItem.isSubQuery()) {
+                            List<FieldItem> fieldItems = columnFields.computeIfAbsent(
+                                columnItem.getOwner() + "." + columnItem.getColumnName(),
+                                k -> new ArrayList<>());
+                            fieldItems.add(fieldItem);
+                            // } else {
+                            // tableItem.getSubQueryFields().forEach(subQueryField -> {
+                            // List<FieldItem> fieldItems = columnFields.computeIfAbsent(
+                            // columnItem.getOwner() + "." + subQueryField.getColumn().getColumnName(),
+                            // k -> new ArrayList<>());
+                            // fieldItems.add(fieldItem);
+                            // });
+                            // }
                         }));
                 }
             }
@@ -146,8 +145,6 @@ public class SchemaItem {
         private boolean                                  subQuery;
 
         private volatile Map<FieldItem, List<FieldItem>> relationTableFields;               // 当前表关联条件字段对应主表查询字段
-        // private volatile List<FieldItem> relationKeyFieldItems; //
-        // 关联条件字段在select中的对应字段
         private volatile List<FieldItem>                 relationSelectFieldItems;          // 子表所在主表的查询字段
 
         public TableItem(SchemaItem schemaItem){
@@ -237,10 +234,8 @@ public class SchemaItem {
                             FieldItem rightFieldItem = relationFieldsPair.getRightFieldItem();
                             FieldItem currentTableRelField = null;
                             if (getAlias().equals(leftFieldItem.getOwner())) {
-                                // relationTableFields.add(leftFieldItem);
                                 currentTableRelField = leftFieldItem;
                             } else if (getAlias().equals(rightFieldItem.getOwner())) {
-                                // relationTableFields.add(rightFieldItem);
                                 currentTableRelField = rightFieldItem;
                             }
 
@@ -268,37 +263,6 @@ public class SchemaItem {
             return relationTableFields;
         }
 
-        // public List<FieldItem> getRelationKeyFieldItems() {
-        // if (relationKeyFieldItems == null) {
-        // synchronized (SchemaItem.class) {
-        // if (relationKeyFieldItems == null) {
-        // relationKeyFieldItems = new ArrayList<>();
-        // getRelationFields().forEach(relationFieldsPair -> {
-        // FieldItem leftFieldItem = relationFieldsPair.getLeftFieldItem();
-        // List<FieldItem> selectFieldItem = getSchemaItem().getColumnFields()
-        // .get(leftFieldItem.getOwner() + "." +
-        // leftFieldItem.getColumn().getColumnName());
-        // if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
-        // relationKeyFieldItems.addAll(selectFieldItem);
-        // } else {
-        // FieldItem rightFieldItem = relationFieldsPair.getRightFieldItem();
-        // selectFieldItem = getSchemaItem().getColumnFields()
-        // .get(rightFieldItem.getOwner() + "." +
-        // rightFieldItem.getColumn().getColumnName());
-        // if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
-        // relationKeyFieldItems.addAll(selectFieldItem);
-        // } else {
-        // throw new UnsupportedOperationException(
-        // "Relation condition column must in select columns.");
-        // }
-        // }
-        // });
-        // }
-        // }
-        // }
-        // return relationKeyFieldItems;
-        // }
-
         public List<FieldItem> getRelationSelectFieldItems() {
             if (relationSelectFieldItems == null) {
                 synchronized (SchemaItem.class) {

+ 114 - 11
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -133,12 +133,20 @@ public class ESSyncService {
                 continue; // 单表插入完成
             }
 
-            // ------是主表 查询sql来插入------
+            // ------是主表 查询sql来插入------
             if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
                 String sql = mapping.getSql();
                 String condition = ESSyncUtil.pkConditionSql(mapping, data);
                 sql = ESSyncUtil.appendCondition(sql, condition);
                 DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+                if (logger.isTraceEnabled()) {
+                    logger.trace(
+                        "Single table insert ot es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
+                        config.getDestination(),
+                        dml.getTable(),
+                        mapping.get_index(),
+                        sql.replace("\n", " "));
+                }
                 ESSyncUtil.sqlRS(ds, sql, rs -> {
                     try {
                         while (rs.next()) {
@@ -192,23 +200,30 @@ public class ESSyncService {
                     if (!tableItem.isSubQuery()) {
                         Map<String, Object> esFieldData = new LinkedHashMap<>();
                         for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
-                            Object value = esTemplate
-                                .getValFromData(mapping, data, fieldItem.getColumn().getColumnName());
+                            Object value = esTemplate.getValFromData(mapping,
+                                data,
+                                fieldItem.getFieldName(),
+                                fieldItem.getColumn().getColumnName());
                             esFieldData.put(fieldItem.getFieldName(), value);
                         }
 
                         BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
                         for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields()
                             .entrySet()) {
-                            Object value = esTemplate
-                                .getValFromData(mapping, data, entry.getKey().getColumn().getColumnName());
                             for (FieldItem fieldItem : entry.getValue()) {
-                                String fieldName = fieldItem.getFieldName();
-                                // 判断是否是主键
-                                if (fieldName.equals(mapping.get_id())) {
-                                    fieldName = "_id";
+                                if (fieldItem.getColumnItems().size() == 1) {
+                                    Object value = esTemplate.getValFromData(mapping,
+                                            data,
+                                            fieldItem.getFieldName(),
+                                            entry.getKey().getColumn().getColumnName());
+
+                                    String fieldName = fieldItem.getFieldName();
+                                    // 判断是否是主键
+                                    if (fieldName.equals(mapping.get_id())) {
+                                        fieldName = "_id";
+                                    }
+                                    queryBuilder.must(QueryBuilders.termsQuery(fieldName, value));
                                 }
-                                queryBuilder.must(QueryBuilders.termsQuery(fieldName, value));
                             }
                         }
 
@@ -228,7 +243,95 @@ public class ESSyncService {
                                 mapping.get_index());
                         }
                     } else {
-                        // TODO
+                        StringBuilder sql = new StringBuilder(
+                            "SELECT * FROM (" + tableItem.getSubQuerySql() + ") " + tableItem.getAlias() + " WHERE ");
+
+                        for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
+                            String columnName = fkFieldItem.getColumn().getColumnName();
+                            Object value = esTemplate
+                                .getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
+                            if (value instanceof String) {
+                                sql.append(tableItem.getAlias())
+                                    .append(".")
+                                    .append(columnName)
+                                    .append("='")
+                                    .append(value)
+                                    .append("' ")
+                                    .append(" AND ");
+                            } else {
+                                sql.append(tableItem.getAlias())
+                                    .append(".")
+                                    .append(columnName)
+                                    .append("=")
+                                    .append(value)
+                                    .append(" ")
+                                    .append(" AND ");
+                            }
+                        }
+                        int len = sql.length();
+                        sql.delete(len - 5, len);
+                        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+                        if (logger.isTraceEnabled()) {
+                            logger.trace(
+                                "Join table update es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
+                                config.getDestination(),
+                                dml.getTable(),
+                                mapping.get_index(),
+                                sql.toString().replace("\n", " "));
+                        }
+                        ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
+                            try {
+                                while (rs.next()) {
+                                    Map<String, Object> esFieldData = new LinkedHashMap<>();
+                                    for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
+                                        Object val = esTemplate.getValFromRS(mapping,
+                                            rs,
+                                            fieldItem.getFieldName(),
+                                            fieldItem.getColumn().getColumnName());
+                                        esFieldData.put(fieldItem.getFieldName(), val);
+                                    }
+
+                                    BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
+                                    for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem
+                                        .getRelationTableFields()
+                                        .entrySet()) {
+                                        for (FieldItem fieldItem : entry.getValue()) {
+                                            if (fieldItem.getColumnItems().size() == 1) {
+                                                Object value = esTemplate.getValFromRS(mapping,
+                                                    rs,
+                                                    fieldItem.getFieldName(),
+                                                    entry.getKey().getColumn().getColumnName());
+                                                String fieldName = fieldItem.getFieldName();
+                                                // 判断是否是主键
+                                                if (fieldName.equals(mapping.get_id())) {
+                                                    fieldName = "_id";
+                                                }
+                                                queryBuilder.must(QueryBuilders.termsQuery(fieldName, value));
+                                            }
+                                        }
+                                    }
+
+                                    if (logger.isDebugEnabled()) {
+                                        logger.trace(
+                                            "Join table update es index by foreign key, destination:{}, table: {}, index: {}",
+                                            config.getDestination(),
+                                            dml.getTable(),
+                                            mapping.get_index());
+                                    }
+                                    boolean result = esTemplate.updateByQuery(mapping, queryBuilder, esFieldData);
+                                    if (!result) {
+                                        logger.error(
+                                            "Join table update es index by foreign key error, destination:{}, table: {}, index: {}",
+                                            config.getDestination(),
+                                            dml.getTable(),
+                                            mapping.get_index());
+                                    }
+                                }
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                            return 0;
+                        });
                     }
                 } else {
                     // TODO 查询总sql

+ 2 - 2
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java

@@ -2,7 +2,6 @@ package com.alibaba.otter.canal.client.adapter.es.support;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
 import java.sql.*;
 import java.util.*;
@@ -20,6 +19,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.TableItem;
 
 public class ESSyncUtil {
@@ -249,7 +249,7 @@ public class ESSyncUtil {
     /**
      * 执行查询sql
      */
-    public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun)  {
+    public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) {
         Connection conn = null;
         Statement smt = null;
         ResultSet rs = null;

+ 15 - 69
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -274,44 +274,14 @@ public class ESTemplate {
         return true;
     }
 
-    // public Object getRSDataValue(ESMapping mapping, ResultSet rs, String
-    // fieldName) throws SQLException {
-    // String esType = getEsType(mapping, fieldName);
-    // Object value = rs.getObject(fieldName);
-    // if (value instanceof Boolean) {
-    // // 判断es类型
-    // if (!"boolean".equals(esType)) {
-    // value = rs.getByte(fieldName);
-    // }
-    // }
-    // return value;
-    // }
-    //
-    // public Object getDataValue(ESMapping mapping, Map<String, Object> data,
-    // String fieldName) {
-    // String esType = getEsType(mapping, fieldName);
-    // Object value = data.get(fieldName);
-    // if (value instanceof Byte) {
-    // if ("boolean".equals(esType)) {
-    // value = ((Byte) value).intValue() != 0;
-    // }
-    // }
-    // return value;
-    // }
-
-    // public Object convertType(ESMapping mapping, String fieldName, Object val) {
-    // // 从mapping中获取类型来转换
-    // String esType = getEsType(mapping, fieldName);
-    // return ESSyncUtil.typeConvert(val, esType);
-    // }
-
-    public Object getValFromRS(ESMapping mapping, ResultSet resultSet, String fieldName) throws SQLException {
+    public Object getValFromRS(ESMapping mapping, ResultSet resultSet, String fieldName,
+                               String columnName) throws SQLException {
         String esType = getEsType(mapping, fieldName);
 
-        Object value = resultSet.getObject(fieldName);
+        Object value = resultSet.getObject(columnName);
         if (value instanceof Boolean) {
             if (!"boolean".equals(esType)) {
-                value = resultSet.getByte(fieldName);
+                value = resultSet.getByte(columnName);
             }
         }
         return ESSyncUtil.typeConvert(value, esType);
@@ -323,48 +293,23 @@ public class ESTemplate {
         String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
         Object resultIdVal = null;
         for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
-            String fieldName = Util.cleanColumn(fieldItem.getColumnItems().iterator().next().getColumnName());
-            Object value = getValFromRS(mapping, resultSet, fieldName);
+            Object value = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
 
             if (fieldItem.getFieldName().equals(idFieldName)) {
                 resultIdVal = value;
             }
 
-            if (!fieldItem.getFieldName().equals(mapping.get_id()) && !mapping.getSkips().contains(fieldName)) {
-                esFieldData.put(fieldName, value);
+            if (!fieldItem.getFieldName().equals(mapping.get_id())
+                && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                esFieldData.put(fieldItem.getFieldName(), value);
             }
         }
         return resultIdVal;
     }
 
-    // public Object getIdValFromDmlData(ESMapping mapping, Map<String, Object>
-    // dmlData) {
-    // SchemaItem schemaItem = mapping.getSchemaItem();
-    // String idFieldName = mapping.get_id() == null ? mapping.getPk() :
-    // mapping.get_id();
-    //
-    // for (SchemaItem.FieldItem fieldItem : schemaItem.getSelectFields().values())
-    // {
-    // if (fieldItem.getFieldName().equals(idFieldName)) {
-    // String fieldName =
-    // Util.cleanColumn(fieldItem.getColumnItems().iterator().next().getColumnName());
-    // String esType = getEsType(mapping, fieldName);
-    // Object value = dmlData.get(fieldName);
-    // if (value instanceof Byte) {
-    // if ("boolean".equals(esType)) {
-    // value = ((Byte) value).intValue() != 0;
-    // }
-    // }
-    //
-    // return ESSyncUtil.typeConvert(value, esType);
-    // }
-    // }
-    // return null;
-    // }
-
-    public Object getValFromData(ESMapping mapping, Map<String, Object> dmlData, String fieldName) {
+    public Object getValFromData(ESMapping mapping, Map<String, Object> dmlData, String fieldName, String columnName) {
         String esType = getEsType(mapping, fieldName);
-        Object value = dmlData.get(fieldName);
+        Object value = dmlData.get(columnName);
         if (value instanceof Byte) {
             if ("boolean".equals(esType)) {
                 value = ((Byte) value).intValue() != 0;
@@ -387,15 +332,16 @@ public class ESTemplate {
         String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
         Object resultIdVal = null;
         for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
-            String fieldName = Util.cleanColumn(fieldItem.getColumnItems().iterator().next().getColumnName());
-            Object value = getValFromData(mapping, dmlData, fieldName);
+            String columnName = fieldItem.getColumnItems().iterator().next().getColumnName();
+            Object value = getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName);
 
             if (fieldItem.getFieldName().equals(idFieldName)) {
                 resultIdVal = value;
             }
 
-            if (!fieldItem.getFieldName().equals(mapping.get_id()) && !mapping.getSkips().contains(fieldName)) {
-                esFieldData.put(fieldName, value);
+            if (!fieldItem.getFieldName().equals(mapping.get_id())
+                && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                esFieldData.put(fieldItem.getFieldName(), value);
             }
         }
         return resultIdVal;

+ 27 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/Common.java

@@ -0,0 +1,27 @@
+package com.alibaba.otter.canal.client.adapter.es.test.sync;
+
+import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
+import com.alibaba.otter.canal.client.adapter.es.test.TestConstant;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class Common {
+
+    public static ESAdapter init() {
+        DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
+
+        OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
+        outerAdapterConfig.setName("es");
+        outerAdapterConfig.setHosts(TestConstant.esHosts);
+        Map<String, String> properties = new HashMap<>();
+        properties.put("cluster.name", TestConstant.clusterNmae);
+        outerAdapterConfig.setProperties(properties);
+
+        ESAdapter esAdapter = new ESAdapter();
+        esAdapter.init(outerAdapterConfig);
+        return esAdapter;
+    }
+}

+ 57 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOneTest.java

@@ -0,0 +1,57 @@
+package com.alibaba.otter.canal.client.adapter.es.test.sync;
+
+import java.util.*;
+
+import org.elasticsearch.action.get.GetResponse;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
+public class RoleSyncJoinOneTest {
+
+    private ESAdapter esAdapter;
+
+    @Before
+    public void init() {
+        AdapterConfigs.put("es", "mytest_user_join_one.yml");
+        esAdapter = Common.init();
+    }
+
+    /**
+     * 非子查询从表插入 (确保主表记录必须有数据)
+     */
+    @Test
+    public void insertTest01() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("INSERT");
+        dml.setDatabase("mytest");
+        dml.setTable("role");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("role_name", "admin");
+
+        dml.setData(dataList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("admin", response.getSource().get("_role_name"));
+    }
+
+    @After
+    public void after() {
+        esAdapter.destroy();
+        DatasourceConfig.DATA_SOURCES.values().forEach(DruidDataSource::close);
+    }
+}

+ 57 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinSubTest.java

@@ -0,0 +1,57 @@
+package com.alibaba.otter.canal.client.adapter.es.test.sync;
+
+import java.util.*;
+
+import org.elasticsearch.action.get.GetResponse;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
+public class RoleSyncJoinSubTest {
+
+    private ESAdapter esAdapter;
+
+    @Before
+    public void init() {
+        AdapterConfigs.put("es", "mytest_user_join_sub.yml");
+        esAdapter = Common.init();
+    }
+
+    /**
+     * 非子查询从表插入 (确保主表记录必须有数据)
+     */
+    @Test
+    public void insertTest01() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("INSERT");
+        dml.setDatabase("mytest");
+        dml.setTable("role");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("role_name", "admin2");
+
+        dml.setData(dataList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("admin2", response.getSource().get("_role_name"));
+    }
+
+    @After
+    public void after() {
+        esAdapter.destroy();
+        DatasourceConfig.DATA_SOURCES.values().forEach(DruidDataSource::close);
+    }
+}

+ 59 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncJoinOneTest.java

@@ -0,0 +1,59 @@
+package com.alibaba.otter.canal.client.adapter.es.test.sync;
+
+import java.util.*;
+
+import org.elasticsearch.action.get.GetResponse;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
+public class UserSyncJoinOneTest {
+
+    private ESAdapter esAdapter;
+
+    @Before
+    public void init() {
+        AdapterConfigs.put("es", "mytest_user_join_one.yml");
+        esAdapter = Common.init();
+    }
+
+    /**
+     * 主表带函数插入, 数据库里内容必须和单测一致
+     */
+    @Test
+    public void insertTest01() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("INSERT");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric");
+        data.put("role_id", 1L);
+        data.put("c_time", new Date());
+
+        dml.setData(dataList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("Eric_", response.getSource().get("_name"));
+    }
+
+    @After
+    public void after() {
+        esAdapter.destroy();
+        DatasourceConfig.DATA_SOURCES.values().forEach(DruidDataSource::close);
+    }
+}

+ 75 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncSingleTest.java

@@ -0,0 +1,75 @@
+package com.alibaba.otter.canal.client.adapter.es.test.sync;
+
+import java.util.*;
+
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
+public class UserSyncSingleTest {
+
+    private ESAdapter esAdapter;
+
+    @Before
+    public void init() {
+        AdapterConfigs.put("es", "mytest_user_single.yml");
+        esAdapter = Common.init();
+    }
+
+    /**
+     * 单表插入
+     */
+    @Test
+    public void insertTest01() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("INSERT");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric");
+        data.put("role_id", 1L);
+        data.put("c_time", new Date());
+
+        dml.setData(dataList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("Eric", response.getSource().get("_name"));
+    }
+
+    @Test
+    public void ttt() {
+        SearchResponse response = esAdapter.getTransportClient()
+            .prepareSearch("mytest_user")
+            .setTypes("_doc")
+            .setQuery(QueryBuilders.termQuery("_id", 1L))
+            .setSize(100)
+            .get();
+        for (SearchHit hit : response.getHits()) {
+            System.out.println(hit.getSourceAsMap().get("name"));
+        }
+    }
+
+    @After
+    public void after() {
+        esAdapter.destroy();
+        DatasourceConfig.DATA_SOURCES.values().forEach(DruidDataSource::close);
+    }
+}

+ 0 - 133
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncTest.java

@@ -1,133 +0,0 @@
-package com.alibaba.otter.canal.client.adapter.es.test.sync;
-
-import java.util.*;
-
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.alibaba.druid.pool.DruidDataSource;
-import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
-import com.alibaba.otter.canal.client.adapter.es.test.TestConstant;
-import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
-import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
-
-public class UserSyncTest {
-
-    private ESAdapter esAdapter;
-
-    @Before
-    public void init() {
-        AdapterConfigs.put("es", "mytest_user.yml");
-        DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
-
-        OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
-        outerAdapterConfig.setName("es");
-        outerAdapterConfig.setHosts(TestConstant.esHosts);
-        Map<String, String> properties = new HashMap<>();
-        properties.put("cluster.name", TestConstant.clusterNmae);
-        outerAdapterConfig.setProperties(properties);
-
-        esAdapter = new ESAdapter();
-        esAdapter.init(outerAdapterConfig);
-
-    }
-
-    /**
-     * 单表插入
-     */
-    @Test
-    public void insertTest01() {
-        Dml dml = new Dml();
-        dml.setDestination("example");
-        dml.setTs(new Date().getTime());
-        dml.setType("INSERT");
-        dml.setDatabase("mytest");
-        dml.setTable("user");
-        List<Map<String, Object>> dataList = new ArrayList<>();
-        Map<String, Object> data = new LinkedHashMap<>();
-        dataList.add(data);
-        data.put("id", 1L);
-        data.put("name", "Eric");
-        data.put("role_id", 1L);
-        data.put("c_time", new Date());
-
-        dml.setData(dataList);
-
-        esAdapter.getEsSyncService().sync(dml);
-
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
-        Assert.assertEquals("Eric", response.getSource().get("name"));
-    }
-
-    /**
-     * 主表带函数插入, 数据库里内容必须和单测一致
-     */
-    @Test
-    public void insertTest02() {
-        Dml dml = new Dml();
-        dml.setDestination("example");
-        dml.setTs(new Date().getTime());
-        dml.setType("INSERT");
-        dml.setDatabase("mytest");
-        dml.setTable("user");
-        List<Map<String, Object>> dataList = new ArrayList<>();
-        Map<String, Object> data = new LinkedHashMap<>();
-        dataList.add(data);
-        data.put("id", 1L);
-        data.put("name", "Eric");
-        data.put("role_id", 1L);
-        data.put("c_time", new Date());
-
-        dml.setData(dataList);
-
-        esAdapter.getEsSyncService().sync(dml);
-
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
-        Assert.assertEquals("Eric_", response.getSource().get("name"));
-    }
-
-    @Test
-    public void insertTest03() {
-        Dml dml = new Dml();
-        dml.setDestination("example");
-        dml.setTs(new Date().getTime());
-        dml.setType("INSERT");
-        dml.setDatabase("mytest");
-        dml.setTable("role");
-        List<Map<String, Object>> dataList = new ArrayList<>();
-        Map<String, Object> data = new LinkedHashMap<>();
-        dataList.add(data);
-        data.put("id", 1L);
-        data.put("role_name", "admin");
-
-        dml.setData(dataList);
-
-        esAdapter.getEsSyncService().sync(dml);
-    }
-
-    @Test
-    public void ttt(){
-        SearchResponse response = esAdapter.getTransportClient().prepareSearch("mytest_user")
-                .setTypes("_doc")
-                .setQuery(QueryBuilders.termQuery("_id", 1L))
-                .setSize(100)
-                .get();
-        for (SearchHit hit : response.getHits()) {
-            System.out.println(hit.getSourceAsMap().get("name"));
-        }
-    }
-
-    @After
-    public void after() {
-        esAdapter.destroy();
-        DatasourceConfig.DATA_SOURCES.values().forEach(DruidDataSource::close);
-    }
-}

+ 10 - 0
client-adapter/elasticsearch/src/test/resources/es/mytest_user_join_one.yml

@@ -0,0 +1,10 @@
+dataSourceKey: defaultDS
+destination: example
+esMapping:
+  _index: mytest_user
+  _type: _doc
+  _id: _id
+  sql: "select a.id as _id, concat(a.name,'_') as _name, a.role_id as _role_id,
+        b.role_name as _role_name, a.c_time as _c_time from user a
+        left join role b on b.id=a.role_id"
+  commitBatch: 3000

+ 10 - 0
client-adapter/elasticsearch/src/test/resources/es/mytest_user_join_sub.yml

@@ -0,0 +1,10 @@
+dataSourceKey: defaultDS
+destination: example
+esMapping:
+  _index: mytest_user
+  _type: _doc
+  _id: _id
+  sql: "select a.id as _id, concat(a.name,'_') as _name, a.role_id as _role_id,
+        b.role_name as _role_name, a.c_time as _c_time from user a
+        left join (select id, role_name from role ) b on b.id=a.role_id"
+  commitBatch: 3000

+ 8 - 0
client-adapter/elasticsearch/src/test/resources/es/mytest_user_single.yml

@@ -0,0 +1,8 @@
+dataSourceKey: defaultDS
+destination: example
+esMapping:
+  _index: mytest_user
+  _type: _doc
+  _id: _id
+  sql: "select a.id as _id, a.names as _name, a.role_id as _role_id, a.c_time as _c_time from user a"
+  commitBatch: 3000