Browse Source

ES adapter代码重新整理
新增upsert属性

mcy 6 years ago
parent
commit
83b760494d

+ 4 - 3
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -146,9 +146,10 @@ public class ESAdapter implements OuterAdapter {
         for (Dml dml : dmls) {
             sync(dml);
         }
+        esSyncService.commit(); // 批次统一提交
     }
 
-    public void sync(Dml dml) {
+    private void sync(Dml dml) {
         String database = dml.getDatabase();
         String table = dml.getTable();
         Map<String, ESSyncConfig> configMap;
@@ -174,7 +175,7 @@ public class ESAdapter implements OuterAdapter {
             DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
             ESEtlService esEtlService = new ESEtlService(transportClient, config);
             if (dataSource != null) {
-                return esEtlService.importData(params, false);
+                return esEtlService.importData(params);
             } else {
                 etlResult.setSucceeded(false);
                 etlResult.setErrorMessage("DataSource not found");
@@ -188,7 +189,7 @@ public class ESAdapter implements OuterAdapter {
                 // 取所有的destination为task的配置
                 if (configTmp.getDestination().equals(task)) {
                     ESEtlService esEtlService = new ESEtlService(transportClient, configTmp);
-                    EtlResult etlRes = esEtlService.importData(params, false);
+                    EtlResult etlRes = esEtlService.importData(params);
                     if (!etlRes.getSucceeded()) {
                         resSuccess = false;
                         resultMsg.append(etlRes.getErrorMessage()).append("\n");

+ 12 - 11
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java

@@ -30,8 +30,8 @@ public class ESSyncConfig {
         if (esMapping._type == null) {
             throw new NullPointerException("esMapping._type");
         }
-        if (esMapping._id == null && esMapping.pk == null) {
-            throw new NullPointerException("esMapping._id and esMapping.pk");
+        if (esMapping._id == null) {
+            throw new NullPointerException("esMapping._id");
         }
         if (esMapping.sql == null) {
             throw new NullPointerException("esMapping.sql");
@@ -83,8 +83,9 @@ public class ESSyncConfig {
         private String              _index;
         private String              _type;
         private String              _id;
+        private boolean             upsert          = false;
         private String              pk;
-        private String              parent;
+        // private String parent;
         private String              sql;
         // 对象字段, 例: objFields:
         // - _labels: array:;
@@ -121,20 +122,20 @@ public class ESSyncConfig {
             this._id = _id;
         }
 
-        public String getPk() {
-            return pk;
+        public boolean isUpsert() {
+            return upsert;
         }
 
-        public void setPk(String pk) {
-            this.pk = pk;
+        public void setUpsert(boolean upsert) {
+            this.upsert = upsert;
         }
 
-        public String getParent() {
-            return parent;
+        public String getPk() {
+            return pk;
         }
 
-        public void setParent(String parent) {
-            this.parent = parent;
+        public void setPk(String pk) {
+            this.pk = pk;
         }
 
         public Map<String, String> getObjFields() {

+ 5 - 5
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SchemaItem.java

@@ -134,11 +134,11 @@ public class SchemaItem {
     }
 
     public FieldItem getIdFieldItem(ESMapping mapping) {
-        if (mapping.get_id() != null) {
-            return getSelectFields().get(mapping.get_id());
-        } else {
-            return getSelectFields().get(mapping.getPk());
-        }
+        //TODO if (mapping.get_id() != null) {
+        return getSelectFields().get(mapping.get_id());
+        // } else {
+        // return getSelectFields().get(mapping.getPk());
+        // }
     }
 
     public static class TableItem {

+ 72 - 74
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java

@@ -1,6 +1,9 @@
 package com.alibaba.otter.canal.client.adapter.es.service;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -13,11 +16,8 @@ import javax.sql.DataSource;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.search.SearchHit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +51,7 @@ public class ESEtlService {
         this.config = config;
     }
 
-    public EtlResult importData(List<String> params, boolean bulk) {
+    public EtlResult importData(List<String> params) {
         EtlResult etlResult = new EtlResult();
         AtomicLong impCount = new AtomicLong();
         List<String> errMsg = new ArrayList<>();
@@ -93,54 +93,49 @@ public class ESEtlService {
                 logger.debug("etl sql : {}", mapping.getSql());
             }
 
-            if (bulk) {
-                // 获取总数
-                String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
-                long cnt = (Long) ESSyncUtil.sqlRS(dataSource, countSql, rs -> {
-                    Long count = null;
-                    try {
-                        if (rs.next()) {
-                            count = ((Number) rs.getObject(1)).longValue();
-                        }
-                    } catch (Exception e) {
-                        logger.error(e.getMessage(), e);
-                    }
-                    return count == null ? 0L : count;
-                });
-
-                // 当大于1万条记录时开启多线程
-                if (cnt >= 10000) {
-                    int threadCount = 3; // TODO 从配置读取默认为3
-                    long perThreadCnt = cnt / threadCount;
-                    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
-                    List<Future<Boolean>> futures = new ArrayList<>(threadCount);
-                    for (int i = 0; i < threadCount; i++) {
-                        long offset = i * perThreadCnt;
-                        Long size = null;
-                        if (i != threadCount - 1) {
-                            size = perThreadCnt;
-                        }
-                        String sqlFinal;
-                        if (size != null) {
-                            sqlFinal = sql + " LIMIT " + offset + "," + size;
-                        } else {
-                            sqlFinal = sql + " LIMIT " + offset + "," + cnt;
-                        }
-                        Future<Boolean> future = executor
-                            .submit(() -> executeSqlImport(dataSource, sqlFinal, mapping, impCount, errMsg));
-                        futures.add(future);
+            // 获取总数
+            String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
+            long cnt = (Long) ESSyncUtil.sqlRS(dataSource, countSql, rs -> {
+                Long count = null;
+                try {
+                    if (rs.next()) {
+                        count = ((Number) rs.getObject(1)).longValue();
                     }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+                return count == null ? 0L : count;
+            });
 
-                    for (Future<Boolean> future : futures) {
-                        future.get();
+            // 当大于1万条记录时开启多线程
+            if (cnt >= 10000) {
+                int threadCount = 3; // 从配置读取默认为3
+                long perThreadCnt = cnt / threadCount;
+                ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+                List<Future<Boolean>> futures = new ArrayList<>(threadCount);
+                for (int i = 0; i < threadCount; i++) {
+                    long offset = i * perThreadCnt;
+                    Long size = null;
+                    if (i != threadCount - 1) {
+                        size = perThreadCnt;
+                    }
+                    String sqlFinal;
+                    if (size != null) {
+                        sqlFinal = sql + " LIMIT " + offset + "," + size;
+                    } else {
+                        sqlFinal = sql + " LIMIT " + offset + "," + cnt;
                     }
+                    Future<Boolean> future = executor
+                        .submit(() -> executeSqlImport(dataSource, sqlFinal, mapping, impCount, errMsg));
+                    futures.add(future);
+                }
 
-                    executor.shutdown();
-                } else {
-                    executeSqlImport(dataSource, sql, mapping, impCount, errMsg);
+                for (Future<Boolean> future : futures) {
+                    future.get();
                 }
+
+                executor.shutdown();
             } else {
-                logger.info("自动ETL,无需统计记录总条数,直接进行ETL, index: {}", esIndex);
                 executeSqlImport(dataSource, sql, mapping, impCount, errMsg);
             }
 
@@ -205,32 +200,35 @@ public class ESEtlService {
                         }
 
                         if (idVal != null) {
-                            if (mapping.getParent() == null) {
+                            if (mapping.isUpsert()) {
+                                bulkRequestBuilder.add(transportClient
+                                    .prepareUpdate(mapping.get_index(), mapping.get_type(), idVal.toString())
+                                    .setDoc(esFieldData)
+                                    .setDocAsUpsert(true));
+                            } else {
                                 bulkRequestBuilder.add(transportClient
                                     .prepareIndex(mapping.get_index(), mapping.get_type(), idVal.toString())
                                     .setSource(esFieldData));
-                            } else {
-                                // ignore
                             }
                         } else {
-                            idVal = rs.getObject(mapping.getPk());
-                            if (mapping.getParent() == null) {
-                                // 删除pk对应的数据
-                                SearchResponse response = transportClient.prepareSearch(mapping.get_index())
-                                    .setTypes(mapping.get_type())
-                                    .setQuery(QueryBuilders.termQuery(mapping.getPk(), idVal))
-                                    .get();
-                                for (SearchHit hit : response.getHits()) {
-                                    bulkRequestBuilder.add(transportClient
-                                        .prepareDelete(mapping.get_index(), mapping.get_type(), hit.getId()));
-                                }
-
-                                bulkRequestBuilder
-                                    .add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type())
-                                        .setSource(esFieldData));
-                            } else {
-                                // ignore
-                            }
+                            // TODO idVal = rs.getObject(mapping.getPk());
+                            // if (mapping.getParent() == null) {
+                            // // 删除pk对应的数据
+                            // SearchResponse response = transportClient.prepareSearch(mapping.get_index())
+                            // .setTypes(mapping.get_type())
+                            // .setQuery(QueryBuilders.termQuery(mapping.getPk(), idVal))
+                            // .get();
+                            // for (SearchHit hit : response.getHits()) {
+                            // bulkRequestBuilder.add(transportClient
+                            // .prepareDelete(mapping.get_index(), mapping.get_type(), hit.getId()));
+                            // }
+                            //
+                            // bulkRequestBuilder
+                            // .add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type())
+                            // .setSource(esFieldData));
+                            // } else {
+                            // // ignore
+                            // }
                         }
 
                         if (bulkRequestBuilder.numberOfActions() % mapping.getCommitBatch() == 0
@@ -238,11 +236,11 @@ public class ESEtlService {
                             long esBatchBegin = System.currentTimeMillis();
                             BulkResponse rp = bulkRequestBuilder.execute().actionGet();
                             if (rp.hasFailures()) {
-                                this.processFailBulkResponse(rp, Objects.nonNull(mapping.getParent()));
+                                this.processFailBulkResponse(rp, /* TODO Objects.nonNull(mapping.getParent()) */ false);
                             }
 
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("全量数据批量导入批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
+                            if (logger.isTraceEnabled()) {
+                                logger.trace("全量数据批量导入批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
                                     (System.currentTimeMillis() - batchBegin),
                                     (System.currentTimeMillis() - esBatchBegin),
                                     bulkRequestBuilder.numberOfActions(),
@@ -259,10 +257,10 @@ public class ESEtlService {
                         long esBatchBegin = System.currentTimeMillis();
                         BulkResponse rp = bulkRequestBuilder.execute().actionGet();
                         if (rp.hasFailures()) {
-                            this.processFailBulkResponse(rp, Objects.nonNull(mapping.getParent()));
+                            this.processFailBulkResponse(rp, /* TODO Objects.nonNull(mapping.getParent()) */false);
                         }
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("全量数据批量导入最后批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("全量数据批量导入最后批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
                                 (System.currentTimeMillis() - batchBegin),
                                 (System.currentTimeMillis() - esBatchBegin),
                                 bulkRequestBuilder.numberOfActions(),

+ 14 - 52
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -203,7 +203,7 @@ public class ESSyncService {
                 // ------主表 查询sql来更新------
                 if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
                     ESMapping mapping = config.getEsMapping();
-                    String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
+                    String idFieldName = mapping.get_id(); // TODO == null ? mapping.getPk() : mapping.get_id();
                     FieldItem idFieldItem = schemaItem.getSelectFields().get(idFieldName);
 
                     boolean idFieldSimple = true;
@@ -344,14 +344,7 @@ public class ESSyncService {
                             mapping.get_index(),
                             idVal);
                     }
-                    boolean result = esTemplate.delete(mapping, idVal);
-                    if (!result) {
-                        logger.error("Main table delete es index error, destination:{}, table: {}, index: {}, id: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index(),
-                            idVal);
-                    }
+                    esTemplate.delete(mapping, idVal);
                 } else {
                     // ------主键带函数, 查询sql获取主键删除------
                     mainTableDelete(config, dml, data);
@@ -417,14 +410,7 @@ public class ESSyncService {
                 mapping.get_index(),
                 idVal);
         }
-        boolean result = esTemplate.insert(mapping, idVal, esFieldData);
-        if (!result) {
-            logger.error("Single table insert to es index error, destination:{}, table: {}, index: {}, id: {}",
-                config.getDestination(),
-                dml.getTable(),
-                mapping.get_index(),
-                idVal);
-        }
+        esTemplate.insert(mapping, idVal, esFieldData);
     }
 
     /**
@@ -461,15 +447,7 @@ public class ESSyncService {
                             mapping.get_index(),
                             idVal);
                     }
-                    boolean result = esTemplate.insert(mapping, idVal, esFieldData);
-                    if (!result) {
-                        logger.error(
-                            "Main table insert to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index(),
-                            idVal);
-                    }
+                    esTemplate.insert(mapping, idVal, esFieldData);
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -504,15 +482,7 @@ public class ESSyncService {
                             mapping.get_index(),
                             idVal);
                     }
-                    boolean result = esTemplate.delete(mapping, idVal);
-                    if (!result) {
-                        logger.error(
-                            "Main table delete to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index(),
-                            idVal);
-                    }
+                    esTemplate.delete(mapping, idVal);
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -799,14 +769,7 @@ public class ESSyncService {
                 mapping.get_index(),
                 idVal);
         }
-        boolean result = esTemplate.update(mapping, idVal, esFieldData);
-        if (!result) {
-            logger.error("Main table update to es index error, destination:{}, table: {}, index: {}, id: {}",
-                config.getDestination(),
-                dml.getTable(),
-                mapping.get_index(),
-                idVal);
-        }
+        esTemplate.update(mapping, idVal, esFieldData);
     }
 
     /**
@@ -843,15 +806,7 @@ public class ESSyncService {
                             mapping.get_index(),
                             idVal);
                     }
-                    boolean result = esTemplate.update(mapping, idVal, esFieldData);
-                    if (!result) {
-                        logger.error(
-                            "Main table update to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index(),
-                            idVal);
-                    }
+                    esTemplate.update(mapping, idVal, esFieldData);
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -859,4 +814,11 @@ public class ESSyncService {
             return 0;
         });
     }
+
+    /**
+     * 提交批次
+     */
+    public void commit() {
+        esTemplate.commit();
+    }
 }

+ 434 - 526
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -1,526 +1,434 @@
-package com.alibaba.otter.canal.client.adapter.es.support;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
-import javax.sql.DataSource;
-
-import com.alibaba.fastjson.JSON;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.reindex.BulkByScrollResponse;
-import org.elasticsearch.index.reindex.UpdateByQueryAction;
-import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder;
-import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.script.Script;
-import org.elasticsearch.script.ScriptType;
-import org.elasticsearch.search.SearchHit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.CollectionUtils;
-
-import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
-import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
-import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
-import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
-import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
-import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
-
-/**
- * ES 操作模板
- *
- * @author rewerma 2018-11-01
- * @version 1.0.0
- */
-public class ESTemplate {
-
-    private static final Logger logger         = LoggerFactory.getLogger(ESTemplate.class);
-
-    private static final int    MAX_BATCH_SIZE = 1000;
-
-    private TransportClient     transportClient;
-
-    public ESTemplate(TransportClient transportClient){
-        this.transportClient = transportClient;
-    }
-
-    /**
-     * 插入数据
-     * 
-     * @param mapping
-     * @param pkVal
-     * @param esFieldData
-     * @return
-     */
-    public boolean insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
-        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
-        if (mapping.get_id() != null) {
-            bulkRequestBuilder
-                .add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                    .setSource(esFieldData));
-        } else {
-            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
-                .setTypes(mapping.get_type())
-                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
-                .setSize(MAX_BATCH_SIZE)
-                .get();
-            for (SearchHit hit : response.getHits()) {
-                bulkRequestBuilder
-                    .add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(), hit.getId()));
-            }
-            bulkRequestBuilder
-                .add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type()).setSource(esFieldData));
-        }
-        return commitBulkRequest(bulkRequestBuilder);
-    }
-
-    /**
-     * 根据主键更新数据
-     * 
-     * @param mapping
-     * @param pkVal
-     * @param esFieldData
-     * @return
-     */
-    public boolean update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
-        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
-        append4Update(bulkRequestBuilder, mapping, pkVal, esFieldData);
-        return commitBulkRequest(bulkRequestBuilder);
-    }
-
-    public void append4Update(BulkRequestBuilder bulkRequestBuilder, ESMapping mapping, Object pkVal,
-                              Map<String, Object> esFieldData) {
-        if (mapping.get_id() != null) {
-            bulkRequestBuilder
-                .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                    .setDoc(esFieldData));
-        } else {
-            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
-                .setTypes(mapping.get_type())
-                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
-                .setSize(MAX_BATCH_SIZE)
-                .get();
-            for (SearchHit hit : response.getHits()) {
-                bulkRequestBuilder
-                    .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
-                        .setDoc(esFieldData));
-            }
-        }
-    }
-
-    /**
-     * update by query
-     *
-     * @param config
-     * @param paramsTmp
-     * @param esFieldData
-     * @return
-     */
-    public boolean updateByQuery(ESSyncConfig config, Map<String, Object> paramsTmp, Map<String, Object> esFieldData) {
-        if (paramsTmp.isEmpty()) {
-            return false;
-        }
-        ESMapping mapping = config.getEsMapping();
-        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
-        paramsTmp.forEach((fieldName, value) -> queryBuilder.must(QueryBuilders.termsQuery(fieldName, value)));
-
-        SearchResponse response = transportClient.prepareSearch(mapping.get_index())
-            .setTypes(mapping.get_type())
-            .setSize(0)
-            .setQuery(queryBuilder)
-            .get();
-        long count = response.getHits().getTotalHits();
-        // 如果更新量大于Max, 查询sql批量更新
-        if (count > MAX_BATCH_SIZE) {
-            BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
-
-            DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
-            // 查询sql更新
-            StringBuilder sql = new StringBuilder("SELECT * FROM (" + mapping.getSql() + ") _v WHERE ");
-            paramsTmp.forEach(
-                (fieldName, value) -> sql.append("_v.").append(fieldName).append("=").append(value).append(" AND "));
-            int len = sql.length();
-            sql.delete(len - 4, len);
-            ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
-                int exeCount = 1;
-                try {
-                    BulkRequestBuilder bulkRequestBuilderTmp = bulkRequestBuilder;
-                    while (rs.next()) {
-                        Object idVal = getIdValFromRS(mapping, rs);
-                        append4Update(bulkRequestBuilderTmp, mapping, idVal, esFieldData);
-
-                        if (exeCount % mapping.getCommitBatch() == 0 && bulkRequestBuilderTmp.numberOfActions() > 0) {
-                            commitBulkRequest(bulkRequestBuilderTmp);
-                            bulkRequestBuilderTmp = transportClient.prepareBulk();
-                        }
-                        exeCount++;
-                    }
-
-                    if (bulkRequestBuilder.numberOfActions() > 0) {
-                        commitBulkRequest(bulkRequestBuilderTmp);
-                    }
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-                return 0;
-            });
-            return true;
-        } else {
-            return updateByQuery(mapping, queryBuilder, esFieldData, 1);
-        }
-    }
-
-    private boolean updateByQuery(ESMapping mapping, QueryBuilder queryBuilder, Map<String, Object> esFieldData,
-                                  int counter) {
-        if (CollectionUtils.isEmpty(esFieldData)) {
-            return true;
-        }
-
-        StringBuilder sb = new StringBuilder();
-        esFieldData.forEach((key, value) -> {
-            if (value instanceof Map) {
-                Map<?, ?> mapValue = (Map<?, ?>) value;
-                if (mapValue.containsKey("lon") && mapValue.containsKey("lat") && mapValue.size() == 2) {
-                    sb.append("ctx._source")
-                        .append("['")
-                        .append(key)
-                        .append("']")
-                        .append(" = [")
-                        .append(mapValue.get("lon"))
-                        .append(", ")
-                        .append(mapValue.get("lat"))
-                        .append("];");
-                } else {
-                    sb.append("ctx._source").append("[\"").append(key).append("\"]").append(" = ");
-                    sb.append(JSON.toJSONString(value));
-                    sb.append(";");
-                }
-            } else if (value instanceof List) {
-                sb.append("ctx._source").append("[\"").append(key).append("\"]").append(" = ");
-                sb.append(JSON.toJSONString(value));
-                sb.append(";");
-            } else if (value instanceof String) {
-                sb.append("ctx._source")
-                    .append("['")
-                    .append(key)
-                    .append("']")
-                    .append(" = '")
-                    .append(value)
-                    .append("';");
-            } else {
-                sb.append("ctx._source").append("['").append(key).append("']").append(" = ").append(value).append(";");
-            }
-        });
-        String scriptLine = sb.toString();
-        if (logger.isTraceEnabled()) {
-            logger.trace(scriptLine);
-        }
-
-        UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(transportClient);
-        updateByQuery.source(mapping.get_index())
-            .abortOnVersionConflict(false)
-            .filter(queryBuilder)
-            .script(new Script(ScriptType.INLINE, "painless", scriptLine, Collections.emptyMap()));
-
-        BulkByScrollResponse response = updateByQuery.get();
-        if (logger.isTraceEnabled()) {
-            logger.trace("updateByQuery response: {}", response.getStatus());
-        }
-        if (!CollectionUtils.isEmpty(response.getSearchFailures())) {
-            logger.error("script update_for_search has search error: " + response.getBulkFailures());
-            return false;
-        }
-
-        if (!CollectionUtils.isEmpty(response.getBulkFailures())) {
-            logger.error("script update_for_search has update error: " + response.getBulkFailures());
-            return false;
-        }
-
-        if (response.getStatus().getVersionConflicts() > 0) {
-            if (counter >= 3) {
-                logger.error("第 {} 次执行updateByQuery, 依旧存在分片版本冲突,不再继续重试。", counter);
-                return false;
-            }
-            logger.warn("本次updateByQuery存在分片版本冲突,准备重新执行...");
-            try {
-                TimeUnit.SECONDS.sleep(1);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-            return updateByQuery(mapping, queryBuilder, esFieldData, ++counter);
-        }
-
-        return true;
-    }
-
-    /**
-     * 通过主键删除数据
-     *
-     * @param mapping
-     * @param pkVal
-     * @return
-     */
-    public boolean delete(ESMapping mapping, Object pkVal) {
-        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
-        if (mapping.get_id() != null) {
-            bulkRequestBuilder
-                .add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(), pkVal.toString()));
-        } else {
-            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
-                .setTypes(mapping.get_type())
-                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
-                .setSize(MAX_BATCH_SIZE)
-                .get();
-            for (SearchHit hit : response.getHits()) {
-                bulkRequestBuilder
-                    .add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(), hit.getId()));
-            }
-        }
-        return commitBulkRequest(bulkRequestBuilder);
-    }
-
-    /**
-     * 批量提交
-     *
-     * @param bulkRequestBuilder
-     * @return
-     */
-    private static boolean commitBulkRequest(BulkRequestBuilder bulkRequestBuilder) {
-        if (bulkRequestBuilder.numberOfActions() > 0) {
-            BulkResponse response = bulkRequestBuilder.execute().actionGet();
-            if (response.hasFailures()) {
-                for (BulkItemResponse itemResponse : response.getItems()) {
-                    if (!itemResponse.isFailed()) {
-                        continue;
-                    }
-
-                    if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
-                        logger.warn(itemResponse.getFailureMessage());
-                    } else {
-                        logger.error("ES sync commit error: {}", itemResponse.getFailureMessage());
-                    }
-                }
-            }
-
-            return !response.hasFailures();
-        }
-        return true;
-    }
-
-    public Object getValFromRS(ESMapping mapping, ResultSet resultSet, String fieldName,
-                               String columnName) throws SQLException {
-        String esType = getEsType(mapping, fieldName);
-
-        Object value = resultSet.getObject(columnName);
-        if (value instanceof Boolean) {
-            if (!"boolean".equals(esType)) {
-                value = resultSet.getByte(columnName);
-            }
-        }
-
-        // 如果是对象类型
-        if (mapping.getObjFields().containsKey(fieldName)) {
-            return ESSyncUtil.convertToEsObj(value, mapping.getObjFields().get(fieldName));
-        } else {
-            return ESSyncUtil.typeConvert(value, esType);
-        }
-    }
-
-    public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet,
-                                  Map<String, Object> esFieldData) throws SQLException {
-        SchemaItem schemaItem = mapping.getSchemaItem();
-        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
-        Object resultIdVal = null;
-        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
-            Object value = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
-
-            if (fieldItem.getFieldName().equals(idFieldName)) {
-                resultIdVal = value;
-            }
-
-            if (!fieldItem.getFieldName().equals(mapping.get_id())
-                && !mapping.getSkips().contains(fieldItem.getFieldName())) {
-                esFieldData.put(fieldItem.getFieldName(), value);
-            }
-        }
-        return resultIdVal;
-    }
-
-    public Object getIdValFromRS(ESMapping mapping, ResultSet resultSet) throws SQLException {
-        SchemaItem schemaItem = mapping.getSchemaItem();
-        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
-        Object resultIdVal = null;
-        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
-            Object value = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
-
-            if (fieldItem.getFieldName().equals(idFieldName)) {
-                resultIdVal = value;
-                break;
-            }
-        }
-        return resultIdVal;
-    }
-
-    public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet, Map<String, Object> dmlOld,
-                                  Map<String, Object> esFieldData) throws SQLException {
-        SchemaItem schemaItem = mapping.getSchemaItem();
-        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
-        Object resultIdVal = null;
-        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
-            if (fieldItem.getFieldName().equals(idFieldName)) {
-                resultIdVal = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
-            }
-
-            for (ColumnItem columnItem : fieldItem.getColumnItems()) {
-                if (dmlOld.containsKey(columnItem.getColumnName())
-                    && !mapping.getSkips().contains(fieldItem.getFieldName())) {
-                    esFieldData.put(fieldItem.getFieldName(),
-                        getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName()));
-                    break;
-                }
-            }
-        }
-        return resultIdVal;
-    }
-
-    public Object getValFromData(ESMapping mapping, Map<String, Object> dmlData, String fieldName, String columnName) {
-        String esType = getEsType(mapping, fieldName);
-        Object value = dmlData.get(columnName);
-        if (value instanceof Byte) {
-            if ("boolean".equals(esType)) {
-                value = ((Byte) value).intValue() != 0;
-            }
-        }
-
-        // 如果是对象类型
-        if (mapping.getObjFields().containsKey(fieldName)) {
-            return ESSyncUtil.convertToEsObj(value, mapping.getObjFields().get(fieldName));
-        } else {
-            return ESSyncUtil.typeConvert(value, esType);
-        }
-    }
-
-    /**
-     * 将dml的data转换为es的data
-     *
-     * @param mapping 配置mapping
-     * @param dmlData dml data
-     * @param esFieldData es data
-     * @return 返回 id 值
-     */
-    public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData,
-                                       Map<String, Object> esFieldData) {
-        SchemaItem schemaItem = mapping.getSchemaItem();
-        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
-        Object resultIdVal = null;
-        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
-            String columnName = fieldItem.getColumnItems().iterator().next().getColumnName();
-            Object value = getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName);
-
-            if (fieldItem.getFieldName().equals(idFieldName)) {
-                resultIdVal = value;
-            }
-
-            if (!fieldItem.getFieldName().equals(mapping.get_id())
-                && !mapping.getSkips().contains(fieldItem.getFieldName())) {
-                esFieldData.put(fieldItem.getFieldName(), value);
-            }
-        }
-        return resultIdVal;
-    }
-
-    /**
-     * 将dml的data, old转换为es的data
-     *
-     * @param mapping 配置mapping
-     * @param dmlData dml data
-     * @param esFieldData es data
-     * @return 返回 id 值
-     */
-    public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData, Map<String, Object> dmlOld,
-                                       Map<String, Object> esFieldData) {
-        SchemaItem schemaItem = mapping.getSchemaItem();
-        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
-        Object resultIdVal = null;
-        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
-            String columnName = fieldItem.getColumnItems().iterator().next().getColumnName();
-
-            if (fieldItem.getFieldName().equals(idFieldName)) {
-                resultIdVal = getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName);
-            }
-
-            if (dmlOld.get(columnName) != null && !mapping.getSkips().contains(fieldItem.getFieldName())) {
-                esFieldData.put(fieldItem.getFieldName(),
-                    getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
-            }
-        }
-        return resultIdVal;
-    }
-
-    /**
-     * es 字段类型本地缓存
-     */
-    private static ConcurrentMap<String, Map<String, String>> esFieldTypes = new ConcurrentHashMap<>();
-
-    /**
-     * 获取es mapping中的属性类型
-     *
-     * @param mapping mapping配置
-     * @param fieldName 属性名
-     * @return 类型
-     */
-    @SuppressWarnings("unchecked")
-    private String getEsType(ESMapping mapping, String fieldName) {
-        String key = mapping.get_index() + "-" + mapping.get_type();
-        Map<String, String> fieldType = esFieldTypes.get(key);
-        if (fieldType == null) {
-            ImmutableOpenMap<String, MappingMetaData> mappings;
-            try {
-                mappings = transportClient.admin()
-                    .cluster()
-                    .prepareState()
-                    .execute()
-                    .actionGet()
-                    .getState()
-                    .getMetaData()
-                    .getIndices()
-                    .get(mapping.get_index())
-                    .getMappings();
-            } catch (NullPointerException e) {
-                throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
-            }
-            MappingMetaData mappingMetaData = mappings.get(mapping.get_type());
-            if (mappingMetaData == null) {
-                throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
-            }
-
-            fieldType = new LinkedHashMap<>();
-
-            Map<String, Object> sourceMap = mappingMetaData.getSourceAsMap();
-            Map<String, Object> esMapping = (Map<String, Object>) sourceMap.get("properties");
-            for (Map.Entry<String, Object> entry : esMapping.entrySet()) {
-                Map<String, Object> value = (Map<String, Object>) entry.getValue();
-                if (value.containsKey("properties")) {
-                    fieldType.put(entry.getKey(), "object");
-                } else {
-                    fieldType.put(entry.getKey(), (String) value.get("type"));
-                }
-            }
-            esFieldTypes.put(key, fieldType);
-        }
-
-        return fieldType.get(fieldName);
-    }
-}
+package com.alibaba.otter.canal.client.adapter.es.support;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.sql.DataSource;
+
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.rest.RestStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+
+/**
+ * ES 操作模板
+ *
+ * @author rewerma 2018-11-01
+ * @version 1.0.0
+ */
+public class ESTemplate {
+
+    private static final Logger         logger         = LoggerFactory.getLogger(ESTemplate.class);
+
+    private static final int            MAX_BATCH_SIZE = 1000;
+
+    private TransportClient             transportClient;
+
+    private volatile BulkRequestBuilder bulkRequestBuilder;
+
+    public ESTemplate(TransportClient transportClient){
+        this.transportClient = transportClient;
+        this.bulkRequestBuilder = transportClient.prepareBulk();
+    }
+
+    public BulkRequestBuilder getBulk() {
+        return bulkRequestBuilder;
+    }
+
+    /**
+     * 插入数据
+     *
+     * @param mapping
+     * @param pkVal
+     * @param esFieldData
+     * @return
+     */
+    public void insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
+        if (mapping.get_id() != null) {
+            if (mapping.isUpsert()) {
+                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                        .setDoc(esFieldData)
+                        .setDocAsUpsert(true));
+            } else {
+                getBulk().add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                    .setSource(esFieldData));
+            }
+            commitBatch();
+        } else {
+            // TODO SearchResponse response =
+            // transportClient.prepareSearch(mapping.get_index())
+            // .setTypes(mapping.get_type())
+            // .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
+            // .setSize(MAX_BATCH_SIZE)
+            // .get();
+            // for (SearchHit hit : response.getHits()) {
+            // bulkRequestBuilder
+            // .add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(),
+            // hit.getId()));
+            // }
+            // bulkRequestBuilder
+            // .add(transportClient.prepareIndex(mapping.get_index(),
+            // mapping.get_type()).setSource(esFieldData));
+        }
+        // return commitBulkRequest(bulkRequestBuilder);
+    }
+
+    /**
+     * 根据主键更新数据
+     *
+     * @param mapping
+     * @param pkVal
+     * @param esFieldData
+     * @return
+     */
+    public void update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
+        append4Update(getBulk(), mapping, pkVal, esFieldData);
+        commitBatch();
+    }
+
+    private void append4Update(BulkRequestBuilder bulkRequestBuilder, ESMapping mapping, Object pkVal,
+                               Map<String, Object> esFieldData) {
+        if (mapping.get_id() != null) {
+            bulkRequestBuilder
+                .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                    .setDoc(esFieldData));
+        } else {
+            // TODO SearchResponse response =
+            // transportClient.prepareSearch(mapping.get_index())
+            // .setTypes(mapping.get_type())
+            // .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
+            // .setSize(MAX_BATCH_SIZE)
+            // .get();
+            // for (SearchHit hit : response.getHits()) {
+            // bulkRequestBuilder
+            // .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(),
+            // hit.getId())
+            // .setDoc(esFieldData));
+            // }
+        }
+    }
+
+    /**
+     * TODO XXX update by query
+     *
+     * @param config
+     * @param paramsTmp
+     * @param esFieldData
+     * @return
+     */
+    public boolean updateByQuery(ESSyncConfig config, Map<String, Object> paramsTmp, Map<String, Object> esFieldData) {
+        if (paramsTmp.isEmpty()) {
+            return false;
+        }
+        ESMapping mapping = config.getEsMapping();
+        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
+        paramsTmp.forEach((fieldName, value) -> queryBuilder.must(QueryBuilders.termsQuery(fieldName, value)));
+
+        // 查询sql批量更新
+        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+        StringBuilder sql = new StringBuilder("SELECT * FROM (" + mapping.getSql() + ") _v WHERE ");
+        paramsTmp.forEach(
+            (fieldName, value) -> sql.append("_v.").append(fieldName).append("=").append(value).append(" AND "));
+        int len = sql.length();
+        sql.delete(len - 4, len);
+        Integer syncCount = (Integer) ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
+            int count = 0;
+            try {
+                while (rs.next()) {
+                    Object idVal = getIdValFromRS(mapping, rs);
+                    append4Update(getBulk(), mapping, idVal, esFieldData);
+                    commitBatch();
+                    count++;
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            return count;
+        });
+        if (logger.isTraceEnabled()) {
+            logger.trace("Update ES by query effect {} records", syncCount);
+        }
+        return true;
+    }
+
+    /**
+     * 通过主键删除数据
+     *
+     * @param mapping
+     * @param pkVal
+     * @return
+     */
+    public void delete(ESMapping mapping, Object pkVal) {
+        if (mapping.get_id() != null) {
+            getBulk().add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(), pkVal.toString()));
+            commitBatch();
+        } else {
+            // TODO SearchResponse response =
+            // transportClient.prepareSearch(mapping.get_index())
+            // .setTypes(mapping.get_type())
+            // .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
+            // .setSize(MAX_BATCH_SIZE)
+            // .get();
+            // for (SearchHit hit : response.getHits()) {
+            // bulkRequestBuilder
+            // .add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(),
+            // hit.getId()));
+            // }
+        }
+        // return commitBulkRequest(bulkRequestBuilder);
+    }
+
+    /**
+     * 提交批次
+     */
+    public void commit() {
+        if (getBulk().numberOfActions() >= 0) {
+            BulkResponse response = getBulk().execute().actionGet();
+            if (response.hasFailures()) {
+                for (BulkItemResponse itemResponse : response.getItems()) {
+                    if (!itemResponse.isFailed()) {
+                        continue;
+                    }
+
+                    if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
+                        logger.error(itemResponse.getFailureMessage());
+                    } else {
+                        throw new RuntimeException("ES sync commit error" + itemResponse.getFailureMessage());
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * 如果大于批量则提交批次
+     */
+    private void commitBatch() {
+        if (getBulk().numberOfActions() >= MAX_BATCH_SIZE) {
+            commit();
+        }
+    }
+
+    public Object getValFromRS(ESMapping mapping, ResultSet resultSet, String fieldName,
+                               String columnName) throws SQLException {
+        String esType = getEsType(mapping, fieldName);
+
+        Object value = resultSet.getObject(columnName);
+        if (value instanceof Boolean) {
+            if (!"boolean".equals(esType)) {
+                value = resultSet.getByte(columnName);
+            }
+        }
+
+        // 如果是对象类型
+        if (mapping.getObjFields().containsKey(fieldName)) {
+            return ESSyncUtil.convertToEsObj(value, mapping.getObjFields().get(fieldName));
+        } else {
+            return ESSyncUtil.typeConvert(value, esType);
+        }
+    }
+
+    public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet,
+                                  Map<String, Object> esFieldData) throws SQLException {
+        SchemaItem schemaItem = mapping.getSchemaItem();
+        String idFieldName = mapping.get_id(); // TODO == null ? mapping.getPk() : mapping.get_id();
+        Object resultIdVal = null;
+        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+            Object value = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
+
+            if (fieldItem.getFieldName().equals(idFieldName)) {
+                resultIdVal = value;
+            }
+
+            if (!fieldItem.getFieldName().equals(mapping.get_id())
+                && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                esFieldData.put(fieldItem.getFieldName(), value);
+            }
+        }
+        return resultIdVal;
+    }
+
+    public Object getIdValFromRS(ESMapping mapping, ResultSet resultSet) throws SQLException {
+        SchemaItem schemaItem = mapping.getSchemaItem();
+        String idFieldName = mapping.get_id(); // TODO == null ? mapping.getPk() : mapping.get_id();
+        Object resultIdVal = null;
+        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+            Object value = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
+
+            if (fieldItem.getFieldName().equals(idFieldName)) {
+                resultIdVal = value;
+                break;
+            }
+        }
+        return resultIdVal;
+    }
+
+    public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet, Map<String, Object> dmlOld,
+                                  Map<String, Object> esFieldData) throws SQLException {
+        SchemaItem schemaItem = mapping.getSchemaItem();
+        String idFieldName = mapping.get_id(); // TODO == null ? mapping.getPk() : mapping.get_id();
+        Object resultIdVal = null;
+        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+            if (fieldItem.getFieldName().equals(idFieldName)) {
+                resultIdVal = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
+            }
+
+            for (ColumnItem columnItem : fieldItem.getColumnItems()) {
+                if (dmlOld.containsKey(columnItem.getColumnName())
+                    && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                    esFieldData.put(fieldItem.getFieldName(),
+                        getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName()));
+                    break;
+                }
+            }
+        }
+        return resultIdVal;
+    }
+
+    public Object getValFromData(ESMapping mapping, Map<String, Object> dmlData, String fieldName, String columnName) {
+        String esType = getEsType(mapping, fieldName);
+        Object value = dmlData.get(columnName);
+        if (value instanceof Byte) {
+            if ("boolean".equals(esType)) {
+                value = ((Byte) value).intValue() != 0;
+            }
+        }
+
+        // 如果是对象类型
+        if (mapping.getObjFields().containsKey(fieldName)) {
+            return ESSyncUtil.convertToEsObj(value, mapping.getObjFields().get(fieldName));
+        } else {
+            return ESSyncUtil.typeConvert(value, esType);
+        }
+    }
+
+    /**
+     * 将dml的data转换为es的data
+     *
+     * @param mapping 配置mapping
+     * @param dmlData dml data
+     * @param esFieldData es data
+     * @return 返回 id 值
+     */
+    public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData,
+                                       Map<String, Object> esFieldData) {
+        SchemaItem schemaItem = mapping.getSchemaItem();
+        String idFieldName = mapping.get_id(); // TODO == null ? mapping.getPk() : mapping.get_id();
+        Object resultIdVal = null;
+        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+            String columnName = fieldItem.getColumnItems().iterator().next().getColumnName();
+            Object value = getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName);
+
+            if (fieldItem.getFieldName().equals(idFieldName)) {
+                resultIdVal = value;
+            }
+
+            if (!fieldItem.getFieldName().equals(mapping.get_id())
+                && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                esFieldData.put(fieldItem.getFieldName(), value);
+            }
+        }
+        return resultIdVal;
+    }
+
+    /**
+     * 将dml的data, old转换为es的data
+     *
+     * @param mapping 配置mapping
+     * @param dmlData dml data
+     * @param esFieldData es data
+     * @return 返回 id 值
+     */
+    public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData, Map<String, Object> dmlOld,
+                                       Map<String, Object> esFieldData) {
+        SchemaItem schemaItem = mapping.getSchemaItem();
+        String idFieldName = mapping.get_id(); // TODO == null ? mapping.getPk() : mapping.get_id();
+        Object resultIdVal = null;
+        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+            String columnName = fieldItem.getColumnItems().iterator().next().getColumnName();
+
+            if (fieldItem.getFieldName().equals(idFieldName)) {
+                resultIdVal = getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName);
+            }
+
+            if (dmlOld.get(columnName) != null && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                esFieldData.put(fieldItem.getFieldName(),
+                    getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
+            }
+        }
+        return resultIdVal;
+    }
+
+    /**
+     * es 字段类型本地缓存
+     */
+    private static ConcurrentMap<String, Map<String, String>> esFieldTypes = new ConcurrentHashMap<>();
+
+    /**
+     * 获取es mapping中的属性类型
+     *
+     * @param mapping mapping配置
+     * @param fieldName 属性名
+     * @return 类型
+     */
+    @SuppressWarnings("unchecked")
+    private String getEsType(ESMapping mapping, String fieldName) {
+        String key = mapping.get_index() + "-" + mapping.get_type();
+        Map<String, String> fieldType = esFieldTypes.get(key);
+        if (fieldType == null) {
+            ImmutableOpenMap<String, MappingMetaData> mappings;
+            try {
+                mappings = transportClient.admin()
+                    .cluster()
+                    .prepareState()
+                    .execute()
+                    .actionGet()
+                    .getState()
+                    .getMetaData()
+                    .getIndices()
+                    .get(mapping.get_index())
+                    .getMappings();
+            } catch (NullPointerException e) {
+                throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
+            }
+            MappingMetaData mappingMetaData = mappings.get(mapping.get_type());
+            if (mappingMetaData == null) {
+                throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
+            }
+
+            fieldType = new LinkedHashMap<>();
+
+            Map<String, Object> sourceMap = mappingMetaData.getSourceAsMap();
+            Map<String, Object> esMapping = (Map<String, Object>) sourceMap.get("properties");
+            for (Map.Entry<String, Object> entry : esMapping.entrySet()) {
+                Map<String, Object> value = (Map<String, Object>) entry.getValue();
+                if (value.containsKey("properties")) {
+                    fieldType.put(entry.getKey(), "object");
+                } else {
+                    fieldType.put(entry.getKey(), (String) value.get("type"));
+                }
+            }
+            esFieldTypes.put(key, fieldType);
+        }
+
+        return fieldType.get(fieldName);
+    }
+}

+ 1 - 0
client-adapter/elasticsearch/src/main/resources/es/mytest_user.yml

@@ -5,6 +5,7 @@ esMapping:
   _index: mytest_user
   _type: _doc
   _id: _id
+  upsert: true
 #  pk: id
   sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
         a.c_time as _c_time from user a