Browse Source

新增完成

mcy 6 years ago
parent
commit
9d04a82c42

+ 346 - 207
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -98,12 +98,17 @@ public class ESSyncService {
         }
     }
 
+    /**
+     * 插入dml操作
+     * 
+     * @param config es配置
+     * @param dml dml数据
+     */
     private void insert(ESSyncConfig config, Dml dml) {
         List<Map<String, Object>> dataList = dml.getData();
         if (dataList == null || dataList.isEmpty()) {
             return;
         }
-        ESMapping mapping = config.getEsMapping();
         SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
         for (Map<String, Object> data : dataList) {
             if (data == null || data.isEmpty()) {
@@ -112,231 +117,365 @@ public class ESSyncService {
 
             // ------是否单表 & 所有字段都为简单字段------
             if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
-                Map<String, Object> esFieldData = new LinkedHashMap<>();
-                Object idVal = esTemplate.getESDataFromDmlData(mapping, data, esFieldData);
+                singleTableSimpleFiledInsert(config, dml, data);
+            } else {
+                // ------是主表 查询sql来插入------
+                if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
+                    mainTableInsert(config, dml, data);
+                }
 
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Single table insert ot es index, destination:{}, table: {}, index: {}, id: {}",
-                        config.getDestination(),
-                        dml.getTable(),
-                        mapping.get_index(),
-                        idVal);
+                // 从表的操作
+                for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
+                    if (tableItem.isMain()) {
+                        continue;
+                    }
+                    if (!tableItem.getTableName().equals(dml.getTable())) {
+                        continue;
+                    }
+                    // 关联条件出现在主表查询条件是否为简单字段
+                    boolean allFieldsSimple = true;
+                    for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
+                        if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
+                            allFieldsSimple = false;
+                            break;
+                        }
+                    }
+                    // 所有查询字段均为简单字段
+                    if (allFieldsSimple) {
+                        // 不是子查询
+                        if (!tableItem.isSubQuery()) {
+                            // ------关联表简单字段插入------
+                            joinTableSimpleFieldOperation(config, dml, data, tableItem);
+                        } else {
+                            // ------关联子表简单字段插入------
+                            subTableSimpleFieldOperation(config, dml, data, tableItem);
+                        }
+                    } else {
+                        // ------关联子表复杂字段插入 执行全sql更新es------
+                        jonTableWholeSqlOperation(config, dml, data, tableItem);
+                    }
                 }
-                boolean result = esTemplate.insert(config, idVal, esFieldData);
-                if (!result) {
-                    logger.error("Single table insert to es index error, destination:{}, table: {}, index: {}, id: {}",
-                        config.getDestination(),
-                        dml.getTable(),
-                        mapping.get_index(),
-                        idVal);
+            }
+        }
+    }
+
+    /**
+     * 单表简单字段insert
+     *
+     * @param config es配置
+     * @param dml dml信息
+     * @param data 单行dml数据
+     */
+    private void singleTableSimpleFiledInsert(ESSyncConfig config, Dml dml, Map<String, Object> data) {
+        ESMapping mapping = config.getEsMapping();
+        Map<String, Object> esFieldData = new LinkedHashMap<>();
+        Object idVal = esTemplate.getESDataFromDmlData(mapping, data, esFieldData);
+
+        if (logger.isTraceEnabled()) {
+            logger.trace("Single table insert ot es index, destination:{}, table: {}, index: {}, id: {}",
+                config.getDestination(),
+                dml.getTable(),
+                mapping.get_index(),
+                idVal);
+        }
+        boolean result = esTemplate.insert(config, idVal, esFieldData);
+        if (!result) {
+            logger.error("Single table insert to es index error, destination:{}, table: {}, index: {}, id: {}",
+                config.getDestination(),
+                dml.getTable(),
+                mapping.get_index(),
+                idVal);
+        }
+    }
+
+    /**
+     * 主表(单表)复杂字段insert
+     * 
+     * @param config es配置
+     * @param dml dml信息
+     * @param data 单行dml数据
+     */
+    private void mainTableInsert(ESSyncConfig config, Dml dml, Map<String, Object> data) {
+        ESMapping mapping = config.getEsMapping();
+        String sql = mapping.getSql();
+        String condition = ESSyncUtil.pkConditionSql(mapping, data);
+        sql = ESSyncUtil.appendCondition(sql, condition);
+        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Single table insert ot es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
+                config.getDestination(),
+                dml.getTable(),
+                mapping.get_index(),
+                sql.replace("\n", " "));
+        }
+        ESSyncUtil.sqlRS(ds, sql, rs -> {
+            try {
+                while (rs.next()) {
+                    Map<String, Object> esFieldData = new LinkedHashMap<>();
+                    Object idVal = esTemplate.getESDataFromRS(mapping, rs, esFieldData);
+
+                    if (logger.isTraceEnabled()) {
+                        logger.trace(
+                            "Single table insert ot es index by query sql, destination:{}, table: {}, index: {}, id: {}",
+                            config.getDestination(),
+                            dml.getTable(),
+                            mapping.get_index(),
+                            idVal);
+                    }
+                    boolean result = esTemplate.insert(config, idVal, esFieldData);
+                    if (!result) {
+                        logger.error(
+                            "Single table insert to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
+                            config.getDestination(),
+                            dml.getTable(),
+                            mapping.get_index(),
+                            idVal);
+                    }
                 }
-                continue; // 单表插入完成
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
+            return 0;
+        });
+    }
 
-            // ------是主表 查询sql来插入------
-            if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
-                String sql = mapping.getSql();
-                String condition = ESSyncUtil.pkConditionSql(mapping, data);
-                sql = ESSyncUtil.appendCondition(sql, condition);
-                DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
-                if (logger.isTraceEnabled()) {
-                    logger.trace(
-                        "Single table insert ot es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
-                        config.getDestination(),
-                        dml.getTable(),
-                        mapping.get_index(),
-                        sql.replace("\n", " "));
+    /**
+     * 关联表主表简单字段operation
+     *
+     * @param config es配置
+     * @param dml dml信息
+     * @param data 单行dml数据
+     * @param tableItem 当前表配置
+     */
+    private void joinTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
+                                               TableItem tableItem) {
+        ESMapping mapping = config.getEsMapping();
+        Map<String, Object> esFieldData = new LinkedHashMap<>();
+        for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
+            Object value = esTemplate
+                .getValFromData(mapping, data, fieldItem.getFieldName(), fieldItem.getColumn().getColumnName());
+            esFieldData.put(fieldItem.getFieldName(), value);
+        }
+
+        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
+        for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
+            for (FieldItem fieldItem : entry.getValue()) {
+                if (fieldItem.getColumnItems().size() == 1) {
+                    Object value = esTemplate.getValFromData(mapping,
+                        data,
+                        fieldItem.getFieldName(),
+                        entry.getKey().getColumn().getColumnName());
+
+                    String fieldName = fieldItem.getFieldName();
+                    // 判断是否是主键
+                    if (fieldName.equals(mapping.get_id())) {
+                        fieldName = "_id";
+                    }
+                    queryBuilder.must(QueryBuilders.termsQuery(fieldName, value));
                 }
-                ESSyncUtil.sqlRS(ds, sql, rs -> {
-                    try {
-                        while (rs.next()) {
-                            Map<String, Object> esFieldData = new LinkedHashMap<>();
-                            Object idVal = esTemplate.getESDataFromRS(mapping, rs, esFieldData);
-
-                            if (logger.isTraceEnabled()) {
-                                logger.trace(
-                                    "Single table insert ot es index by query sql, destination:{}, table: {}, index: {}, id: {}",
-                                    config.getDestination(),
-                                    dml.getTable(),
-                                    mapping.get_index(),
-                                    idVal);
-                            }
-                            boolean result = esTemplate.insert(config, idVal, esFieldData);
-                            if (!result) {
-                                logger.error(
-                                    "Single table insert to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
-                                    config.getDestination(),
-                                    dml.getTable(),
-                                    mapping.get_index(),
-                                    idVal);
+            }
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.trace("Join table update es index by foreign key, destination:{}, table: {}, index: {}",
+                config.getDestination(),
+                dml.getTable(),
+                mapping.get_index());
+        }
+        boolean result = esTemplate.updateByQuery(mapping, queryBuilder, esFieldData);
+        if (!result) {
+            logger.error("Join table update es index by foreign key error, destination:{}, table: {}, index: {}",
+                config.getDestination(),
+                dml.getTable(),
+                mapping.get_index());
+        }
+    }
+
+    /**
+     * 关联子查询, 主表简单字段operation
+     *
+     * @param config es配置
+     * @param dml dml信息
+     * @param data 单行dml数据
+     * @param tableItem 当前表配置
+     */
+    private void subTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
+                                              TableItem tableItem) {
+        ESMapping mapping = config.getEsMapping();
+        StringBuilder sql = new StringBuilder(
+            "SELECT * FROM (" + tableItem.getSubQuerySql() + ") " + tableItem.getAlias() + " WHERE ");
+
+        for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
+            String columnName = fkFieldItem.getColumn().getColumnName();
+            Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
+            if (value instanceof String) {
+                sql.append(tableItem.getAlias())
+                    .append(".")
+                    .append(columnName)
+                    .append("='")
+                    .append(value)
+                    .append("' ")
+                    .append(" AND ");
+            } else {
+                sql.append(tableItem.getAlias())
+                    .append(".")
+                    .append(columnName)
+                    .append("=")
+                    .append(value)
+                    .append(" ")
+                    .append(" AND ");
+            }
+        }
+        int len = sql.length();
+        sql.delete(len - 5, len);
+        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
+                config.getDestination(),
+                dml.getTable(),
+                mapping.get_index(),
+                sql.toString().replace("\n", " "));
+        }
+        ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
+            try {
+                while (rs.next()) {
+                    Map<String, Object> esFieldData = new LinkedHashMap<>();
+                    for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
+                        Object val = esTemplate
+                            .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getColumn().getColumnName());
+                        esFieldData.put(fieldItem.getFieldName(), val);
+                    }
+
+                    BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
+                    for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
+                        for (FieldItem fieldItem : entry.getValue()) {
+                            if (fieldItem.getColumnItems().size() == 1) {
+                                Object value = esTemplate.getValFromRS(mapping,
+                                    rs,
+                                    fieldItem.getFieldName(),
+                                    entry.getKey().getColumn().getColumnName());
+                                String fieldName = fieldItem.getFieldName();
+                                // 判断是否是主键
+                                if (fieldName.equals(mapping.get_id())) {
+                                    fieldName = "_id";
+                                }
+                                queryBuilder.must(QueryBuilders.termsQuery(fieldName, value));
                             }
                         }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
                     }
-                    return 0;
-                });
-            }
 
-            // 从表的操作
-            for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
-                if (tableItem.isMain()) {
-                    continue;
-                }
-                if (!tableItem.getTableName().equals(dml.getTable())) {
-                    continue;
-                }
-                // ------关联条件出现在主表查询条件------
-                boolean allFieldsSimple = true;
-                for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
-                    if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
-                        allFieldsSimple = false;
-                        break;
+                    if (logger.isDebugEnabled()) {
+                        logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}",
+                            config.getDestination(),
+                            dml.getTable(),
+                            mapping.get_index());
+                    }
+                    boolean result = esTemplate.updateByQuery(mapping, queryBuilder, esFieldData);
+                    if (!result) {
+                        logger.error(
+                            "Join table update es index by query sql error, destination:{}, table: {}, index: {}",
+                            config.getDestination(),
+                            dml.getTable(),
+                            mapping.get_index());
                     }
                 }
-                // 所有查询字段均为简单字段
-                if (allFieldsSimple) {
-                    // 不是子查询
-                    if (!tableItem.isSubQuery()) {
-                        Map<String, Object> esFieldData = new LinkedHashMap<>();
-                        for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
-                            Object value = esTemplate.getValFromData(mapping,
-                                data,
-                                fieldItem.getFieldName(),
-                                fieldItem.getColumn().getColumnName());
-                            esFieldData.put(fieldItem.getFieldName(), value);
-                        }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            return 0;
+        });
+    }
 
-                        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
-                        for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields()
-                            .entrySet()) {
-                            for (FieldItem fieldItem : entry.getValue()) {
-                                if (fieldItem.getColumnItems().size() == 1) {
-                                    Object value = esTemplate.getValFromData(mapping,
-                                            data,
-                                            fieldItem.getFieldName(),
-                                            entry.getKey().getColumn().getColumnName());
-
-                                    String fieldName = fieldItem.getFieldName();
-                                    // 判断是否是主键
-                                    if (fieldName.equals(mapping.get_id())) {
-                                        fieldName = "_id";
-                                    }
-                                    queryBuilder.must(QueryBuilders.termsQuery(fieldName, value));
+    /**
+     * 关联(子查询), 主表复杂字段operation, 全sql执行
+     *
+     * @param config es配置
+     * @param dml dml信息
+     * @param data 单行dml数据
+     * @param tableItem 当前表配置
+     */
+    private void jonTableWholeSqlOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
+                                           TableItem tableItem) {
+        ESMapping mapping = config.getEsMapping();
+        StringBuilder sql = new StringBuilder(mapping.getSql() + " WHERE ");
+
+        for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
+            String columnName = fkFieldItem.getColumn().getColumnName();
+            Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
+            if (value instanceof String) {
+                sql.append(tableItem.getAlias())
+                    .append(".")
+                    .append(columnName)
+                    .append("='")
+                    .append(value)
+                    .append("' ")
+                    .append(" AND ");
+            } else {
+                sql.append(tableItem.getAlias())
+                    .append(".")
+                    .append(columnName)
+                    .append("=")
+                    .append(value)
+                    .append(" ")
+                    .append(" AND ");
+            }
+        }
+        int len = sql.length();
+        sql.delete(len - 5, len);
+        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Join table update es index by query whole sql, destination:{}, table: {}, index: {}, sql: {}",
+                config.getDestination(),
+                dml.getTable(),
+                mapping.get_index(),
+                sql.toString().replace("\n", " "));
+        }
+        ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
+            try {
+                while (rs.next()) {
+                    Map<String, Object> esFieldData = new LinkedHashMap<>();
+                    for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
+                        Object val = esTemplate
+                            .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
+                        esFieldData.put(fieldItem.getFieldName(), val);
+                    }
+
+                    BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
+                    for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
+                        for (FieldItem fieldItem : entry.getValue()) {
+                            if (fieldItem.getColumnItems().size() == 1) {
+                                Object value = esTemplate
+                                    .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
+                                String fieldName = fieldItem.getFieldName();
+                                // 判断是否是主键
+                                if (fieldName.equals(mapping.get_id())) {
+                                    fieldName = "_id";
                                 }
+                                queryBuilder.must(QueryBuilders.termsQuery(fieldName, value));
                             }
                         }
+                    }
 
-                        if (logger.isDebugEnabled()) {
-                            logger.trace(
-                                "Join table update es index by foreign key, destination:{}, table: {}, index: {}",
-                                config.getDestination(),
-                                dml.getTable(),
-                                mapping.get_index());
-                        }
-                        boolean result = esTemplate.updateByQuery(mapping, queryBuilder, esFieldData);
-                        if (!result) {
-                            logger.error(
-                                "Join table update es index by foreign key error, destination:{}, table: {}, index: {}",
-                                config.getDestination(),
-                                dml.getTable(),
-                                mapping.get_index());
-                        }
-                    } else {
-                        StringBuilder sql = new StringBuilder(
-                            "SELECT * FROM (" + tableItem.getSubQuerySql() + ") " + tableItem.getAlias() + " WHERE ");
-
-                        for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
-                            String columnName = fkFieldItem.getColumn().getColumnName();
-                            Object value = esTemplate
-                                .getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
-                            if (value instanceof String) {
-                                sql.append(tableItem.getAlias())
-                                    .append(".")
-                                    .append(columnName)
-                                    .append("='")
-                                    .append(value)
-                                    .append("' ")
-                                    .append(" AND ");
-                            } else {
-                                sql.append(tableItem.getAlias())
-                                    .append(".")
-                                    .append(columnName)
-                                    .append("=")
-                                    .append(value)
-                                    .append(" ")
-                                    .append(" AND ");
-                            }
-                        }
-                        int len = sql.length();
-                        sql.delete(len - 5, len);
-                        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
-                        if (logger.isTraceEnabled()) {
-                            logger.trace(
-                                "Join table update es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
-                                config.getDestination(),
-                                dml.getTable(),
-                                mapping.get_index(),
-                                sql.toString().replace("\n", " "));
-                        }
-                        ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
-                            try {
-                                while (rs.next()) {
-                                    Map<String, Object> esFieldData = new LinkedHashMap<>();
-                                    for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
-                                        Object val = esTemplate.getValFromRS(mapping,
-                                            rs,
-                                            fieldItem.getFieldName(),
-                                            fieldItem.getColumn().getColumnName());
-                                        esFieldData.put(fieldItem.getFieldName(), val);
-                                    }
-
-                                    BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
-                                    for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem
-                                        .getRelationTableFields()
-                                        .entrySet()) {
-                                        for (FieldItem fieldItem : entry.getValue()) {
-                                            if (fieldItem.getColumnItems().size() == 1) {
-                                                Object value = esTemplate.getValFromRS(mapping,
-                                                    rs,
-                                                    fieldItem.getFieldName(),
-                                                    entry.getKey().getColumn().getColumnName());
-                                                String fieldName = fieldItem.getFieldName();
-                                                // 判断是否是主键
-                                                if (fieldName.equals(mapping.get_id())) {
-                                                    fieldName = "_id";
-                                                }
-                                                queryBuilder.must(QueryBuilders.termsQuery(fieldName, value));
-                                            }
-                                        }
-                                    }
-
-                                    if (logger.isDebugEnabled()) {
-                                        logger.trace(
-                                            "Join table update es index by foreign key, destination:{}, table: {}, index: {}",
-                                            config.getDestination(),
-                                            dml.getTable(),
-                                            mapping.get_index());
-                                    }
-                                    boolean result = esTemplate.updateByQuery(mapping, queryBuilder, esFieldData);
-                                    if (!result) {
-                                        logger.error(
-                                            "Join table update es index by foreign key error, destination:{}, table: {}, index: {}",
-                                            config.getDestination(),
-                                            dml.getTable(),
-                                            mapping.get_index());
-                                    }
-                                }
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
-                            return 0;
-                        });
+                    if (logger.isDebugEnabled()) {
+                        logger.trace(
+                            "Join table update es index by query whole sql, destination:{}, table: {}, index: {}",
+                            config.getDestination(),
+                            dml.getTable(),
+                            mapping.get_index());
+                    }
+                    boolean result = esTemplate.updateByQuery(mapping, queryBuilder, esFieldData);
+                    if (!result) {
+                        logger.error(
+                            "Join table update es index by query whole sql error, destination:{}, table: {}, index: {}",
+                            config.getDestination(),
+                            dml.getTable(),
+                            mapping.get_index());
                     }
-                } else {
-                    // TODO 查询总sql
                 }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
-        }
+            return 0;
+        });
     }
 }

+ 3 - 3
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -142,7 +142,7 @@ public class ESTemplate {
         return updateByQuery(mapping, queryBuilder, esFieldData, 1);
     }
 
-    public boolean updateByQuery(ESMapping mapping, QueryBuilder queryBuilder, Map<String, Object> esFieldData,
+    private boolean updateByQuery(ESMapping mapping, QueryBuilder queryBuilder, Map<String, Object> esFieldData,
                                  int counter) {
         if (CollectionUtils.isEmpty(esFieldData)) {
             return true;
@@ -204,10 +204,10 @@ public class ESTemplate {
 
         if (response.getStatus().getVersionConflicts() > 0) {
             if (counter >= 3) {
-                logger.error("第 {} 次执行updateByQuery, 依旧存在版本冲突,不再继续重试。", counter);
+                logger.error("第 {} 次执行updateByQuery, 依旧存在分片版本冲突,不再继续重试。", counter);
                 return false;
             }
-            logger.warn("本次updateByQuery存在版本冲突,准备重新执行...");
+            logger.warn("本次updateByQuery存在分片版本冲突,准备重新执行...");
             try {
                 TimeUnit.SECONDS.sleep(1);
             } catch (InterruptedException e) {

+ 57 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOne2Test.java

@@ -0,0 +1,57 @@
+package com.alibaba.otter.canal.client.adapter.es.test.sync;
+
+import java.util.*;
+
+import org.elasticsearch.action.get.GetResponse;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
+public class RoleSyncJoinOne2Test {
+
+    private ESAdapter esAdapter;
+
+    @Before
+    public void init() {
+        AdapterConfigs.put("es", "mytest_user_join_one2.yml");
+        esAdapter = Common.init();
+    }
+
+    /**
+     * 非子查询从表插入 (确保主表记录必须有数据)
+     */
+    @Test
+    public void insertTest01() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("INSERT");
+        dml.setDatabase("mytest");
+        dml.setTable("role");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("role_name", "admin2");
+
+        dml.setData(dataList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("admin2_", response.getSource().get("_role_name"));
+    }
+
+    @After
+    public void after() {
+        esAdapter.destroy();
+        DatasourceConfig.DATA_SOURCES.values().forEach(DruidDataSource::close);
+    }
+}

+ 58 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinSub2Test.java

@@ -0,0 +1,58 @@
+package com.alibaba.otter.canal.client.adapter.es.test.sync;
+
+import java.util.*;
+
+import org.elasticsearch.action.get.GetResponse;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
+public class RoleSyncJoinSub2Test {
+
+    private ESAdapter esAdapter;
+
+    @Before
+    public void init() {
+        AdapterConfigs.put("es", "mytest_user_join_sub2.yml");
+        esAdapter = Common.init();
+    }
+
+    /**
+     * 子查询从表插入 (确保主表记录必须有数据)
+     */
+    @Test
+    public void insertTest01() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("INSERT");
+        dml.setDatabase("mytest");
+        dml.setTable("label");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("user_id",1L);
+        data.put("label", "a");
+
+        dml.setData(dataList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("a;b_", response.getSource().get("_labels"));
+    }
+
+    @After
+    public void after() {
+        esAdapter.destroy();
+        DatasourceConfig.DATA_SOURCES.values().forEach(DruidDataSource::close);
+    }
+}

+ 10 - 0
client-adapter/elasticsearch/src/test/resources/es/mytest_user_join_one2.yml

@@ -0,0 +1,10 @@
+dataSourceKey: defaultDS
+destination: example
+esMapping:
+  _index: mytest_user
+  _type: _doc
+  _id: _id
+  sql: "select a.id as _id, concat(a.name,'_') as _name, a.role_id as _role_id,
+        concat(b.role_name,'_') as _role_name, a.c_time as _c_time from user a
+        left join role b on b.id=a.role_id"
+  commitBatch: 3000

+ 11 - 0
client-adapter/elasticsearch/src/test/resources/es/mytest_user_join_sub2.yml

@@ -0,0 +1,11 @@
+dataSourceKey: defaultDS
+destination: example
+esMapping:
+  _index: mytest_user
+  _type: _doc
+  _id: _id
+  sql: "select a.id as _id, concat(a.name,'_') as _name, a.role_id as _role_id,
+        concat(b.labels, '_') as _labels, a.c_time as _c_time from user a
+        left join (select user_id, group_concat(label separator ';') as labels from label
+        group by user_id) b on b.user_id=a.id"
+  commitBatch: 3000