Ver Fonte

新增es adapter rest client和认证机制 (#2046)

* 新增es adapter rest client和认证机制

* 修改getMapping位置

* 代码调整

* 代码调整

* 整理代码

* fix #2048
rewerma há 5 anos atrás
pai
commit
fdd0763567
16 ficheiros alterados com 1012 adições e 441 exclusões
  1. 12 2
      client-adapter/elasticsearch/pom.xml
  2. 24 27
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java
  3. 39 38
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java
  4. 468 0
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESConnection.java
  5. 88 101
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java
  6. 30 0
      client-adapter/elasticsearch/src/main/java/org/elasticsearch/client/RequestConvertersExt.java
  7. 29 0
      client-adapter/elasticsearch/src/main/java/org/elasticsearch/client/RestHighLevelClientExt.java
  8. 13 3
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSub2Test.java
  9. 13 3
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSubTest.java
  10. 9 2
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOne2Test.java
  11. 6 5
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOneTest.java
  12. 9 2
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncJoinOneTest.java
  13. 13 3
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncSingleTest.java
  14. 251 251
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/DbRemoteConfigLoader.java
  15. 7 4
      client-adapter/launcher/src/main/resources/application.yml
  16. 1 0
      server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

+ 12 - 2
client-adapter/elasticsearch/pom.xml

@@ -26,12 +26,22 @@
         <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
-            <version>6.2.3</version>
+            <version>6.4.3</version>
         </dependency>
         <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>transport</artifactId>
-            <version>6.2.3</version>
+            <version>6.4.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-client</artifactId>
+            <version>6.4.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-high-level-client</artifactId>
+            <version>6.4.3</version>
         </dependency>
 
         <dependency>

+ 24 - 27
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -1,6 +1,5 @@
 package com.alibaba.otter.canal.client.adapter.es;
 
-import java.net.InetAddress;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -13,10 +12,6 @@ import javax.sql.DataSource;
 
 import org.apache.commons.lang.StringUtils;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
 
 import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
@@ -28,6 +23,7 @@ import com.alibaba.otter.canal.client.adapter.es.config.SqlParser;
 import com.alibaba.otter.canal.client.adapter.es.monitor.ESConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.es.service.ESEtlService;
 import com.alibaba.otter.canal.client.adapter.es.service.ESSyncService;
+import com.alibaba.otter.canal.client.adapter.es.support.ESConnection;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
 import com.alibaba.otter.canal.client.adapter.support.*;
 
@@ -43,7 +39,7 @@ public class ESAdapter implements OuterAdapter {
     private Map<String, ESSyncConfig>              esSyncConfig        = new ConcurrentHashMap<>(); // 文件名对应配置
     private Map<String, Map<String, ESSyncConfig>> dbTableEsSyncConfig = new ConcurrentHashMap<>(); // schema-table对应配置
 
-    private TransportClient                        transportClient;
+    private ESConnection                           esConnection;
 
     private ESSyncService                          esSyncService;
 
@@ -51,10 +47,6 @@ public class ESAdapter implements OuterAdapter {
 
     private Properties                             envProperties;
 
-    public TransportClient getTransportClient() {
-        return transportClient;
-    }
-
     public ESSyncService getEsSyncService() {
         return esSyncService;
     }
@@ -67,6 +59,14 @@ public class ESAdapter implements OuterAdapter {
         return dbTableEsSyncConfig;
     }
 
+    public ESConnection getEsConnection() {
+        return esConnection;
+    }
+
+    public void setEsConnection(ESConnection esConnection) {
+        this.esConnection = esConnection;
+    }
+
     @Override
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
         try {
@@ -119,17 +119,16 @@ public class ESAdapter implements OuterAdapter {
             }
 
             Map<String, String> properties = configuration.getProperties();
-            Settings.Builder settingBuilder = Settings.builder();
-            properties.forEach(settingBuilder::put);
-            Settings settings = settingBuilder.build();
-            transportClient = new PreBuiltTransportClient(settings);
+
             String[] hostArray = configuration.getHosts().split(",");
-            for (String host : hostArray) {
-                int i = host.indexOf(":");
-                transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName(host.substring(0, i)),
-                    Integer.parseInt(host.substring(i + 1))));
+            String mode = properties.get("mode");
+            if ("rest".equalsIgnoreCase(mode) || "http".equalsIgnoreCase(mode)) {
+                esConnection = new ESConnection(hostArray, properties, ESConnection.ESClientMode.REST);
+            } else {
+                esConnection = new ESConnection(hostArray, properties, ESConnection.ESClientMode.TRANSPORT);
             }
-            ESTemplate esTemplate = new ESTemplate(transportClient);
+
+            ESTemplate esTemplate = new ESTemplate(esConnection);
             esSyncService = new ESSyncService(esTemplate);
 
             esConfigMonitor = new ESConfigMonitor();
@@ -177,7 +176,7 @@ public class ESAdapter implements OuterAdapter {
         ESSyncConfig config = esSyncConfig.get(task);
         if (config != null) {
             DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
-            ESEtlService esEtlService = new ESEtlService(transportClient, config);
+            ESEtlService esEtlService = new ESEtlService(esConnection, config);
             if (dataSource != null) {
                 return esEtlService.importData(params);
             } else {
@@ -191,7 +190,7 @@ public class ESAdapter implements OuterAdapter {
             for (ESSyncConfig configTmp : esSyncConfig.values()) {
                 // 取所有的destination为task的配置
                 if (configTmp.getDestination().equals(task)) {
-                    ESEtlService esEtlService = new ESEtlService(transportClient, configTmp);
+                    ESEtlService esEtlService = new ESEtlService(esConnection, configTmp);
                     EtlResult etlRes = esEtlService.importData(params);
                     if (!etlRes.getSucceeded()) {
                         resSuccess = false;
@@ -220,10 +219,8 @@ public class ESAdapter implements OuterAdapter {
     public Map<String, Object> count(String task) {
         ESSyncConfig config = esSyncConfig.get(task);
         ESMapping mapping = config.getEsMapping();
-        SearchResponse response = transportClient.prepareSearch(mapping.get_index())
-            .setTypes(mapping.get_type())
-            .setSize(0)
-            .get();
+        SearchResponse response = this.esConnection.new ESSearchRequest(mapping.get_index(), mapping.get_type()).size(0)
+            .getResponse();
 
         long rowCount = response.getHits().getTotalHits();
         Map<String, Object> res = new LinkedHashMap<>();
@@ -237,8 +234,8 @@ public class ESAdapter implements OuterAdapter {
         if (esConfigMonitor != null) {
             esConfigMonitor.destroy();
         }
-        if (transportClient != null) {
-            transportClient.close();
+        if (esConnection != null) {
+            esConnection.close();
         }
     }
 

+ 39 - 38
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java

@@ -11,12 +11,8 @@ import javax.sql.DataSource;
 
 import org.apache.commons.lang.StringUtils;
 import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.update.UpdateRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
@@ -24,6 +20,11 @@ import org.elasticsearch.search.SearchHit;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
+import com.alibaba.otter.canal.client.adapter.es.support.ESConnection;
+import com.alibaba.otter.canal.client.adapter.es.support.ESConnection.ESBulkRequest;
+import com.alibaba.otter.canal.client.adapter.es.support.ESConnection.ESIndexRequest;
+import com.alibaba.otter.canal.client.adapter.es.support.ESConnection.ESSearchRequest;
+import com.alibaba.otter.canal.client.adapter.es.support.ESConnection.ESUpdateRequest;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
 import com.alibaba.otter.canal.client.adapter.support.AbstractEtlService;
 import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
@@ -38,14 +39,14 @@ import com.alibaba.otter.canal.client.adapter.support.Util;
  */
 public class ESEtlService extends AbstractEtlService {
 
-    private TransportClient transportClient;
-    private ESTemplate      esTemplate;
-    private ESSyncConfig    config;
+    private ESConnection esConnection;
+    private ESTemplate   esTemplate;
+    private ESSyncConfig config;
 
-    public ESEtlService(TransportClient transportClient, ESSyncConfig config){
+    public ESEtlService(ESConnection esConnection, ESSyncConfig config){
         super("ES", config);
-        this.transportClient = transportClient;
-        this.esTemplate = new ESTemplate(transportClient);
+        this.esConnection = esConnection;
+        this.esTemplate = new ESTemplate(esConnection);
         this.config = config;
     }
 
@@ -79,7 +80,7 @@ public class ESEtlService extends AbstractEtlService {
             Util.sqlRS(ds, sql, values, rs -> {
                 int count = 0;
                 try {
-                    BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
+                    ESBulkRequest esBulkRequest = this.esConnection.new ESBulkRequest();
 
                     long batchBegin = System.currentTimeMillis();
                     while (rs.next()) {
@@ -132,41 +133,41 @@ public class ESEtlService extends AbstractEtlService {
                         if (idVal != null) {
                             String parentVal = (String) esFieldData.remove("$parent_routing");
                             if (mapping.isUpsert()) {
-                                UpdateRequestBuilder updateRequestBuilder = transportClient
-                                    .prepareUpdate(mapping.get_index(), mapping.get_type(), idVal.toString())
-                                    .setDoc(esFieldData)
-                                    .setDocAsUpsert(true);
+                                ESUpdateRequest esUpdateRequest = this.esConnection.new ESUpdateRequest(
+                                    mapping.get_index(),
+                                    mapping.get_type(),
+                                    idVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
+
                                 if (StringUtils.isNotEmpty(parentVal)) {
-                                    updateRequestBuilder.setRouting(parentVal);
+                                    esUpdateRequest.setRouting(parentVal);
                                 }
-                                bulkRequestBuilder.add(updateRequestBuilder);
+
+                                esBulkRequest.add(esUpdateRequest);
                             } else {
-                                IndexRequestBuilder indexRequestBuilder = transportClient
-                                    .prepareIndex(mapping.get_index(), mapping.get_type(), idVal.toString())
-                                    .setSource(esFieldData);
+                                ESIndexRequest esIndexRequest = this.esConnection.new ESIndexRequest(mapping
+                                    .get_index(), mapping.get_type(), idVal.toString()).setSource(esFieldData);
                                 if (StringUtils.isNotEmpty(parentVal)) {
-                                    indexRequestBuilder.setRouting(parentVal);
+                                    esIndexRequest.setRouting(parentVal);
                                 }
-                                bulkRequestBuilder.add(indexRequestBuilder);
+                                esBulkRequest.add(esIndexRequest);
                             }
                         } else {
                             idVal = esFieldData.get(mapping.getPk());
-                            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
-                                .setTypes(mapping.get_type())
-                                .setQuery(QueryBuilders.termQuery(mapping.getPk(), idVal))
-                                .setSize(10000)
-                                .get();
+                            ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index(),
+                                mapping.get_type()).setQuery(QueryBuilders.termQuery(mapping.getPk(), idVal))
+                                    .size(10000);
+                            SearchResponse response = esSearchRequest.getResponse();
                             for (SearchHit hit : response.getHits()) {
-                                bulkRequestBuilder.add(
-                                    transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
-                                        .setDoc(esFieldData));
+                                ESUpdateRequest esUpdateRequest = this.esConnection.new ESUpdateRequest(mapping
+                                    .get_index(), mapping.get_type(), hit.getId()).setDoc(esFieldData);
+                                esBulkRequest.add(esUpdateRequest);
                             }
                         }
 
-                        if (bulkRequestBuilder.numberOfActions() % mapping.getCommitBatch() == 0
-                            && bulkRequestBuilder.numberOfActions() > 0) {
+                        if (esBulkRequest.numberOfActions() % mapping.getCommitBatch() == 0
+                            && esBulkRequest.numberOfActions() > 0) {
                             long esBatchBegin = System.currentTimeMillis();
-                            BulkResponse rp = bulkRequestBuilder.execute().actionGet();
+                            BulkResponse rp = esBulkRequest.bulk();
                             if (rp.hasFailures()) {
                                 this.processFailBulkResponse(rp);
                             }
@@ -175,19 +176,19 @@ public class ESEtlService extends AbstractEtlService {
                                 logger.trace("全量数据批量导入批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
                                     (System.currentTimeMillis() - batchBegin),
                                     (System.currentTimeMillis() - esBatchBegin),
-                                    bulkRequestBuilder.numberOfActions(),
+                                    esBulkRequest.numberOfActions(),
                                     mapping.get_index());
                             }
                             batchBegin = System.currentTimeMillis();
-                            bulkRequestBuilder = transportClient.prepareBulk();
+                            esBulkRequest.resetBulk();
                         }
                         count++;
                         impCount.incrementAndGet();
                     }
 
-                    if (bulkRequestBuilder.numberOfActions() > 0) {
+                    if (esBulkRequest.numberOfActions() > 0) {
                         long esBatchBegin = System.currentTimeMillis();
-                        BulkResponse rp = bulkRequestBuilder.execute().actionGet();
+                        BulkResponse rp = esBulkRequest.bulk();
                         if (rp.hasFailures()) {
                             this.processFailBulkResponse(rp);
                         }
@@ -195,7 +196,7 @@ public class ESEtlService extends AbstractEtlService {
                             logger.trace("全量数据批量导入最后批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
                                 (System.currentTimeMillis() - batchBegin),
                                 (System.currentTimeMillis() - esBatchBegin),
-                                bulkRequestBuilder.numberOfActions(),
+                                esBulkRequest.numberOfActions(),
                                 mapping.get_index());
                         }
                     }

+ 468 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESConnection.java

@@ -0,0 +1,468 @@
+package com.alibaba.otter.canal.client.adapter.es.support;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.client.*;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ES 连接器, Transport Rest 两种方式
+ *
+ * @author rewerma 2019-08-01
+ * @version 1.0.0
+ */
+public class ESConnection {
+
+    private static final Logger logger = LoggerFactory.getLogger(ESConnection.class);
+
+    public enum ESClientMode {
+                              TRANSPORT, REST
+    }
+
+    private ESClientMode        mode;
+
+    private TransportClient     transportClient;
+
+    private RestHighLevelClient restHighLevelClient;
+
+    public ESConnection(String[] hosts, Map<String, String> properties, ESClientMode mode) throws UnknownHostException{
+        this.mode = mode;
+        if (mode == ESClientMode.TRANSPORT) {
+            Settings.Builder settingBuilder = Settings.builder();
+            properties.forEach(settingBuilder::put);
+            Settings settings = settingBuilder.build();
+            transportClient = new PreBuiltTransportClient(settings);
+            for (String host : hosts) {
+                int i = host.indexOf(":");
+                transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName(host.substring(0, i)),
+                    Integer.parseInt(host.substring(i + 1))));
+            }
+        } else {
+            HttpHost[] httpHosts = new HttpHost[hosts.length];
+            for (int i = 0; i < hosts.length; i++) {
+                String host = hosts[i];
+                int j = host.indexOf(":");
+                HttpHost httpHost = new HttpHost(InetAddress.getByName(host.substring(0, j)),
+                    Integer.parseInt(host.substring(j + 1)));
+                httpHosts[i] = httpHost;
+            }
+            RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
+            String nameAndPwd = properties.get("security.auth");
+            if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
+                String[] nameAndPwdArr = nameAndPwd.split(":");
+                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+                credentialsProvider.setCredentials(AuthScope.ANY,
+                    new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
+                restClientBuilder.setHttpClientConfigCallback(
+                    httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+            }
+            restHighLevelClient = new RestHighLevelClient(restClientBuilder);
+        }
+    }
+
+    public void close() {
+        if (mode == ESClientMode.TRANSPORT) {
+            transportClient.close();
+        } else {
+            try {
+                restHighLevelClient.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public MappingMetaData getMapping(String index, String type) {
+        MappingMetaData mappingMetaData = null;
+        if (mode == ESClientMode.TRANSPORT) {
+            ImmutableOpenMap<String, MappingMetaData> mappings;
+            try {
+                mappings = transportClient.admin()
+                    .cluster()
+                    .prepareState()
+                    .execute()
+                    .actionGet()
+                    .getState()
+                    .getMetaData()
+                    .getIndices()
+                    .get(index)
+                    .getMappings();
+            } catch (NullPointerException e) {
+                throw new IllegalArgumentException("Not found the mapping info of index: " + index);
+            }
+            mappingMetaData = mappings.get(type);
+
+        } else {
+            ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings;
+            try {
+                GetMappingsRequest request = new GetMappingsRequest();
+                request.indices(index);
+                GetMappingsResponse response;
+                // try {
+                // response = restHighLevelClient
+                // .indices()
+                // .getMapping(request, RequestOptions.DEFAULT);
+                // // 6.4以下版本直接使用该接口会报错
+                // } catch (Exception e) {
+                // logger.warn("Low ElasticSearch version for getMapping");
+                response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT);
+                // }
+
+                mappings = response.mappings();
+            } catch (NullPointerException e) {
+                throw new IllegalArgumentException("Not found the mapping info of index: " + index);
+            } catch (IOException e) {
+                logger.error(e.getMessage(), e);
+                return null;
+            }
+            mappingMetaData = mappings.get(index).get(type);
+        }
+        return mappingMetaData;
+    }
+
+    public class ESIndexRequest {
+
+        private IndexRequestBuilder indexRequestBuilder;
+
+        private IndexRequest        indexRequest;
+
+        public ESIndexRequest(String index, String type, String id){
+            if (mode == ESClientMode.TRANSPORT) {
+                indexRequestBuilder = transportClient.prepareIndex(index, type, id);
+            } else {
+                indexRequest = new IndexRequest(index, type, id);
+            }
+        }
+
+        public ESIndexRequest setSource(Map<String, ?> source) {
+            if (mode == ESClientMode.TRANSPORT) {
+                indexRequestBuilder.setSource(source);
+            } else {
+                indexRequest.source(source);
+            }
+            return this;
+        }
+
+        public ESIndexRequest setRouting(String routing) {
+            if (mode == ESClientMode.TRANSPORT) {
+                indexRequestBuilder.setRouting(routing);
+            } else {
+                indexRequest.routing(routing);
+            }
+            return this;
+        }
+
+        public IndexRequestBuilder getIndexRequestBuilder() {
+            return indexRequestBuilder;
+        }
+
+        public void setIndexRequestBuilder(IndexRequestBuilder indexRequestBuilder) {
+            this.indexRequestBuilder = indexRequestBuilder;
+        }
+
+        public IndexRequest getIndexRequest() {
+            return indexRequest;
+        }
+
+        public void setIndexRequest(IndexRequest indexRequest) {
+            this.indexRequest = indexRequest;
+        }
+    }
+
+    public class ESUpdateRequest {
+
+        private UpdateRequestBuilder updateRequestBuilder;
+
+        private UpdateRequest        updateRequest;
+
+        public ESUpdateRequest(String index, String type, String id){
+            if (mode == ESClientMode.TRANSPORT) {
+                updateRequestBuilder = transportClient.prepareUpdate(index, type, id);
+            } else {
+                updateRequest = new UpdateRequest(index, type, id);
+            }
+        }
+
+        public ESUpdateRequest setDoc(Map source) {
+            if (mode == ESClientMode.TRANSPORT) {
+                updateRequestBuilder.setDoc(source);
+            } else {
+                updateRequest.doc(source);
+            }
+            return this;
+        }
+
+        public ESUpdateRequest setDocAsUpsert(boolean shouldUpsertDoc) {
+            if (mode == ESClientMode.TRANSPORT) {
+                updateRequestBuilder.setDocAsUpsert(shouldUpsertDoc);
+            } else {
+                updateRequest.docAsUpsert(shouldUpsertDoc);
+            }
+            return this;
+        }
+
+        public ESUpdateRequest setRouting(String routing) {
+            if (mode == ESClientMode.TRANSPORT) {
+                updateRequestBuilder.setRouting(routing);
+            } else {
+                updateRequest.routing(routing);
+            }
+            return this;
+        }
+
+        public UpdateRequestBuilder getUpdateRequestBuilder() {
+            return updateRequestBuilder;
+        }
+
+        public void setUpdateRequestBuilder(UpdateRequestBuilder updateRequestBuilder) {
+            this.updateRequestBuilder = updateRequestBuilder;
+        }
+
+        public UpdateRequest getUpdateRequest() {
+            return updateRequest;
+        }
+
+        public void setUpdateRequest(UpdateRequest updateRequest) {
+            this.updateRequest = updateRequest;
+        }
+    }
+
+    public class ESDeleteRequest {
+
+        private DeleteRequestBuilder deleteRequestBuilder;
+
+        private DeleteRequest        deleteRequest;
+
+        public ESDeleteRequest(String index, String type, String id){
+            if (mode == ESClientMode.TRANSPORT) {
+                deleteRequestBuilder = transportClient.prepareDelete(index, type, id);
+            } else {
+                deleteRequest = new DeleteRequest(index, type, id);
+            }
+        }
+
+        public DeleteRequestBuilder getDeleteRequestBuilder() {
+            return deleteRequestBuilder;
+        }
+
+        public void setDeleteRequestBuilder(DeleteRequestBuilder deleteRequestBuilder) {
+            this.deleteRequestBuilder = deleteRequestBuilder;
+        }
+
+        public DeleteRequest getDeleteRequest() {
+            return deleteRequest;
+        }
+
+        public void setDeleteRequest(DeleteRequest deleteRequest) {
+            this.deleteRequest = deleteRequest;
+        }
+    }
+
+    public class ESSearchRequest {
+
+        private SearchRequestBuilder searchRequestBuilder;
+
+        private SearchRequest        searchRequest;
+
+        private SearchSourceBuilder  sourceBuilder;
+
+        public ESSearchRequest(String index, String... types){
+            if (mode == ESClientMode.TRANSPORT) {
+                searchRequestBuilder = transportClient.prepareSearch(index).setTypes(types);
+            } else {
+                searchRequest = new SearchRequest(index).types(types);
+                sourceBuilder = new SearchSourceBuilder();
+            }
+        }
+
+        public ESSearchRequest setQuery(QueryBuilder queryBuilder) {
+            if (mode == ESClientMode.TRANSPORT) {
+                searchRequestBuilder.setQuery(queryBuilder);
+            } else {
+                sourceBuilder.query(queryBuilder);
+            }
+            return this;
+        }
+
+        public ESSearchRequest size(int size) {
+            if (mode == ESClientMode.TRANSPORT) {
+                searchRequestBuilder.setSize(size);
+            } else {
+                sourceBuilder.size(size);
+            }
+            return this;
+        }
+
+        public SearchResponse getResponse() {
+            if (mode == ESClientMode.TRANSPORT) {
+                return searchRequestBuilder.get();
+            } else {
+                searchRequest.source(sourceBuilder);
+                try {
+                    return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        public SearchRequestBuilder getSearchRequestBuilder() {
+            return searchRequestBuilder;
+        }
+
+        public void setSearchRequestBuilder(SearchRequestBuilder searchRequestBuilder) {
+            this.searchRequestBuilder = searchRequestBuilder;
+        }
+
+        public SearchRequest getSearchRequest() {
+            return searchRequest;
+        }
+
+        public void setSearchRequest(SearchRequest searchRequest) {
+            this.searchRequest = searchRequest;
+        }
+    }
+
+    public class ESBulkRequest {
+
+        private BulkRequestBuilder bulkRequestBuilder;
+
+        private BulkRequest        bulkRequest;
+
+        public ESBulkRequest(){
+            if (mode == ESClientMode.TRANSPORT) {
+                bulkRequestBuilder = transportClient.prepareBulk();
+            } else {
+                bulkRequest = new BulkRequest();
+            }
+        }
+
+        public void resetBulk() {
+            if (mode == ESClientMode.TRANSPORT) {
+                bulkRequestBuilder = transportClient.prepareBulk();
+            } else {
+                bulkRequest = new BulkRequest();
+            }
+        }
+
+        public ESBulkRequest add(ESIndexRequest esIndexRequest) {
+            if (mode == ESClientMode.TRANSPORT) {
+                bulkRequestBuilder.add(esIndexRequest.indexRequestBuilder);
+            } else {
+                bulkRequest.add(esIndexRequest.indexRequest);
+            }
+            return this;
+        }
+
+        public ESBulkRequest add(ESUpdateRequest esUpdateRequest) {
+            if (mode == ESClientMode.TRANSPORT) {
+                bulkRequestBuilder.add(esUpdateRequest.updateRequestBuilder);
+            } else {
+                bulkRequest.add(esUpdateRequest.updateRequest);
+            }
+            return this;
+        }
+
+        public ESBulkRequest add(ESDeleteRequest esDeleteRequest) {
+            if (mode == ESClientMode.TRANSPORT) {
+                bulkRequestBuilder.add(esDeleteRequest.deleteRequestBuilder);
+            } else {
+                bulkRequest.add(esDeleteRequest.deleteRequest);
+            }
+            return this;
+        }
+
+        public int numberOfActions() {
+            if (mode == ESClientMode.TRANSPORT) {
+                return bulkRequestBuilder.numberOfActions();
+            } else {
+                return bulkRequest.numberOfActions();
+            }
+        }
+
+        public BulkResponse bulk() {
+            if (mode == ESClientMode.TRANSPORT) {
+                return bulkRequestBuilder.execute().actionGet();
+            } else {
+                try {
+                    return restHighLevelClient.bulk(bulkRequest);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        public BulkRequestBuilder getBulkRequestBuilder() {
+            return bulkRequestBuilder;
+        }
+
+        public void setBulkRequestBuilder(BulkRequestBuilder bulkRequestBuilder) {
+            this.bulkRequestBuilder = bulkRequestBuilder;
+        }
+
+        public BulkRequest getBulkRequest() {
+            return bulkRequest;
+        }
+
+        public void setBulkRequest(BulkRequest bulkRequest) {
+            this.bulkRequest = bulkRequest;
+        }
+    }
+
+    // ------ get/set ------
+    public ESClientMode getMode() {
+        return mode;
+    }
+
+    public void setMode(ESClientMode mode) {
+        this.mode = mode;
+    }
+
+    public TransportClient getTransportClient() {
+        return transportClient;
+    }
+
+    public void setTransportClient(TransportClient transportClient) {
+        this.transportClient = transportClient;
+    }
+
+    public RestHighLevelClient getRestHighLevelClient() {
+        return restHighLevelClient;
+    }
+
+    public void setRestHighLevelClient(RestHighLevelClient restHighLevelClient) {
+        this.restHighLevelClient = restHighLevelClient;
+    }
+}

+ 88 - 101
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -10,14 +10,9 @@ import javax.sql.DataSource;
 
 import org.apache.commons.lang.StringUtils;
 import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.update.UpdateRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
@@ -30,6 +25,7 @@ import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
+import com.alibaba.otter.canal.client.adapter.es.support.ESConnection.*;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.Util;
 
@@ -41,65 +37,65 @@ import com.alibaba.otter.canal.client.adapter.support.Util;
  */
 public class ESTemplate {
 
-    private static final Logger logger         = LoggerFactory.getLogger(ESTemplate.class);
+    private static final Logger logger = LoggerFactory.getLogger(ESTemplate.class);
 
-    private static final int    MAX_BATCH_SIZE = 1000;
+    private static final int MAX_BATCH_SIZE = 1000;
 
-    private TransportClient     transportClient;
+    private ESConnection esConnection;
 
-    private BulkRequestBuilder  bulkRequestBuilder;
+    private ESBulkRequest esBulkRequest;
 
-    public ESTemplate(TransportClient transportClient){
-        this.transportClient = transportClient;
-        this.bulkRequestBuilder = transportClient.prepareBulk();
+    public ESTemplate(ESConnection esConnection) {
+        this.esConnection = esConnection;
+        this.esBulkRequest = this.esConnection.new ESBulkRequest();
     }
 
-    public BulkRequestBuilder getBulk() {
-        return bulkRequestBuilder;
+    public ESBulkRequest getBulk() {
+        return esBulkRequest;
     }
 
     public void resetBulkRequestBuilder() {
-        this.bulkRequestBuilder = this.transportClient.prepareBulk();
+        this.esBulkRequest.resetBulk();
     }
 
     /**
      * 插入数据
      *
-     * @param mapping 配置对象
-     * @param pkVal 主键值
+     * @param mapping     配置对象
+     * @param pkVal       主键值
      * @param esFieldData 数据Map
      */
     public void insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
         if (mapping.get_id() != null) {
             String parentVal = (String) esFieldData.remove("$parent_routing");
             if (mapping.isUpsert()) {
-                UpdateRequestBuilder updateRequestBuilder = transportClient
-                    .prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                    .setDoc(esFieldData)
-                    .setDocAsUpsert(true);
+                ESUpdateRequest updateRequest = esConnection.new ESUpdateRequest(mapping.get_index(),
+                        mapping.get_type(),
+                        pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
                 if (StringUtils.isNotEmpty(parentVal)) {
-                    updateRequestBuilder.setRouting(parentVal);
+                    updateRequest.setRouting(parentVal);
                 }
-                getBulk().add(updateRequestBuilder);
+                getBulk().add(updateRequest);
             } else {
-                IndexRequestBuilder indexRequestBuilder = transportClient
-                    .prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                    .setSource(esFieldData);
+                ESIndexRequest indexRequest = esConnection.new ESIndexRequest(mapping.get_index(),
+                        mapping.get_type(),
+                        pkVal.toString()).setSource(esFieldData);
                 if (StringUtils.isNotEmpty(parentVal)) {
-                    indexRequestBuilder.setRouting(parentVal);
+                    indexRequest.setRouting(parentVal);
                 }
-                getBulk().add(indexRequestBuilder);
+                getBulk().add(indexRequest);
             }
             commitBulk();
         } else {
-            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
-                .setTypes(mapping.get_type())
-                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
-                .setSize(10000)
-                .get();
+            ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index(),
+                    mapping.get_type()).setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal)).size(10000);
+            SearchResponse response = esSearchRequest.getResponse();
+
             for (SearchHit hit : response.getHits()) {
-                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
-                    .setDoc(esFieldData));
+                ESUpdateRequest esUpdateRequest = this.esConnection.new ESUpdateRequest(mapping.get_index(),
+                        mapping.get_type(),
+                        hit.getId()).setDoc(esFieldData);
+                getBulk().add(esUpdateRequest);
                 commitBulk();
             }
         }
@@ -109,8 +105,8 @@ public class ESTemplate {
     /**
      * 根据主键更新数据
      *
-     * @param mapping 配置对象
-     * @param pkVal 主键值
+     * @param mapping     配置对象
+     * @param pkVal       主键值
      * @param esFieldData 数据Map
      */
     public void update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
@@ -123,8 +119,8 @@ public class ESTemplate {
     /**
      * update by query
      *
-     * @param config 配置对象
-     * @param paramsTmp sql查询条件
+     * @param config      配置对象
+     * @param paramsTmp   sql查询条件
      * @param esFieldData 数据Map
      */
     public void updateByQuery(ESSyncConfig config, Map<String, Object> paramsTmp, Map<String, Object> esFieldData) {
@@ -143,7 +139,7 @@ public class ESTemplate {
             sql.append("_v.").append(fieldName).append("=? AND ");
             values.add(value);
         });
-        //TODO 直接外部包裹sql会导致全表扫描性能低, 待优化拼接内部where条件
+        // TODO 直接外部包裹sql会导致全表扫描性能低, 待优化拼接内部where条件
         int len = sql.length();
         sql.delete(len - 4, len);
         Integer syncCount = (Integer) Util.sqlRS(ds, sql.toString(), values, rs -> {
@@ -168,23 +164,26 @@ public class ESTemplate {
     /**
      * 通过主键删除数据
      *
-     * @param mapping 配置对象
-     * @param pkVal 主键值
+     * @param mapping     配置对象
+     * @param pkVal       主键值
      * @param esFieldData 数据Map
      */
     public void delete(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
         if (mapping.get_id() != null) {
-            getBulk().add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(), pkVal.toString()));
+            ESDeleteRequest esDeleteRequest = this.esConnection.new ESDeleteRequest(mapping.get_index(),
+                    mapping.get_type(),
+                    pkVal.toString());
+            getBulk().add(esDeleteRequest);
             commitBulk();
         } else {
-            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
-                .setTypes(mapping.get_type())
-                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
-                .setSize(10000)
-                .get();
+            ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index(),
+                    mapping.get_type()).setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal)).size(10000);
+            SearchResponse response = esSearchRequest.getResponse();
             for (SearchHit hit : response.getHits()) {
-                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
-                    .setDoc(esFieldData));
+                ESUpdateRequest esUpdateRequest = this.esConnection.new ESUpdateRequest(mapping.get_index(),
+                        mapping.get_type(),
+                        hit.getId()).setDoc(esFieldData);
+                getBulk().add(esUpdateRequest);
                 commitBulk();
             }
         }
@@ -196,7 +195,7 @@ public class ESTemplate {
      */
     public void commit() {
         if (getBulk().numberOfActions() > 0) {
-            BulkResponse response = getBulk().execute().actionGet();
+            BulkResponse response = getBulk().bulk();
             if (response.hasFailures()) {
                 for (BulkItemResponse itemResponse : response.getItems()) {
                     if (!itemResponse.isFailed()) {
@@ -227,32 +226,31 @@ public class ESTemplate {
         if (mapping.get_id() != null) {
             String parentVal = (String) esFieldData.remove("$parent_routing");
             if (mapping.isUpsert()) {
-                UpdateRequestBuilder updateRequestBuilder = transportClient
-                    .prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                    .setDoc(esFieldData)
-                    .setDocAsUpsert(true);
+                ESUpdateRequest esUpdateRequest = this.esConnection.new ESUpdateRequest(mapping.get_index(),
+                        mapping.get_type(),
+                        pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
                 if (StringUtils.isNotEmpty(parentVal)) {
-                    updateRequestBuilder.setRouting(parentVal);
+                    esUpdateRequest.setRouting(parentVal);
                 }
-                getBulk().add(updateRequestBuilder);
+                getBulk().add(esUpdateRequest);
             } else {
-                UpdateRequestBuilder updateRequestBuilder = transportClient
-                    .prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                    .setDoc(esFieldData);
+                ESUpdateRequest esUpdateRequest = this.esConnection.new ESUpdateRequest(mapping.get_index(),
+                        mapping.get_type(),
+                        pkVal.toString()).setDoc(esFieldData);
                 if (StringUtils.isNotEmpty(parentVal)) {
-                    updateRequestBuilder.setRouting(parentVal);
+                    esUpdateRequest.setRouting(parentVal);
                 }
-                getBulk().add(updateRequestBuilder);
+                getBulk().add(esUpdateRequest);
             }
         } else {
-            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
-                .setTypes(mapping.get_type())
-                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
-                .setSize(10000)
-                .get();
+            ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index(),
+                    mapping.get_type()).setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal)).size(10000);
+            SearchResponse response = esSearchRequest.getResponse();
             for (SearchHit hit : response.getHits()) {
-                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
-                    .setDoc(esFieldData));
+                ESUpdateRequest esUpdateRequest = this.esConnection.new ESUpdateRequest(mapping.get_index(),
+                        mapping.get_type(),
+                        hit.getId()).setDoc(esFieldData);
+                getBulk().add(esUpdateRequest);
             }
         }
     }
@@ -291,7 +289,7 @@ public class ESTemplate {
             }
 
             if (!fieldItem.getFieldName().equals(mapping.get_id())
-                && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                    && !mapping.getSkips().contains(fieldItem.getFieldName())) {
                 esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
             }
         }
@@ -329,9 +327,9 @@ public class ESTemplate {
 
             for (ColumnItem columnItem : fieldItem.getColumnItems()) {
                 if (dmlOld.containsKey(columnItem.getColumnName())
-                    && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                        && !mapping.getSkips().contains(fieldItem.getFieldName())) {
                     esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()),
-                        getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName()));
+                            getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName()));
                     break;
                 }
             }
@@ -363,8 +361,8 @@ public class ESTemplate {
     /**
      * 将dml的data转换为es的data
      *
-     * @param mapping 配置mapping
-     * @param dmlData dml data
+     * @param mapping     配置mapping
+     * @param dmlData     dml data
      * @param esFieldData es data
      * @return 返回 id 值
      */
@@ -382,7 +380,7 @@ public class ESTemplate {
             }
 
             if (!fieldItem.getFieldName().equals(mapping.get_id())
-                && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                    && !mapping.getSkips().contains(fieldItem.getFieldName())) {
                 esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
             }
         }
@@ -395,8 +393,8 @@ public class ESTemplate {
     /**
      * 将dml的data, old转换为es的data
      *
-     * @param mapping 配置mapping
-     * @param dmlData dml data
+     * @param mapping     配置mapping
+     * @param dmlData     dml data
      * @param esFieldData es data
      * @return 返回 id 值
      */
@@ -414,7 +412,7 @@ public class ESTemplate {
 
             if (dmlOld.containsKey(columnName) && !mapping.getSkips().contains(fieldItem.getFieldName())) {
                 esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()),
-                    getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
+                        getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
             }
         }
 
@@ -435,9 +433,9 @@ public class ESTemplate {
                     Object parentVal;
                     try {
                         parentVal = getValFromRS(mapping,
-                            resultSet,
-                            parentFieldItem.getFieldName(),
-                            parentFieldItem.getFieldName());
+                                resultSet,
+                                parentFieldItem.getFieldName(),
+                                parentFieldItem.getFieldName());
                     } catch (SQLException e) {
                         throw new RuntimeException(e);
                     }
@@ -482,7 +480,7 @@ public class ESTemplate {
     /**
      * 获取es mapping中的属性类型
      *
-     * @param mapping mapping配置
+     * @param mapping   mapping配置
      * @param fieldName 属性名
      * @return 类型
      */
@@ -490,23 +488,11 @@ public class ESTemplate {
     private String getEsType(ESMapping mapping, String fieldName) {
         String key = mapping.get_index() + "-" + mapping.get_type();
         Map<String, String> fieldType = esFieldTypes.get(key);
-        if (fieldType == null) {
-            ImmutableOpenMap<String, MappingMetaData> mappings;
-            try {
-                mappings = transportClient.admin()
-                    .cluster()
-                    .prepareState()
-                    .execute()
-                    .actionGet()
-                    .getState()
-                    .getMetaData()
-                    .getIndices()
-                    .get(mapping.get_index())
-                    .getMappings();
-            } catch (NullPointerException e) {
-                throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
-            }
-            MappingMetaData mappingMetaData = mappings.get(mapping.get_type());
+        if (fieldType != null) {
+            return fieldType.get(fieldName);
+        } else {
+            MappingMetaData mappingMetaData = esConnection.getMapping(mapping.get_index(), mapping.get_type());
+
             if (mappingMetaData == null) {
                 throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
             }
@@ -524,8 +510,9 @@ public class ESTemplate {
                 }
             }
             esFieldTypes.put(key, fieldType);
-        }
 
-        return fieldType.get(fieldName);
+            return fieldType.get(fieldName);
+        }
     }
+
 }

+ 30 - 0
client-adapter/elasticsearch/src/main/java/org/elasticsearch/client/RequestConvertersExt.java

@@ -0,0 +1,30 @@
+package org.elasticsearch.client;
+
+import org.apache.http.client.methods.HttpGet;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.common.Strings;
+
+import java.io.IOException;
+
+/**
+ * RequestConverters扩展
+ *
+ * @author rewerma 2019-08-01
+ * @version 1.0.0
+ */
+public class RequestConvertersExt {
+
+    /**
+     * 修改 getMappings 去掉request参数
+     *
+     * @param getMappingsRequest
+     * @return
+     * @throws IOException
+     */
+    static Request getMappings(GetMappingsRequest getMappingsRequest) throws IOException {
+        String[] indices = getMappingsRequest.indices() == null ? Strings.EMPTY_ARRAY : getMappingsRequest.indices();
+        String[] types = getMappingsRequest.types() == null ? Strings.EMPTY_ARRAY : getMappingsRequest.types();
+
+        return new Request(HttpGet.METHOD_NAME, RequestConverters.endpoint(indices, "_mapping", types));
+    }
+}

+ 29 - 0
client-adapter/elasticsearch/src/main/java/org/elasticsearch/client/RestHighLevelClientExt.java

@@ -0,0 +1,29 @@
+package org.elasticsearch.client;
+
+import static java.util.Collections.emptySet;
+
+import java.io.IOException;
+
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+
+/**
+ * RestHighLevelClient扩展
+ *
+ * @author rewerma 2019-08-01
+ * @version 1.0.0
+ */
+public class RestHighLevelClientExt {
+
+    public static GetMappingsResponse getMapping(RestHighLevelClient restHighLevelClient,
+                                                 GetMappingsRequest getMappingsRequest,
+                                                 RequestOptions options) throws IOException {
+        return restHighLevelClient.performRequestAndParseEntity(getMappingsRequest,
+            RequestConvertersExt::getMappings,
+            options,
+            GetMappingsResponse::fromXContent,
+            emptySet());
+
+    }
+
+}

+ 13 - 3
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSub2Test.java

@@ -18,6 +18,7 @@ import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
+
 @Ignore
 public class LabelSyncJoinSub2Test {
 
@@ -59,7 +60,10 @@ public class LabelSyncJoinSub2Test {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection()
+            .getTransportClient()
+            .prepareGet("mytest_user", "_doc", "1")
+            .get();
         Assert.assertEquals("b;a_", response.getSource().get("_labels"));
     }
 
@@ -97,7 +101,10 @@ public class LabelSyncJoinSub2Test {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection()
+            .getTransportClient()
+            .prepareGet("mytest_user", "_doc", "1")
+            .get();
         Assert.assertEquals("b;aa_", response.getSource().get("_labels"));
     }
 
@@ -129,7 +136,10 @@ public class LabelSyncJoinSub2Test {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection()
+            .getTransportClient()
+            .prepareGet("mytest_user", "_doc", "1")
+            .get();
         Assert.assertEquals("b_", response.getSource().get("_labels"));
     }
 }

+ 13 - 3
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSubTest.java

@@ -18,6 +18,7 @@ import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
+
 @Ignore
 public class LabelSyncJoinSubTest {
 
@@ -59,7 +60,10 @@ public class LabelSyncJoinSubTest {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection()
+            .getTransportClient()
+            .prepareGet("mytest_user", "_doc", "1")
+            .get();
         Assert.assertEquals("b;a", response.getSource().get("_labels"));
     }
 
@@ -97,7 +101,10 @@ public class LabelSyncJoinSubTest {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection()
+            .getTransportClient()
+            .prepareGet("mytest_user", "_doc", "1")
+            .get();
         Assert.assertEquals("b;aa", response.getSource().get("_labels"));
     }
 
@@ -129,7 +136,10 @@ public class LabelSyncJoinSubTest {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection()
+            .getTransportClient()
+            .prepareGet("mytest_user", "_doc", "1")
+            .get();
         Assert.assertEquals("b", response.getSource().get("_labels"));
     }
 }

+ 9 - 2
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOne2Test.java

@@ -18,6 +18,7 @@ import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
+
 @Ignore
 public class RoleSyncJoinOne2Test {
 
@@ -57,7 +58,10 @@ public class RoleSyncJoinOne2Test {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection()
+            .getTransportClient()
+            .prepareGet("mytest_user", "_doc", "1")
+            .get();
         Assert.assertEquals("admin_", response.getSource().get("_role_name"));
     }
 
@@ -94,7 +98,10 @@ public class RoleSyncJoinOne2Test {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection()
+            .getTransportClient()
+            .prepareGet("mytest_user", "_doc", "1")
+            .get();
         Assert.assertEquals("admin3_", response.getSource().get("_role_name"));
     }
 }

+ 6 - 5
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOneTest.java

@@ -58,7 +58,7 @@ public class RoleSyncJoinOneTest {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection().getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("admin", response.getSource().get("_role_name"));
     }
 
@@ -95,7 +95,7 @@ public class RoleSyncJoinOneTest {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection().getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("admin2", response.getSource().get("_role_name"));
     }
 
@@ -133,7 +133,7 @@ public class RoleSyncJoinOneTest {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection().getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("operator", response.getSource().get("_role_name"));
 
         Common.sqlExe(ds, "update user set role_id=1 where id=1");
@@ -158,7 +158,7 @@ public class RoleSyncJoinOneTest {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml2);
 
-        GetResponse response2 = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response2 = esAdapter.getEsConnection().getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("admin2", response2.getSource().get("_role_name"));
     }
 
@@ -190,7 +190,8 @@ public class RoleSyncJoinOneTest {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection()
+            .getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertNull(response.getSource().get("_role_name"));
     }
 }

+ 9 - 2
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncJoinOneTest.java

@@ -18,6 +18,7 @@ import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
+
 @Ignore
 public class UserSyncJoinOneTest {
 
@@ -59,7 +60,10 @@ public class UserSyncJoinOneTest {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection()
+            .getTransportClient()
+            .prepareGet("mytest_user", "_doc", "1")
+            .get();
         Assert.assertEquals("Eric_", response.getSource().get("_name"));
     }
 
@@ -95,7 +99,10 @@ public class UserSyncJoinOneTest {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection()
+            .getTransportClient()
+            .prepareGet("mytest_user", "_doc", "1")
+            .get();
         Assert.assertEquals("Eric2_", response.getSource().get("_name"));
     }
 }

+ 13 - 3
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncSingleTest.java

@@ -15,6 +15,7 @@ import org.junit.Test;
 import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
+
 @Ignore
 public class UserSyncSingleTest {
 
@@ -52,7 +53,10 @@ public class UserSyncSingleTest {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection()
+            .getTransportClient()
+            .prepareGet("mytest_user", "_doc", "1")
+            .get();
         Assert.assertEquals("Eric", response.getSource().get("_name"));
     }
 
@@ -85,7 +89,10 @@ public class UserSyncSingleTest {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection()
+            .getTransportClient()
+            .prepareGet("mytest_user", "_doc", "1")
+            .get();
         Assert.assertEquals("Eric2", response.getSource().get("_name"));
     }
 
@@ -115,7 +122,10 @@ public class UserSyncSingleTest {
 
         esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
-        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        GetResponse response = esAdapter.getEsConnection()
+            .getTransportClient()
+            .prepareGet("mytest_user", "_doc", "1")
+            .get();
         Assert.assertNull(response.getSource());
     }
 

+ 251 - 251
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/DbRemoteConfigLoader.java

@@ -1,251 +1,251 @@
-package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
-
-import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.druid.pool.DruidDataSource;
-import com.alibaba.otter.canal.common.utils.CommonUtils;
-import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
-import com.google.common.base.Joiner;
-import com.google.common.collect.MapMaker;
-
-/**
- * 基于数据库的远程配置装载器
- *
- * @author rewerma 2019-01-25 下午05:20:16
- * @version 1.0.0
- */
-public class DbRemoteConfigLoader implements RemoteConfigLoader {
-
-    private static final Logger      logger                 = LoggerFactory.getLogger(DbRemoteConfigLoader.class);
-
-    private DruidDataSource          dataSource;
-
-    private volatile long            currentConfigTimestamp = 0;
-    private Map<String, ConfigItem>  remoteAdapterConfigs   = new MapMaker().makeMap();
-
-    private ScheduledExecutorService executor               = Executors.newScheduledThreadPool(2,
-        new NamedThreadFactory("remote-adapter-config-scan"));
-
-    private RemoteAdapterMonitor     remoteAdapterMonitor   = new RemoteAdapterMonitorImpl();
-
-    public DbRemoteConfigLoader(String driverName, String jdbcUrl, String jdbcUsername, String jdbcPassword){
-        dataSource = new DruidDataSource();
-        if (StringUtils.isEmpty(driverName)) {
-            driverName = "com.mysql.jdbc.Driver";
-        }
-        dataSource.setDriverClassName(driverName);
-        dataSource.setUrl(jdbcUrl);
-        dataSource.setUsername(jdbcUsername);
-        dataSource.setPassword(jdbcPassword);
-        dataSource.setInitialSize(1);
-        dataSource.setMinIdle(1);
-        dataSource.setMaxActive(1);
-        dataSource.setMaxWait(60000);
-        dataSource.setTimeBetweenEvictionRunsMillis(60000);
-        dataSource.setMinEvictableIdleTimeMillis(300000);
-        try {
-            dataSource.init();
-        } catch (SQLException e) {
-            throw new RuntimeException(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * 加载远程application.yml配置
-     */
-    @Override
-    public void loadRemoteConfig() {
-        try {
-            // 加载远程adapter配置
-            ConfigItem configItem = getRemoteAdapterConfig();
-            if (configItem != null) {
-                if (configItem.getModifiedTime() != currentConfigTimestamp) {
-                    currentConfigTimestamp = configItem.getModifiedTime();
-                    overrideLocalCanalConfig(configItem.getContent());
-                    logger.info("## Loaded remote adapter config: application.yml");
-                }
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * 获取远程application.yml配置
-     *
-     * @return 配置对象
-     */
-    private ConfigItem getRemoteAdapterConfig() {
-        String sql = "select name, content, modified_time from canal_config where id=2";
-        try (Connection conn = dataSource.getConnection();
-                Statement stmt = conn.createStatement();
-                ResultSet rs = stmt.executeQuery(sql)) {
-            if (rs.next()) {
-                ConfigItem configItem = new ConfigItem();
-                configItem.setId(2L);
-                configItem.setName(rs.getString("name"));
-                configItem.setContent(rs.getString("content"));
-                configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
-                return configItem;
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
-        return null;
-    }
-
-    /**
-     * 覆盖本地application.yml文件
-     *
-     * @param content 文件内容
-     */
-    private void overrideLocalCanalConfig(String content) {
-
-        try (OutputStreamWriter writer = new OutputStreamWriter(
-            new FileOutputStream(CommonUtils.getConfPath() + "application.yml"),
-            StandardCharsets.UTF_8)) {
-            writer.write(content);
-            writer.flush();
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * 加载adapter配置
-     */
-    @Override
-    public void loadRemoteAdapterConfigs() {
-        try {
-            // 加载远程adapter配置
-            loadModifiedAdapterConfigs();
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * 加载有变动的adapter配置
-     */
-    private void loadModifiedAdapterConfigs() {
-        Map<String, ConfigItem> remoteConfigStatus = new HashMap<>();
-        String sql = "select id, category, name, modified_time from canal_adapter_config";
-        try (Connection conn = dataSource.getConnection();
-                Statement stmt = conn.createStatement();
-                ResultSet rs = stmt.executeQuery(sql)) {
-            while (rs.next()) {
-                ConfigItem configItem = new ConfigItem();
-                configItem.setId(rs.getLong("id"));
-                configItem.setCategory(rs.getString("category"));
-                configItem.setName(rs.getString("name"));
-                configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
-                remoteConfigStatus.put(configItem.getCategory() + "/" + configItem.getName(), configItem);
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
-
-        if (!remoteConfigStatus.isEmpty()) {
-            List<Long> changedIds = new ArrayList<>();
-
-            for (ConfigItem remoteConfigStat : remoteConfigStatus.values()) {
-                ConfigItem currentConfig = remoteAdapterConfigs
-                    .get(remoteConfigStat.getCategory() + "/" + remoteConfigStat.getName());
-                if (currentConfig == null) {
-                    // 新增
-                    changedIds.add(remoteConfigStat.getId());
-                } else {
-                    // 修改
-                    if (currentConfig.getModifiedTime() != remoteConfigStat.getModifiedTime()) {
-                        changedIds.add(remoteConfigStat.getId());
-                    }
-                }
-            }
-            if (!changedIds.isEmpty()) {
-                String contentsSql = "select id, category, name, content, modified_time from canal_adapter_config  where id in ("
-                                     + Joiner.on(",").join(changedIds) + ")";
-                try (Connection conn = dataSource.getConnection();
-                        Statement stmt = conn.createStatement();
-                        ResultSet rs = stmt.executeQuery(contentsSql)) {
-                    while (rs.next()) {
-                        ConfigItem configItemNew = new ConfigItem();
-                        configItemNew.setId(rs.getLong("id"));
-                        configItemNew.setCategory(rs.getString("category"));
-                        configItemNew.setName(rs.getString("name"));
-                        configItemNew.setContent(rs.getString("content"));
-                        configItemNew.setModifiedTime(rs.getTimestamp("modified_time").getTime());
-
-                        remoteAdapterConfigs.put(configItemNew.getCategory() + "/" + configItemNew.getName(),
-                            configItemNew);
-                        remoteAdapterMonitor.onModify(configItemNew);
-                    }
-
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-        }
-
-        for (ConfigItem configItem : remoteAdapterConfigs.values()) {
-            if (!remoteConfigStatus.containsKey(configItem.getCategory() + "/" + configItem.getName())) {
-                // 删除
-                remoteAdapterConfigs.remove(configItem.getCategory() + "/" + configItem.getName());
-                remoteAdapterMonitor.onDelete(configItem.getCategory() + "/" + configItem.getName());
-            }
-        }
-    }
-
-    /**
-     * 启动监听数据库变化
-     */
-    @Override
-    public void startMonitor() {
-        // 监听application.yml变化
-        executor.scheduleWithFixedDelay(() -> {
-            try {
-                loadRemoteConfig();
-            } catch (Throwable e) {
-                logger.error("scan remote application.yml failed", e);
-            }
-        }, 10, 3, TimeUnit.SECONDS);
-
-        // 监听adapter变化
-        executor.scheduleWithFixedDelay(() -> {
-            try {
-                loadRemoteAdapterConfigs();
-            } catch (Throwable e) {
-                logger.error("scan remote adapter configs failed", e);
-            }
-        }, 10, 3, TimeUnit.SECONDS);
-    }
-
-    /**
-     * 销毁
-     */
-    @Override
-    public void destroy() {
-        executor.shutdownNow();
-        try {
-            dataSource.close();
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-}
+package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
+
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.common.utils.CommonUtils;
+import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
+import com.google.common.base.Joiner;
+import com.google.common.collect.MapMaker;
+
+/**
+ * 基于数据库的远程配置装载器
+ *
+ * @author rewerma 2019-01-25 下午05:20:16
+ * @version 1.0.0
+ */
+public class DbRemoteConfigLoader implements RemoteConfigLoader {
+
+    private static final Logger      logger                 = LoggerFactory.getLogger(DbRemoteConfigLoader.class);
+
+    private DruidDataSource          dataSource;
+
+    private volatile long            currentConfigTimestamp = 0;
+    private Map<String, ConfigItem>  remoteAdapterConfigs   = new MapMaker().makeMap();
+
+    private ScheduledExecutorService executor               = Executors.newScheduledThreadPool(2,
+        new NamedThreadFactory("remote-adapter-config-scan"));
+
+    private RemoteAdapterMonitor     remoteAdapterMonitor   = new RemoteAdapterMonitorImpl();
+
+    public DbRemoteConfigLoader(String driverName, String jdbcUrl, String jdbcUsername, String jdbcPassword){
+        dataSource = new DruidDataSource();
+        if (StringUtils.isEmpty(driverName)) {
+            driverName = "com.mysql.jdbc.Driver";
+        }
+        dataSource.setDriverClassName(driverName);
+        dataSource.setUrl(jdbcUrl);
+        dataSource.setUsername(jdbcUsername);
+        dataSource.setPassword(jdbcPassword);
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(1);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+        try {
+            dataSource.init();
+        } catch (SQLException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 加载远程application.yml配置
+     */
+    @Override
+    public void loadRemoteConfig() {
+        try {
+            // 加载远程adapter配置
+            ConfigItem configItem = getRemoteAdapterConfig();
+            if (configItem != null) {
+                if (configItem.getModifiedTime() != currentConfigTimestamp) {
+                    currentConfigTimestamp = configItem.getModifiedTime();
+                    overrideLocalCanalConfig(configItem.getContent());
+                    logger.info("## Loaded remote adapter config: application.yml");
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 获取远程application.yml配置
+     *
+     * @return 配置对象
+     */
+    private ConfigItem getRemoteAdapterConfig() {
+        String sql = "select name, content, modified_time from canal_config where id=2";
+        try (Connection conn = dataSource.getConnection();
+                Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
+            if (rs.next()) {
+                ConfigItem configItem = new ConfigItem();
+                configItem.setId(2L);
+                configItem.setName(rs.getString("name"));
+                configItem.setContent(rs.getString("content"));
+                configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
+                return configItem;
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+        return null;
+    }
+
+    /**
+     * 覆盖本地application.yml文件
+     *
+     * @param content 文件内容
+     */
+    private void overrideLocalCanalConfig(String content) {
+
+        try (OutputStreamWriter writer = new OutputStreamWriter(
+            new FileOutputStream(CommonUtils.getConfPath() + "application.yml"),
+            StandardCharsets.UTF_8)) {
+            writer.write(content);
+            writer.flush();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 加载adapter配置
+     */
+    @Override
+    public void loadRemoteAdapterConfigs() {
+        try {
+            // 加载远程adapter配置
+            loadModifiedAdapterConfigs();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 加载有变动的adapter配置
+     */
+    private void loadModifiedAdapterConfigs() {
+        Map<String, ConfigItem> remoteConfigStatus = new HashMap<>();
+        String sql = "select id, category, name, modified_time from canal_adapter_config";
+        try (Connection conn = dataSource.getConnection();
+                Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
+            while (rs.next()) {
+                ConfigItem configItem = new ConfigItem();
+                configItem.setId(rs.getLong("id"));
+                configItem.setCategory(rs.getString("category"));
+                configItem.setName(rs.getString("name"));
+                configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
+                remoteConfigStatus.put(configItem.getCategory() + "/" + configItem.getName(), configItem);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+
+        if (!remoteConfigStatus.isEmpty()) {
+            List<Long> changedIds = new ArrayList<>();
+
+            for (ConfigItem remoteConfigStat : remoteConfigStatus.values()) {
+                ConfigItem currentConfig = remoteAdapterConfigs
+                    .get(remoteConfigStat.getCategory() + "/" + remoteConfigStat.getName());
+                if (currentConfig == null) {
+                    // 新增
+                    changedIds.add(remoteConfigStat.getId());
+                } else {
+                    // 修改
+                    if (currentConfig.getModifiedTime() != remoteConfigStat.getModifiedTime()) {
+                        changedIds.add(remoteConfigStat.getId());
+                    }
+                }
+            }
+            if (!changedIds.isEmpty()) {
+                String contentsSql = "select id, category, name, content, modified_time from canal_adapter_config  where id in ("
+                                     + Joiner.on(",").join(changedIds) + ")";
+                try (Connection conn = dataSource.getConnection();
+                        Statement stmt = conn.createStatement();
+                        ResultSet rs = stmt.executeQuery(contentsSql)) {
+                    while (rs.next()) {
+                        ConfigItem configItemNew = new ConfigItem();
+                        configItemNew.setId(rs.getLong("id"));
+                        configItemNew.setCategory(rs.getString("category"));
+                        configItemNew.setName(rs.getString("name"));
+                        configItemNew.setContent(rs.getString("content"));
+                        configItemNew.setModifiedTime(rs.getTimestamp("modified_time").getTime());
+
+                        remoteAdapterConfigs.put(configItemNew.getCategory() + "/" + configItemNew.getName(),
+                            configItemNew);
+                        remoteAdapterMonitor.onModify(configItemNew);
+                    }
+
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+
+        for (ConfigItem configItem : remoteAdapterConfigs.values()) {
+            if (!remoteConfigStatus.containsKey(configItem.getCategory() + "/" + configItem.getName())) {
+                // 删除
+                remoteAdapterConfigs.remove(configItem.getCategory() + "/" + configItem.getName());
+                remoteAdapterMonitor.onDelete(configItem.getCategory() + "/" + configItem.getName());
+            }
+        }
+    }
+
+    /**
+     * 启动监听数据库变化
+     */
+    @Override
+    public void startMonitor() {
+        // 监听application.yml变化
+        executor.scheduleWithFixedDelay(() -> {
+            try {
+                loadRemoteConfig();
+            } catch (Throwable e) {
+                logger.error("scan remote application.yml failed", e);
+            }
+        }, 10, 3, TimeUnit.SECONDS);
+
+        // 监听adapter变化
+        executor.scheduleWithFixedDelay(() -> {
+            try {
+                loadRemoteAdapterConfigs();
+            } catch (Throwable e) {
+                logger.error("scan remote adapter configs failed", e);
+            }
+        }, 10, 3, TimeUnit.SECONDS);
+    }
+
+    /**
+     * 销毁
+     */
+    @Override
+    public void destroy() {
+        executor.shutdownNow();
+        try {
+            dataSource.close();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+}

+ 7 - 4
client-adapter/launcher/src/main/resources/application.yml

@@ -7,10 +7,10 @@ spring:
     default-property-inclusion: non_null
 
 canal.conf:
-  mode: kafka # kafka rocketMQ
-#  canalServerHost: 127.0.0.1:11111
+  mode: tcp # kafka rocketMQ
+  canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
-  mqServers: 127.0.0.1:9092 #or rocketmq
+#  mqServers: 127.0.0.1:9092 #or rocketmq
 #  flatMessage: true
   batchSize: 500
   syncBatchSize: 1000
@@ -58,6 +58,9 @@ canal.conf:
 #          hbase.zookeeper.property.clientPort: 2181
 #          zookeeper.znode.parent: /hbase
 #      - name: es
-#        hosts: 127.0.0.1:9300
+#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
 #        properties:
+#          mode: transport # or rest
+#          # security.auth: test:123456 #  only used for rest mode
 #          cluster.name: elasticsearch
+

+ 1 - 0
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -177,6 +177,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
 
                     // 每条记录需要flush
                     produce(topicName, records, true);
+                    records.clear();
                 }
             }
         }