Переглянути джерело

feat(es8): es8 支持证书认证 (#5106)

ziyou 1 рік тому
батько
коміт
bbea802914

+ 73 - 25
client-adapter/es8x/src/main/java/com/alibaba/otter/canal/client/adapter/es8x/support/ESConnection.java

@@ -1,18 +1,14 @@
 package com.alibaba.otter.canal.client.adapter.es8x.support;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.Map;
-
+import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest;
 import org.apache.commons.lang.StringUtils;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -36,7 +32,20 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest;
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.util.Arrays;
+import java.util.Map;
 
 /**
  * ES 连接器, 只支持 Rest 方式
@@ -50,7 +59,46 @@ public class ESConnection {
 
     private RestHighLevelClient restHighLevelClient;
 
-    public ESConnection(String[] hosts, Map<String, String> properties) throws UnknownHostException{
+    public ESConnection(String[] hosts, Map<String, String> properties) throws UnknownHostException {
+        String caPath = properties.get("security.ca.path");
+        if (StringUtils.isNotEmpty(caPath)) {
+            connectEsWithCa(hosts, properties, caPath);
+        } else {
+            connectEsWithoutCa(hosts, properties);
+        }
+    }
+    private void connectEsWithCa(String[] hosts, Map<String, String> properties, String caPath) {
+        Path caCertificatePath = Paths.get(caPath);
+        try (InputStream is = Files.newInputStream(caCertificatePath)) {
+            CertificateFactory factory = CertificateFactory.getInstance("X.509");
+            Certificate trustedCa = factory.generateCertificate(is);
+            KeyStore trustStore = KeyStore.getInstance("pkcs12");
+            trustStore.load(null, null);
+            trustStore.setCertificateEntry("ca", trustedCa);
+            SSLContextBuilder sslContextBuilder = SSLContexts.custom()
+                    .loadTrustMaterial(trustStore, null);
+            final SSLContext sslContext = sslContextBuilder.build();
+
+            HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
+            RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
+            String nameAndPwd = properties.get("security.auth");
+            if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
+                String[] nameAndPwdArr = nameAndPwd.split(":");
+                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+                credentialsProvider.setCredentials(AuthScope.ANY,
+                        new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
+                restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
+                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                    return httpClientBuilder.setSSLContext(sslContext);
+                });
+            }
+            restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true).build();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void connectEsWithoutCa(String[] hosts, Map<String, String> properties) {
         HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
         RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
         String nameAndPwd = properties.get("security.auth");
@@ -58,12 +106,12 @@ public class ESConnection {
             String[] nameAndPwdArr = nameAndPwd.split(":");
             final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
             credentialsProvider.setCredentials(AuthScope.ANY,
-                new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
+                    new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
             restClientBuilder.setHttpClientConfigCallback(
-                httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+                    httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
         }
         restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true)
-            .build();
+                .build();
     }
 
     public void close() {
@@ -99,9 +147,9 @@ public class ESConnection {
 
         private IndexRequestBuilder indexRequestBuilder;
 
-        private IndexRequest        indexRequest;
+        private IndexRequest indexRequest;
 
-        public ES8xIndexRequest(String index, String id){
+        public ES8xIndexRequest(String index, String id) {
             indexRequest = new IndexRequest(index);
             indexRequest.id(id);
 
@@ -142,9 +190,9 @@ public class ESConnection {
 
         private UpdateRequestBuilder updateRequestBuilder;
 
-        private UpdateRequest        updateRequest;
+        private UpdateRequest updateRequest;
 
-        public ES8xUpdateRequest(String index, String id){
+        public ES8xUpdateRequest(String index, String id) {
 
             updateRequest = new UpdateRequest(index, id);
         }
@@ -191,9 +239,9 @@ public class ESConnection {
 
         private DeleteRequestBuilder deleteRequestBuilder;
 
-        private DeleteRequest        deleteRequest;
+        private DeleteRequest deleteRequest;
 
-        public ES8xDeleteRequest(String index, String id){
+        public ES8xDeleteRequest(String index, String id) {
 
             deleteRequest = new DeleteRequest(index, id);
 
@@ -220,11 +268,11 @@ public class ESConnection {
 
         private SearchRequestBuilder searchRequestBuilder;
 
-        private SearchRequest        searchRequest;
+        private SearchRequest searchRequest;
 
-        private SearchSourceBuilder  sourceBuilder;
+        private SearchSourceBuilder sourceBuilder;
 
-        public ESSearchRequest(String index){
+        public ESSearchRequest(String index) {
 
             searchRequest = new SearchRequest(index);
             sourceBuilder = new SearchSourceBuilder();
@@ -277,9 +325,9 @@ public class ESConnection {
 
         private BulkRequestBuilder bulkRequestBuilder;
 
-        private BulkRequest        bulkRequest;
+        private BulkRequest bulkRequest;
 
-        public ES8xBulkRequest(){
+        public ES8xBulkRequest() {
 
             bulkRequest = new BulkRequest();
 
@@ -350,7 +398,7 @@ public class ESConnection {
 
         private BulkResponse bulkResponse;
 
-        public ES8xBulkResponse(BulkResponse bulkResponse){
+        public ES8xBulkResponse(BulkResponse bulkResponse) {
             this.bulkResponse = bulkResponse;
         }
 
@@ -390,7 +438,7 @@ public class ESConnection {
         }
         try {
             return HttpHost.create(new URI(uri
-                .getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment())
+                    .getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment())
                     .toString());
         } catch (URISyntaxException ex) {
             throw new IllegalStateException(ex);

+ 2 - 1
client-adapter/launcher/src/main/resources/application.yml

@@ -91,6 +91,7 @@ canal.conf:
 #        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
 #        properties:
 #          mode: transport # or rest
+#          # security.ca.path: /etc/es8/ca.crt
 #          # security.auth: test:123456 #  only used for rest mode
 #          cluster.name: elasticsearch
 #      - name: kudu
@@ -113,4 +114,4 @@ canal.conf:
 #          jdbc.password: 123456
 #          batchSize: 3000
 #          scheduleTime: 600   # second unit
-#          threads: 3          # parallel threads
+#          threads: 3          # parallel threads