ymz пре 1 година
родитељ
комит
63dfbf65df

+ 94 - 0
client-adapter/es8x/pom.xml

@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>canal.client-adapter</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.1.7-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>client-adapter.es8x</artifactId>
+    <packaging>jar</packaging>
+    <name>canal client adapter es v8x module for otter ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>client-adapter.common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>client-adapter.escore</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>co.elastic.clients</groupId>
+            <artifactId>elasticsearch-java</artifactId>
+            <version>8.6.2</version>
+        </dependency>
+        <!--<dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-client</artifactId>
+            <version>8.6.2</version>
+        </dependency>-->
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-high-level-client</artifactId>
+            <version>7.17.9</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <tasks>
+                                <copy todir="${project.basedir}/../launcher/target/classes/es8" overwrite="true">
+                                    <fileset dir="${project.basedir}/target/classes/es8" erroronmissingdir="true">
+                                        <include name="*.yml"/>
+                                    </fileset>
+                                </copy>
+                            </tasks>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 117 - 0
client-adapter/es8x/src/main/java/com/alibaba/otter/canal/client/adapter/es8x/ES8xAdapter.java

@@ -0,0 +1,117 @@
+package com.alibaba.otter.canal.client.adapter.es8x;
+
+import com.alibaba.otter.canal.client.adapter.es.core.ESAdapter;
+import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
+import com.alibaba.otter.canal.client.adapter.es8x.etl.ESEtlService;
+import com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate;
+import com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+import org.elasticsearch.action.search.SearchResponse;
+
+import javax.sql.DataSource;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * ES 8.x 外部适配器
+ *
+ * @author ymz 2013-02-23
+ * @version 1.0.0
+ */
+@SPI("es8")
+public class ES8xAdapter extends ESAdapter {
+
+    private ESConnection esConnection;
+
+    public ESConnection getEsConnection() {
+        return esConnection;
+    }
+
+    @Override
+    public void init(OuterAdapterConfig configuration, Properties envProperties) {
+        try {
+            Map<String, String> properties = configuration.getProperties();
+
+            String[] hostArray = configuration.getHosts().split(",");
+            esConnection = new ESConnection(hostArray, properties);
+
+            this.esTemplate = new ES8xTemplate(esConnection);
+
+            envProperties.put("es.version", "es8");
+            super.init(configuration, envProperties);
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Map<String, Object> count(String task) {
+        ESSyncConfig config = esSyncConfig.get(task);
+        ESSyncConfig.ESMapping mapping = config.getEsMapping();
+        SearchResponse response = this.esConnection.new ESSearchRequest(mapping.get_index()).size(0).getResponse();
+
+        long rowCount = response.getHits().getTotalHits().value;
+        Map<String, Object> res = new LinkedHashMap<>();
+        res.put("esIndex", mapping.get_index());
+        res.put("count", rowCount);
+        return res;
+    }
+
+    @Override
+    public EtlResult etl(String task, List<String> params) {
+        EtlResult etlResult = new EtlResult();
+        ESSyncConfig config = esSyncConfig.get(task);
+        if (config != null) {
+            DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+            ESEtlService esEtlService = new ESEtlService(esConnection, config);
+            if (dataSource != null) {
+                return esEtlService.importData(params);
+            } else {
+                etlResult.setSucceeded(false);
+                etlResult.setErrorMessage("DataSource not found");
+                return etlResult;
+            }
+        } else {
+            StringBuilder resultMsg = new StringBuilder();
+            boolean resSuccess = true;
+            for (ESSyncConfig configTmp : esSyncConfig.values()) {
+                // 取所有的destination为task的配置
+                if (configTmp.getDestination().equals(task)) {
+                    ESEtlService esEtlService = new ESEtlService(esConnection, configTmp);
+                    EtlResult etlRes = esEtlService.importData(params);
+                    if (!etlRes.getSucceeded()) {
+                        resSuccess = false;
+                        resultMsg.append(etlRes.getErrorMessage()).append("\n");
+                    } else {
+                        resultMsg.append(etlRes.getResultMessage()).append("\n");
+                    }
+                }
+            }
+            if (resultMsg.length() > 0) {
+                etlResult.setSucceeded(resSuccess);
+                if (resSuccess) {
+                    etlResult.setResultMessage(resultMsg.toString());
+                } else {
+                    etlResult.setErrorMessage(resultMsg.toString());
+                }
+                return etlResult;
+            }
+        }
+        etlResult.setSucceeded(false);
+        etlResult.setErrorMessage("Task not found");
+        return etlResult;
+    }
+
+    @Override
+    public void destroy() {
+        super.destroy();
+        if (esConnection != null) {
+            esConnection.close();
+        }
+    }
+}

+ 200 - 0
client-adapter/es8x/src/main/java/com/alibaba/otter/canal/client/adapter/es8x/etl/ESEtlService.java

@@ -0,0 +1,200 @@
+package com.alibaba.otter.canal.client.adapter.es8x.etl;
+
+import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
+import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig.ESMapping;
+import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.FieldItem;
+import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest;
+import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESBulkResponse;
+import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESIndexRequest;
+import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESUpdateRequest;
+import com.alibaba.otter.canal.client.adapter.es.core.support.ESTemplate;
+import com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate;
+import com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection;
+import com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection.ESSearchRequest;
+import com.alibaba.otter.canal.client.adapter.support.AbstractEtlService;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import org.apache.commons.lang.StringUtils;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * ES ETL Service
+ *
+ * @author rewerma 2018-11-01
+ * @version 1.0.0
+ */
+public class ESEtlService extends AbstractEtlService {
+
+    private ESConnection esConnection;
+    private ESTemplate esTemplate;
+    private ESSyncConfig config;
+
+    public ESEtlService(ESConnection esConnection, ESSyncConfig config) {
+        super("ES", config);
+        this.esConnection = esConnection;
+        this.esTemplate = new ES8xTemplate(esConnection);
+        this.config = config;
+    }
+
+    public EtlResult importData(List<String> params) {
+        ESMapping mapping = config.getEsMapping();
+        logger.info("start etl to import data to index: {}", mapping.get_index());
+        String sql = mapping.getSql();
+        return importData(sql, params);
+    }
+
+    protected boolean executeSqlImport(DataSource ds, String sql, List<Object> values,
+                                       AdapterConfig.AdapterMapping adapterMapping, AtomicLong impCount,
+                                       List<String> errMsg) {
+        try {
+            ESMapping mapping = (ESMapping) adapterMapping;
+            Util.sqlRS(ds, sql, values, rs -> {
+                int count = 0;
+                try {
+                    ESBulkRequest esBulkRequest = this.esConnection.new ES8xBulkRequest();
+
+                    long batchBegin = System.currentTimeMillis();
+                    while (rs.next()) {
+                        Map<String, Object> esFieldData = new LinkedHashMap<>();
+                        Object idVal = null;
+                        for (FieldItem fieldItem : mapping.getSchemaItem().getSelectFields().values()) {
+
+                            String fieldName = fieldItem.getFieldName();
+                            if (mapping.getSkips().contains(fieldName)) {
+                                continue;
+                            }
+
+                            // 如果是主键字段则不插入
+                            if (fieldItem.getFieldName().equals(mapping.get_id())) {
+                                idVal = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
+                            } else {
+                                Object val = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
+                                esFieldData.put(Util.cleanColumn(fieldName), val);
+                            }
+
+                        }
+
+                        if (!mapping.getRelations().isEmpty()) {
+                            mapping.getRelations().forEach((relationField, relationMapping) -> {
+                                Map<String, Object> relations = new HashMap<>();
+                                relations.put("name", relationMapping.getName());
+                                if (StringUtils.isNotEmpty(relationMapping.getParent())) {
+                                    FieldItem parentFieldItem = mapping.getSchemaItem()
+                                            .getSelectFields()
+                                            .get(relationMapping.getParent());
+                                    Object parentVal;
+                                    try {
+                                        parentVal = esTemplate.getValFromRS(mapping,
+                                                rs,
+                                                parentFieldItem.getFieldName(),
+                                                parentFieldItem.getFieldName());
+                                    } catch (SQLException e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                    if (parentVal != null) {
+                                        relations.put("parent", parentVal.toString());
+                                        esFieldData.put("$parent_routing", parentVal.toString());
+
+                                    }
+                                }
+                                esFieldData.put(Util.cleanColumn(relationField), relations);
+                            });
+                        }
+
+                        if (idVal != null) {
+                            String parentVal = (String) esFieldData.remove("$parent_routing");
+                            if (mapping.isUpsert()) {
+                                ESUpdateRequest esUpdateRequest = this.esConnection.new ES8xUpdateRequest(
+                                        mapping.get_index(),
+                                        idVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
+
+                                if (StringUtils.isNotEmpty(parentVal)) {
+                                    esUpdateRequest.setRouting(parentVal);
+                                }
+
+                                esBulkRequest.add(esUpdateRequest);
+                            } else {
+                                ESIndexRequest esIndexRequest = this.esConnection.new ES8xIndexRequest(
+                                        mapping.get_index(),
+                                        idVal.toString()).setSource(esFieldData);
+                                if (StringUtils.isNotEmpty(parentVal)) {
+                                    esIndexRequest.setRouting(parentVal);
+                                }
+                                esBulkRequest.add(esIndexRequest);
+                            }
+                        } else {
+                            idVal = esFieldData.get(mapping.getPk());
+                            ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index())
+                                    .setQuery(QueryBuilders.termQuery(mapping.getPk(), idVal))
+                                    .size(10000);
+                            SearchResponse response = esSearchRequest.getResponse();
+                            for (SearchHit hit : response.getHits()) {
+                                ESUpdateRequest esUpdateRequest = this.esConnection.new ES8xUpdateRequest(
+                                        mapping.get_index(),
+                                        hit.getId()).setDoc(esFieldData);
+                                esBulkRequest.add(esUpdateRequest);
+                            }
+                        }
+
+                        if (esBulkRequest.numberOfActions() % mapping.getCommitBatch() == 0
+                                && esBulkRequest.numberOfActions() > 0) {
+                            long esBatchBegin = System.currentTimeMillis();
+                            ESBulkResponse rp = esBulkRequest.bulk();
+                            if (rp.hasFailures()) {
+                                rp.processFailBulkResponse("全量数据 etl 异常 ");
+                            }
+
+                            if (logger.isTraceEnabled()) {
+                                logger.trace("全量数据批量导入批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
+                                        (System.currentTimeMillis() - batchBegin),
+                                        (System.currentTimeMillis() - esBatchBegin),
+                                        esBulkRequest.numberOfActions(),
+                                        mapping.get_index());
+                            }
+                            batchBegin = System.currentTimeMillis();
+                            esBulkRequest.resetBulk();
+                        }
+                        count++;
+                        impCount.incrementAndGet();
+                    }
+
+                    if (esBulkRequest.numberOfActions() > 0) {
+                        long esBatchBegin = System.currentTimeMillis();
+                        ESBulkResponse rp = esBulkRequest.bulk();
+                        if (rp.hasFailures()) {
+                            rp.processFailBulkResponse("全量数据 etl 异常 ");
+                        }
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("全量数据批量导入最后批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
+                                    (System.currentTimeMillis() - batchBegin),
+                                    (System.currentTimeMillis() - esBatchBegin),
+                                    esBulkRequest.numberOfActions(),
+                                    mapping.get_index());
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                    errMsg.add(mapping.get_index() + " etl failed! ==>" + e.getMessage());
+                    throw new RuntimeException(e);
+                }
+                return count;
+            });
+
+            return true;
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+    }
+}

+ 463 - 0
client-adapter/es8x/src/main/java/com/alibaba/otter/canal/client/adapter/es8x/support/ES8xTemplate.java

@@ -0,0 +1,463 @@
+package com.alibaba.otter.canal.client.adapter.es8x.support;
+
+import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
+import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig.ESMapping;
+import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem;
+import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.ColumnItem;
+import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem.FieldItem;
+import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest;
+import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESBulkResponse;
+import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESDeleteRequest;
+import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESIndexRequest;
+import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest.ESUpdateRequest;
+import com.alibaba.otter.canal.client.adapter.es.core.support.ESSyncUtil;
+import com.alibaba.otter.canal.client.adapter.es.core.support.ESTemplate;
+import com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection.ESSearchRequest;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import org.apache.commons.lang.StringUtils;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class ES8xTemplate implements ESTemplate {
+
+    private static final Logger logger = LoggerFactory
+            .getLogger(ESTemplate.class);
+
+    private static final int MAX_BATCH_SIZE = 1000;
+
+    private ESConnection esConnection;
+
+    private ESBulkRequest esBulkRequest;
+
+    // es 字段类型本地缓存
+    private static ConcurrentMap<String, Map<String, String>> esFieldTypes = new ConcurrentHashMap<>();
+
+    public ES8xTemplate(ESConnection esConnection) {
+        this.esConnection = esConnection;
+        this.esBulkRequest = this.esConnection.new ES8xBulkRequest();
+    }
+
+    public ESBulkRequest getBulk() {
+        return esBulkRequest;
+    }
+
+    public void resetBulkRequestBuilder() {
+        this.esBulkRequest.resetBulk();
+    }
+
+    @Override
+    public void insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
+        if (mapping.get_id() != null) {
+            String parentVal = (String) esFieldData.remove("$parent_routing");
+            if (mapping.isUpsert()) {
+                ESUpdateRequest updateRequest = esConnection.new ES8xUpdateRequest(mapping.get_index(),
+                        pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
+                if (StringUtils.isNotEmpty(parentVal)) {
+                    updateRequest.setRouting(parentVal);
+                }
+                getBulk().add(updateRequest);
+            } else {
+                ESIndexRequest indexRequest = esConnection.new ES8xIndexRequest(mapping.get_index(), pkVal.toString())
+                        .setSource(esFieldData);
+                if (StringUtils.isNotEmpty(parentVal)) {
+                    indexRequest.setRouting(parentVal);
+                }
+                getBulk().add(indexRequest);
+            }
+            commitBulk();
+        } else {
+            ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index())
+                    .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
+                    .size(10000);
+            SearchResponse response = esSearchRequest.getResponse();
+
+            for (SearchHit hit : response.getHits()) {
+                ESUpdateRequest esUpdateRequest = this.esConnection.new ES8xUpdateRequest(mapping.get_index(),
+                        hit.getId()).setDoc(esFieldData);
+                getBulk().add(esUpdateRequest);
+                commitBulk();
+            }
+        }
+    }
+
+    @Override
+    public void update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
+        Map<String, Object> esFieldDataTmp = new LinkedHashMap<>(esFieldData.size());
+        esFieldData.forEach((k, v) -> esFieldDataTmp.put(Util.cleanColumn(k), v));
+        append4Update(mapping, pkVal, esFieldDataTmp);
+        commitBulk();
+    }
+
+    @Override
+    public void updateByQuery(ESSyncConfig config, Map<String, Object> paramsTmp, Map<String, Object> esFieldData) {
+        if (paramsTmp.isEmpty()) {
+            return;
+        }
+        ESMapping mapping = config.getEsMapping();
+        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
+        paramsTmp.forEach((fieldName, value) -> queryBuilder.must(QueryBuilders.termsQuery(fieldName, value)));
+
+        // 查询sql批量更新
+        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+        StringBuilder sql = new StringBuilder("SELECT * FROM (" + mapping.getSql() + ") _v WHERE ");
+        List<Object> values = new ArrayList<>();
+        paramsTmp.forEach((fieldName, value) -> {
+            sql.append("_v.").append(fieldName).append("=? AND ");
+            values.add(value);
+        });
+        // TODO 直接外部包裹sql会导致全表扫描性能低, 待优化拼接内部where条件
+        int len = sql.length();
+        sql.delete(len - 4, len);
+        Integer syncCount = (Integer) Util.sqlRS(ds, sql.toString(), values, rs -> {
+            int count = 0;
+            try {
+                while (rs.next()) {
+                    Object idVal = getIdValFromRS(mapping, rs);
+                    append4Update(mapping, idVal, esFieldData);
+                    commitBulk();
+                    count++;
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            return count;
+        });
+        if (logger.isTraceEnabled()) {
+            logger.trace("Update ES by query affected {} records", syncCount);
+        }
+    }
+
+    @Override
+    public void delete(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
+        if (mapping.get_id() != null) {
+            ESDeleteRequest esDeleteRequest = this.esConnection.new ES8xDeleteRequest(mapping.get_index(),
+                    pkVal.toString());
+            getBulk().add(esDeleteRequest);
+            commitBulk();
+        } else {
+            ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index())
+                    .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
+                    .size(10000);
+            SearchResponse response = esSearchRequest.getResponse();
+            for (SearchHit hit : response.getHits()) {
+                ESUpdateRequest esUpdateRequest = this.esConnection.new ES8xUpdateRequest(mapping.get_index(),
+                        hit.getId()).setDoc(esFieldData);
+                getBulk().add(esUpdateRequest);
+                commitBulk();
+            }
+        }
+    }
+
+    @Override
+    public void commit() {
+        if (getBulk().numberOfActions() > 0) {
+            ESBulkResponse response = getBulk().bulk();
+            if (response.hasFailures()) {
+                response.processFailBulkResponse("ES sync commit error ");
+            }
+            resetBulkRequestBuilder();
+        }
+    }
+
+    @Override
+    public Object getValFromRS(ESMapping mapping, ResultSet resultSet, String fieldName,
+                               String columnName) throws SQLException {
+        fieldName = Util.cleanColumn(fieldName);
+        columnName = Util.cleanColumn(columnName);
+        String esType = getEsType(mapping, fieldName);
+
+        Object value = resultSet.getObject(columnName);
+        if (value instanceof Boolean) {
+            if (!"boolean".equals(esType)) {
+                value = resultSet.getByte(columnName);
+            }
+        }
+
+        // 如果是对象类型
+        if (mapping.getObjFields().containsKey(fieldName)) {
+            return ESSyncUtil.convertToEsObj(value, mapping.getObjFields().get(fieldName));
+        } else {
+            return ESSyncUtil.typeConvert(value, esType);
+        }
+    }
+
+    @Override
+    public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet,
+                                  Map<String, Object> esFieldData) throws SQLException {
+        SchemaItem schemaItem = mapping.getSchemaItem();
+        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
+        Object resultIdVal = null;
+        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+            Object value = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
+
+            if (fieldItem.getFieldName().equals(idFieldName)) {
+                resultIdVal = value;
+            }
+
+            if (!fieldItem.getFieldName().equals(mapping.get_id())
+                    && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
+            }
+        }
+
+        // 添加父子文档关联信息
+        putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);
+
+        return resultIdVal;
+    }
+
+    @Override
+    public Object getIdValFromRS(ESMapping mapping, ResultSet resultSet) throws SQLException {
+        SchemaItem schemaItem = mapping.getSchemaItem();
+        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
+        Object resultIdVal = null;
+        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+            Object value = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
+
+            if (fieldItem.getFieldName().equals(idFieldName)) {
+                resultIdVal = value;
+                break;
+            }
+        }
+        return resultIdVal;
+    }
+
+    @Override
+    public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet, Map<String, Object> dmlOld,
+                                  Map<String, Object> esFieldData) throws SQLException {
+        SchemaItem schemaItem = mapping.getSchemaItem();
+        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
+        Object resultIdVal = null;
+        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+            if (fieldItem.getFieldName().equals(idFieldName)) {
+                resultIdVal = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
+            }
+
+            for (ColumnItem columnItem : fieldItem.getColumnItems()) {
+                if (dmlOld.containsKey(columnItem.getColumnName())
+                        && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                    esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()),
+                            getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName()));
+                    break;
+                }
+            }
+        }
+
+        // 添加父子文档关联信息
+        putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);
+
+        return resultIdVal;
+    }
+
+    @Override
+    public Object getValFromData(ESMapping mapping, Map<String, Object> dmlData, String fieldName, String columnName) {
+        String esType = getEsType(mapping, fieldName);
+        Object value = dmlData.get(columnName);
+        if (value instanceof Byte) {
+            if ("boolean".equals(esType)) {
+                value = ((Byte) value).intValue() != 0;
+            }
+        }
+
+        // 如果是对象类型
+        if (mapping.getObjFields().containsKey(fieldName)) {
+            return ESSyncUtil.convertToEsObj(value, mapping.getObjFields().get(fieldName));
+        } else {
+            return ESSyncUtil.typeConvert(value, esType);
+        }
+    }
+
+    @Override
+    public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData, Map<String, Object> esFieldData) {
+        SchemaItem schemaItem = mapping.getSchemaItem();
+        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
+        Object resultIdVal = null;
+        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+            String columnName = fieldItem.getColumnItems().iterator().next().getColumnName();
+            Object value = getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName);
+
+            if (fieldItem.getFieldName().equals(idFieldName)) {
+                resultIdVal = value;
+            }
+
+            if (!fieldItem.getFieldName().equals(mapping.get_id())
+                    && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
+            }
+        }
+
+        // 添加父子文档关联信息
+        putRelationData(mapping, schemaItem, dmlData, esFieldData);
+        return resultIdVal;
+    }
+
+    @Override
+    public Object getESDataFromDmlData(ESMapping mapping, String owner, Map<String, Object> dmlData, Map<String, Object> dmlOld,
+                                       Map<String, Object> esFieldData) {
+        SchemaItem schemaItem = mapping.getSchemaItem();
+        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
+        Object resultIdVal = null;
+        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+            ColumnItem columnItem = fieldItem.getColumnItems().iterator().next();
+            if (!columnItem.getOwner().equals(owner)) {
+                continue;
+            }
+            String columnName = columnItem.getColumnName();
+
+            if (fieldItem.getFieldName().equals(idFieldName)) {
+                resultIdVal = getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName);
+            }
+
+            if (dmlOld.containsKey(columnName) && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()),
+                        getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
+            }
+        }
+
+        // 添加父子文档关联信息
+        putRelationData(mapping, schemaItem, dmlOld, esFieldData);
+        return resultIdVal;
+    }
+
+    /**
+     * 如果大于批量数则提交批次
+     */
+    private void commitBulk() {
+        if (getBulk().numberOfActions() >= MAX_BATCH_SIZE) {
+            commit();
+        }
+    }
+
+    private void append4Update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
+        if (mapping.get_id() != null) {
+            String parentVal = (String) esFieldData.remove("$parent_routing");
+            if (mapping.isUpsert()) {
+                ESUpdateRequest esUpdateRequest = this.esConnection.new ES8xUpdateRequest(mapping.get_index(),
+                        pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
+                if (StringUtils.isNotEmpty(parentVal)) {
+                    esUpdateRequest.setRouting(parentVal);
+                }
+                getBulk().add(esUpdateRequest);
+            } else {
+                ESUpdateRequest esUpdateRequest = this.esConnection.new ES8xUpdateRequest(mapping.get_index(),
+                        pkVal.toString()).setDoc(esFieldData);
+                if (StringUtils.isNotEmpty(parentVal)) {
+                    esUpdateRequest.setRouting(parentVal);
+                }
+                getBulk().add(esUpdateRequest);
+            }
+        } else {
+            ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index())
+                    .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
+                    .size(10000);
+            SearchResponse response = esSearchRequest.getResponse();
+            for (SearchHit hit : response.getHits()) {
+                ESUpdateRequest esUpdateRequest = this.esConnection.new ES8xUpdateRequest(mapping.get_index(),
+                        hit.getId()).setDoc(esFieldData);
+                getBulk().add(esUpdateRequest);
+            }
+        }
+    }
+
+    /**
+     * 获取es mapping中的属性类型
+     *
+     * @param mapping   mapping配置
+     * @param fieldName 属性名
+     * @return 类型
+     */
+    @SuppressWarnings("unchecked")
+    private String getEsType(ESMapping mapping, String fieldName) {
+        String key = mapping.get_index() + "-" + mapping.get_type();
+        Map<String, String> fieldType = esFieldTypes.get(key);
+        if (fieldType != null) {
+            return fieldType.get(fieldName);
+        } else {
+            MappingMetadata mappingMetaData = esConnection.getMapping(mapping.get_index());
+
+            if (mappingMetaData == null) {
+                throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
+            }
+
+            fieldType = new LinkedHashMap<>();
+
+            Map<String, Object> sourceMap = mappingMetaData.getSourceAsMap();
+            Map<String, Object> esMapping = (Map<String, Object>) sourceMap.get("properties");
+            for (Map.Entry<String, Object> entry : esMapping.entrySet()) {
+                Map<String, Object> value = (Map<String, Object>) entry.getValue();
+                if (value.containsKey("properties")) {
+                    fieldType.put(entry.getKey(), "object");
+                } else {
+                    fieldType.put(entry.getKey(), (String) value.get("type"));
+                }
+            }
+            esFieldTypes.put(key, fieldType);
+
+            return fieldType.get(fieldName);
+        }
+    }
+
+    private void putRelationDataFromRS(ESMapping mapping, SchemaItem schemaItem, ResultSet resultSet,
+                                       Map<String, Object> esFieldData) {
+        // 添加父子文档关联信息
+        if (!mapping.getRelations().isEmpty()) {
+            mapping.getRelations().forEach((relationField, relationMapping) -> {
+                Map<String, Object> relations = new HashMap<>();
+                relations.put("name", relationMapping.getName());
+                if (StringUtils.isNotEmpty(relationMapping.getParent())) {
+                    FieldItem parentFieldItem = schemaItem.getSelectFields().get(relationMapping.getParent());
+                    Object parentVal;
+                    try {
+                        parentVal = getValFromRS(mapping,
+                                resultSet,
+                                parentFieldItem.getFieldName(),
+                                parentFieldItem.getFieldName());
+                    } catch (SQLException e) {
+                        throw new RuntimeException(e);
+                    }
+                    if (parentVal != null) {
+                        relations.put("parent", parentVal.toString());
+                        esFieldData.put("$parent_routing", parentVal.toString());
+
+                    }
+                }
+                esFieldData.put(relationField, relations);
+            });
+        }
+    }
+
+    private void putRelationData(ESMapping mapping, SchemaItem schemaItem, Map<String, Object> dmlData,
+                                 Map<String, Object> esFieldData) {
+        // 添加父子文档关联信息
+        if (!mapping.getRelations().isEmpty()) {
+            mapping.getRelations().forEach((relationField, relationMapping) -> {
+                Map<String, Object> relations = new HashMap<>();
+                relations.put("name", relationMapping.getName());
+                if (StringUtils.isNotEmpty(relationMapping.getParent())) {
+                    FieldItem parentFieldItem = schemaItem.getSelectFields().get(relationMapping.getParent());
+                    String columnName = parentFieldItem.getColumnItems().iterator().next().getColumnName();
+                    Object parentVal = getValFromData(mapping, dmlData, parentFieldItem.getFieldName(), columnName);
+                    if (parentVal != null) {
+                        relations.put("parent", parentVal.toString());
+                        esFieldData.put("$parent_routing", parentVal.toString());
+
+                    }
+                }
+                esFieldData.put(relationField, relations);
+            });
+        }
+    }
+}

+ 395 - 0
client-adapter/es8x/src/main/java/com/alibaba/otter/canal/client/adapter/es8x/support/ESConnection.java

@@ -0,0 +1,395 @@
+package com.alibaba.otter.canal.client.adapter.es8x.support;
+
+import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.client.*;
+import org.elasticsearch.client.indices.GetMappingsRequest;
+import org.elasticsearch.client.indices.GetMappingsResponse;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * ES 连接器, 只支持 Rest 方式
+ *
+ * @author ymz 2023-03-02
+ * @version 1.0.0
+ */
+public class ESConnection {
+
+    private static final Logger logger = LoggerFactory.getLogger(ESConnection.class);
+
+
+    private RestHighLevelClient restHighLevelClient;
+
+    public ESConnection(String[] hosts, Map<String, String> properties) throws UnknownHostException {
+        HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
+        String nameAndPwd = properties.get("security.auth");
+        if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
+            String[] nameAndPwdArr = nameAndPwd.split(":");
+            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
+            restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+        }
+        restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true).build();
+    }
+
+    public void close() {
+        try {
+            restHighLevelClient.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public MappingMetadata getMapping(String index) {
+        MappingMetadata mappingMetaData = null;
+
+        Map<String, MappingMetadata> mappings;
+        try {
+            GetMappingsRequest request = new GetMappingsRequest();
+            request.indices(index);
+            GetMappingsResponse response = restHighLevelClient.indices().getMapping(request, RequestOptions.DEFAULT);
+
+            mappings = response.mappings();
+        } catch (NullPointerException e) {
+            throw new IllegalArgumentException("Not found the mapping info of index: " + index);
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+            return null;
+        }
+        mappingMetaData = mappings.get(index);
+
+        return mappingMetaData;
+    }
+
+    public class ES8xIndexRequest implements ESBulkRequest.ESIndexRequest {
+
+        private IndexRequestBuilder indexRequestBuilder;
+
+        private IndexRequest indexRequest;
+
+        public ES8xIndexRequest(String index, String id) {
+            indexRequest = new IndexRequest(index);
+            indexRequest.id(id);
+
+        }
+
+        public ES8xIndexRequest setSource(Map<String, ?> source) {
+
+            indexRequest.source(source);
+
+            return this;
+        }
+
+        public ES8xIndexRequest setRouting(String routing) {
+
+            indexRequest.routing(routing);
+
+            return this;
+        }
+
+        public IndexRequestBuilder getIndexRequestBuilder() {
+            return indexRequestBuilder;
+        }
+
+        public void setIndexRequestBuilder(IndexRequestBuilder indexRequestBuilder) {
+            this.indexRequestBuilder = indexRequestBuilder;
+        }
+
+        public IndexRequest getIndexRequest() {
+            return indexRequest;
+        }
+
+        public void setIndexRequest(IndexRequest indexRequest) {
+            this.indexRequest = indexRequest;
+        }
+    }
+
+    public class ES8xUpdateRequest implements ESBulkRequest.ESUpdateRequest {
+
+        private UpdateRequestBuilder updateRequestBuilder;
+
+        private UpdateRequest updateRequest;
+
+        public ES8xUpdateRequest(String index, String id) {
+
+            updateRequest = new UpdateRequest(index, id);
+        }
+
+        public ES8xUpdateRequest setDoc(Map source) {
+
+            updateRequest.doc(source);
+
+            return this;
+        }
+
+        public ES8xUpdateRequest setDocAsUpsert(boolean shouldUpsertDoc) {
+
+            updateRequest.docAsUpsert(shouldUpsertDoc);
+
+            return this;
+        }
+
+        public ES8xUpdateRequest setRouting(String routing) {
+
+            updateRequest.routing(routing);
+
+            return this;
+        }
+
+        public UpdateRequestBuilder getUpdateRequestBuilder() {
+            return updateRequestBuilder;
+        }
+
+        public void setUpdateRequestBuilder(UpdateRequestBuilder updateRequestBuilder) {
+            this.updateRequestBuilder = updateRequestBuilder;
+        }
+
+        public UpdateRequest getUpdateRequest() {
+            return updateRequest;
+        }
+
+        public void setUpdateRequest(UpdateRequest updateRequest) {
+            this.updateRequest = updateRequest;
+        }
+    }
+
+    public class ES8xDeleteRequest implements ESBulkRequest.ESDeleteRequest {
+
+        private DeleteRequestBuilder deleteRequestBuilder;
+
+        private DeleteRequest deleteRequest;
+
+        public ES8xDeleteRequest(String index, String id) {
+
+            deleteRequest = new DeleteRequest(index, id);
+
+        }
+
+        public DeleteRequestBuilder getDeleteRequestBuilder() {
+            return deleteRequestBuilder;
+        }
+
+        public void setDeleteRequestBuilder(DeleteRequestBuilder deleteRequestBuilder) {
+            this.deleteRequestBuilder = deleteRequestBuilder;
+        }
+
+        public DeleteRequest getDeleteRequest() {
+            return deleteRequest;
+        }
+
+        public void setDeleteRequest(DeleteRequest deleteRequest) {
+            this.deleteRequest = deleteRequest;
+        }
+    }
+
+    public class ESSearchRequest {
+
+        private SearchRequestBuilder searchRequestBuilder;
+
+        private SearchRequest searchRequest;
+
+        private SearchSourceBuilder sourceBuilder;
+
+        public ESSearchRequest(String index) {
+
+            searchRequest = new SearchRequest(index);
+            sourceBuilder = new SearchSourceBuilder();
+
+        }
+
+        public ESSearchRequest setQuery(QueryBuilder queryBuilder) {
+
+            sourceBuilder.query(queryBuilder);
+
+            return this;
+        }
+
+        public ESSearchRequest size(int size) {
+
+            sourceBuilder.size(size);
+
+            return this;
+        }
+
+        public SearchResponse getResponse() {
+
+            searchRequest.source(sourceBuilder);
+            try {
+                return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+        }
+
+        public SearchRequestBuilder getSearchRequestBuilder() {
+            return searchRequestBuilder;
+        }
+
+        public void setSearchRequestBuilder(SearchRequestBuilder searchRequestBuilder) {
+            this.searchRequestBuilder = searchRequestBuilder;
+        }
+
+        public SearchRequest getSearchRequest() {
+            return searchRequest;
+        }
+
+        public void setSearchRequest(SearchRequest searchRequest) {
+            this.searchRequest = searchRequest;
+        }
+    }
+
+    public class ES8xBulkRequest implements ESBulkRequest {
+
+        private BulkRequestBuilder bulkRequestBuilder;
+
+        private BulkRequest bulkRequest;
+
+        public ES8xBulkRequest() {
+
+            bulkRequest = new BulkRequest();
+
+        }
+
+        public void resetBulk() {
+
+            bulkRequest = new BulkRequest();
+
+        }
+
+        public ES8xBulkRequest add(ESIndexRequest esIndexRequest) {
+            ES8xIndexRequest eir = (ES8xIndexRequest) esIndexRequest;
+
+            bulkRequest.add(eir.indexRequest);
+
+            return this;
+        }
+
+        public ES8xBulkRequest add(ESUpdateRequest esUpdateRequest) {
+            ES8xUpdateRequest eur = (ES8xUpdateRequest) esUpdateRequest;
+
+            bulkRequest.add(eur.updateRequest);
+
+            return this;
+        }
+
+        public ES8xBulkRequest add(ESDeleteRequest esDeleteRequest) {
+            ES8xDeleteRequest edr = (ES8xDeleteRequest) esDeleteRequest;
+
+            bulkRequest.add(edr.deleteRequest);
+
+            return this;
+        }
+
+        public int numberOfActions() {
+            return bulkRequest.numberOfActions();
+        }
+
+        public ESBulkResponse bulk() {
+            try {
+                BulkResponse responses = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
+                return new ES8xBulkResponse(responses);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+        }
+
+        public BulkRequestBuilder getBulkRequestBuilder() {
+            return bulkRequestBuilder;
+        }
+
+        public void setBulkRequestBuilder(BulkRequestBuilder bulkRequestBuilder) {
+            this.bulkRequestBuilder = bulkRequestBuilder;
+        }
+
+        public BulkRequest getBulkRequest() {
+            return bulkRequest;
+        }
+
+        public void setBulkRequest(BulkRequest bulkRequest) {
+            this.bulkRequest = bulkRequest;
+        }
+    }
+
+    public static class ES8xBulkResponse implements ESBulkRequest.ESBulkResponse {
+
+        private BulkResponse bulkResponse;
+
+        public ES8xBulkResponse(BulkResponse bulkResponse) {
+            this.bulkResponse = bulkResponse;
+        }
+
+        @Override
+        public boolean hasFailures() {
+            return bulkResponse.hasFailures();
+        }
+
+        @Override
+        public void processFailBulkResponse(String errorMsg) {
+            for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
+                if (!itemResponse.isFailed()) {
+                    continue;
+                }
+
+                if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
+                    logger.error(itemResponse.getFailureMessage());
+                } else {
+                    throw new RuntimeException(errorMsg + itemResponse.getFailureMessage());
+                }
+            }
+        }
+    }
+
+
+    public RestHighLevelClient getRestHighLevelClient() {
+        return restHighLevelClient;
+    }
+
+    public void setRestHighLevelClient(RestHighLevelClient restHighLevelClient) {
+        this.restHighLevelClient = restHighLevelClient;
+    }
+
+    private HttpHost createHttpHost(String uriStr) {
+        URI uri = URI.create(uriStr);
+        if (!org.springframework.util.StringUtils.hasLength(uri.getUserInfo())) {
+            return HttpHost.create(uri.toString());
+        }
+        try {
+            return HttpHost.create(new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment()).toString());
+        } catch (URISyntaxException ex) {
+            throw new IllegalStateException(ex);
+        }
+    }
+}

+ 1 - 0
client-adapter/es8x/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter

@@ -0,0 +1 @@
+es8=com.alibaba.otter.canal.client.adapter.es8x.ES8xAdapter

+ 20 - 0
client-adapter/es8x/src/main/resources/es8/biz_order.yml

@@ -0,0 +1,20 @@
+dataSourceKey: defaultDS
+destination: example
+groupId: g1
+esMapping:
+  _index: customer
+  _id: _id
+  relations:
+    customer_order:
+      name: order
+      parent: customer_id
+  sql: "select concat('oid_', t.id) as _id,
+        t.customer_id,
+        t.id as order_id,
+        t.serial_code as order_serial,
+        t.c_time as order_time
+        from biz_order t"
+  skips:
+    - customer_id
+  etlCondition: "where t.c_time>={}"
+  commitBatch: 3000

+ 46 - 0
client-adapter/es8x/src/main/resources/es8/customer.yml

@@ -0,0 +1,46 @@
+dataSourceKey: defaultDS
+destination: example
+groupId: g1
+esMapping:
+  _index: customer
+  _id: id
+  relations:
+    customer_order:
+      name: customer
+  sql: "select t.id, t.name, t.email from customer t"
+  etlCondition: "where t.c_time>={}"
+  commitBatch: 3000
+
+
+#{
+#  "mappings":{
+#    "_doc":{
+#      "properties":{
+#        "id": {
+#          "type": "long"
+#        },
+#        "name": {
+#          "type": "text"
+#        },
+#        "email": {
+#          "type": "text"
+#        },
+#        "order_id": {
+#          "type": "long"
+#        },
+#        "order_serial": {
+#          "type": "text"
+#        },
+#        "order_time": {
+#          "type": "date"
+#        },
+#        "customer_order":{
+#          "type":"join",
+#          "relations":{
+#            "customer":"order"
+#          }
+#        }
+#      }
+#    }
+#  }
+#}

+ 15 - 0
client-adapter/es8x/src/main/resources/es8/mytest_user.yml

@@ -0,0 +1,15 @@
+dataSourceKey: defaultDS
+destination: example
+groupId: g1
+esMapping:
+  _index: mytest_user
+  _id: _id
+  #  upsert: true
+  #  pk: id
+  sql: "select a.id as _id, a.name, a.role_id, b.role_name,
+        a.c_time from user a
+        left join role b on b.id=a.role_id"
+  #  objFields:
+  #    _labels: array:;
+  etlCondition: "where a.c_time>={}"
+  commitBatch: 3000

+ 44 - 0
client-adapter/es8x/src/test/java/com/alibaba/otter/canal/client/adapter/es8x/test/ESConnectionTest.java

@@ -0,0 +1,44 @@
+package com.alibaba.otter.canal.client.adapter.es8x.test;
+
+import com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.springframework.util.Assert;
+
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+@Ignore
+public class ESConnectionTest {
+
+    ESConnection esConnection;
+
+    @Before
+    public void init() throws UnknownHostException {
+        String[] hosts = new String[]{"127.0.0.1:9200"};
+        Map<String, String> properties = new HashMap<>();
+        properties.put("cluster.name", "elasticsearch");
+        esConnection = new ESConnection(hosts, properties);
+    }
+
+    @Test
+    public void test01() {
+        MappingMetadata mappingMetaData = esConnection.getMapping("mytest_user");
+
+        Map<String, Object> sourceMap = mappingMetaData.getSourceAsMap();
+        Map<String, Object> esMapping = (Map<String, Object>) sourceMap.get("properties");
+        for (Map.Entry<String, Object> entry : esMapping.entrySet()) {
+            Map<String, Object> value = (Map<String, Object>) entry.getValue();
+            if (value.containsKey("properties")) {
+                System.out.println(entry.getKey() + " object");
+            } else {
+                System.out.println(entry.getKey() + " " + value.get("type"));
+                Assert.notNull(entry.getKey(), "null column name");
+                Assert.notNull(value.get("type"), "null column type");
+            }
+        }
+    }
+}

+ 40 - 0
client-adapter/es8x/src/test/java/com/alibaba/otter/canal/client/adapter/es8x/test/TestConstant.java

@@ -0,0 +1,40 @@
+package com.alibaba.otter.canal.client.adapter.es8x.test;
+
+import com.alibaba.druid.pool.DruidDataSource;
+
+import java.sql.SQLException;
+
+public class TestConstant {
+
+    public final static String jdbcUrl = "jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true";
+    public final static String jdbcUser = "root";
+    public final static String jdbcPassword = "121212";
+
+    public final static String esHosts = "127.0.0.1:9300";
+    public final static String clusterName = "elasticsearch";
+
+    public final static DruidDataSource dataSource;
+
+    static {
+        dataSource = new DruidDataSource();
+        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
+        dataSource.setUrl(jdbcUrl);
+        dataSource.setUsername(jdbcUser);
+        dataSource.setPassword(jdbcPassword);
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(1);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+        dataSource.setPoolPreparedStatements(false);
+        dataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
+        dataSource.setValidationQuery("select 1");
+        try {
+            dataSource.init();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 13 - 0
client-adapter/launcher/pom.xml

@@ -128,6 +128,19 @@
             <classifier>jar-with-dependencies</classifier>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>client-adapter.es8x</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>*</artifactId>
+                    <groupId>*</groupId>
+                </exclusion>
+            </exclusions>
+            <classifier>jar-with-dependencies</classifier>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>com.alibaba.otter</groupId>
             <artifactId>client-adapter.rdb</artifactId>

+ 7 - 0
client-adapter/launcher/src/main/assembly/dev.xml

@@ -43,6 +43,13 @@
                 <include>**/*</include>
             </includes>
         </fileSet>
+        <fileSet>
+            <directory>../es8x/src/main/resources/es8</directory>
+            <outputDirectory>/conf/es8</outputDirectory>
+            <includes>
+                <include>**/*</include>
+            </includes>
+        </fileSet>
         <fileSet>
             <directory>../hbase/src/main/resources/hbase</directory>
             <outputDirectory>/conf/hbase</outputDirectory>

+ 7 - 0
client-adapter/launcher/src/main/assembly/release.xml

@@ -44,6 +44,13 @@
                 <include>**/*</include>
             </includes>
         </fileSet>
+        <fileSet>
+            <directory>../es8x/src/main/resources/es8</directory>
+            <outputDirectory>/conf/es8</outputDirectory>
+            <includes>
+                <include>**/*</include>
+            </includes>
+        </fileSet>
         <fileSet>
             <directory>../hbase/src/main/resources/hbase</directory>
             <outputDirectory>/conf/hbase</outputDirectory>

+ 1 - 0
client-adapter/pom.xml

@@ -31,6 +31,7 @@
         <module>rdb</module>
         <module>es6x</module>
         <module>es7x</module>
+        <module>es8x</module>
         <module>escore</module>
         <module>kudu</module>
         <module>phoenix</module>