Ver Fonte

fixed CanalAdmin instanceLog and polardbx columnar test

jianghang.loujh há 1 mês atrás
pai
commit
e21e98aa83

+ 10 - 11
client-adapter/es7x/src/main/java/com/alibaba/otter/canal/client/adapter/es7x/support/ES7xTemplate.java

@@ -39,13 +39,10 @@ public class ES7xTemplate implements ESTemplate {
         .getLogger(ESTemplate.class);
 
     private static final int                                  MAX_BATCH_SIZE = 1000;
-
-    private ESConnection                                      esConnection;
-
-    private ESBulkRequest                                     esBulkRequest;
-
     // es 字段类型本地缓存
     private static ConcurrentMap<String, Map<String, String>> esFieldTypes   = new ConcurrentHashMap<>();
+    private ESConnection                                      esConnection;
+    private ESBulkRequest                                     esBulkRequest;
 
     public ES7xTemplate(ESConnection esConnection){
         this.esConnection = esConnection;
@@ -65,8 +62,9 @@ public class ES7xTemplate implements ESTemplate {
         if (mapping.getId() != null) {
             String parentVal = (String) esFieldData.remove("$parent_routing");
             if (mapping.isUpsert()) {
-                ESUpdateRequest updateRequest = esConnection.new ES7xUpdateRequest(mapping.getIndex(),
-                    pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
+                ESUpdateRequest updateRequest = esConnection.new ES7xUpdateRequest(mapping.getIndex(), pkVal.toString())
+                    .setDoc(esFieldData)
+                    .setDocAsUpsert(true);
                 if (StringUtils.isNotEmpty(parentVal)) {
                     updateRequest.setRouting(parentVal);
                 }
@@ -283,7 +281,8 @@ public class ES7xTemplate implements ESTemplate {
     }
 
     @Override
-    public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData, Map<String, Object> esFieldData) {
+    public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData,
+                                       Map<String, Object> esFieldData) {
         SchemaItem schemaItem = mapping.getSchemaItem();
         String idFieldName = mapping.getId() == null ? mapping.getPk() : mapping.getId();
         Object resultIdVal = null;
@@ -307,8 +306,8 @@ public class ES7xTemplate implements ESTemplate {
     }
 
     @Override
-    public Object getESDataFromDmlData(ESMapping mapping,String owner, Map<String, Object> dmlData, Map<String, Object> dmlOld,
-                                       Map<String, Object> esFieldData) {
+    public Object getESDataFromDmlData(ESMapping mapping, String owner, Map<String, Object> dmlData,
+                                       Map<String, Object> dmlOld, Map<String, Object> esFieldData) {
         SchemaItem schemaItem = mapping.getSchemaItem();
         String idFieldName = mapping.getId() == null ? mapping.getPk() : mapping.getId();
         Object resultIdVal = null;
@@ -329,7 +328,7 @@ public class ES7xTemplate implements ESTemplate {
 
             if (dmlOld.containsKey(columnName) && !mapping.getSkips().contains(fieldItem.getFieldName())) {
                 esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()),
-                        getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
+                    getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
             }
         }
 

+ 76 - 77
client-adapter/es7x/src/main/java/com/alibaba/otter/canal/client/adapter/es7x/support/ESConnection.java

@@ -55,16 +55,9 @@ import com.alibaba.otter.canal.client.adapter.es.core.support.ESBulkRequest;
 public class ESConnection {
 
     private static final Logger logger = LoggerFactory.getLogger(ESConnection.class);
-
-    public enum ESClientMode {
-        TRANSPORT, REST
-    }
-
     private ESClientMode        mode;
-
     @SuppressWarnings("deprecation")
     private TransportClient     transportClient;
-
     private RestHighLevelClient restHighLevelClient;
 
     public ESConnection(String[] hosts, Map<String, String> properties, ESClientMode mode) throws UnknownHostException{
@@ -86,9 +79,10 @@ public class ESConnection {
             if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
                 String[] nameAndPwdArr = nameAndPwd.split(":");
                 final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-                credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(nameAndPwdArr[0],
-                    nameAndPwdArr[1]));
-                restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+                credentialsProvider.setCredentials(AuthScope.ANY,
+                    new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
+                restClientBuilder.setHttpClientConfigCallback(
+                    httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
             }
             restHighLevelClient = new RestHighLevelClient(restClientBuilder);
         }
@@ -143,6 +137,78 @@ public class ESConnection {
         return mappingMetaData;
     }
 
+    // ------ get/set ------
+    public ESClientMode getMode() {
+        return mode;
+    }
+
+    public void setMode(ESClientMode mode) {
+        this.mode = mode;
+    }
+
+    public TransportClient getTransportClient() {
+        return transportClient;
+    }
+
+    public void setTransportClient(TransportClient transportClient) {
+        this.transportClient = transportClient;
+    }
+
+    public RestHighLevelClient getRestHighLevelClient() {
+        return restHighLevelClient;
+    }
+
+    public void setRestHighLevelClient(RestHighLevelClient restHighLevelClient) {
+        this.restHighLevelClient = restHighLevelClient;
+    }
+
+    private HttpHost createHttpHost(String uriStr) {
+        URI uri = URI.create(uriStr);
+        if (!org.springframework.util.StringUtils.hasLength(uri.getUserInfo())) {
+            return HttpHost.create(uri.toString());
+        }
+        try {
+            return HttpHost.create(new URI(uri
+                .getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment())
+                    .toString());
+        } catch (URISyntaxException ex) {
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    public enum ESClientMode {
+                              TRANSPORT, REST
+    }
+
+    public static class ES7xBulkResponse implements ESBulkRequest.ESBulkResponse {
+
+        private BulkResponse bulkResponse;
+
+        public ES7xBulkResponse(BulkResponse bulkResponse){
+            this.bulkResponse = bulkResponse;
+        }
+
+        @Override
+        public boolean hasFailures() {
+            return bulkResponse.hasFailures();
+        }
+
+        @Override
+        public void processFailBulkResponse(String errorMsg) {
+            for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
+                if (!itemResponse.isFailed()) {
+                    continue;
+                }
+
+                if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
+                    logger.error(itemResponse.getFailureMessage());
+                } else {
+                    throw new RuntimeException(errorMsg + itemResponse.getFailureMessage());
+                }
+            }
+        }
+    }
+
     public class ES7xIndexRequest implements ESBulkRequest.ESIndexRequest {
 
         private IndexRequestBuilder indexRequestBuilder;
@@ -443,71 +509,4 @@ public class ESConnection {
             this.bulkRequest = bulkRequest;
         }
     }
-
-    public static class ES7xBulkResponse implements ESBulkRequest.ESBulkResponse {
-
-        private BulkResponse bulkResponse;
-
-        public ES7xBulkResponse(BulkResponse bulkResponse){
-            this.bulkResponse = bulkResponse;
-        }
-
-        @Override
-        public boolean hasFailures() {
-            return bulkResponse.hasFailures();
-        }
-
-        @Override
-        public void processFailBulkResponse(String errorMsg) {
-            for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
-                if (!itemResponse.isFailed()) {
-                    continue;
-                }
-
-                if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
-                    logger.error(itemResponse.getFailureMessage());
-                } else {
-                    throw new RuntimeException(errorMsg + itemResponse.getFailureMessage());
-                }
-            }
-        }
-    }
-
-    // ------ get/set ------
-    public ESClientMode getMode() {
-        return mode;
-    }
-
-    public void setMode(ESClientMode mode) {
-        this.mode = mode;
-    }
-
-    public TransportClient getTransportClient() {
-        return transportClient;
-    }
-
-    public void setTransportClient(TransportClient transportClient) {
-        this.transportClient = transportClient;
-    }
-
-    public RestHighLevelClient getRestHighLevelClient() {
-        return restHighLevelClient;
-    }
-
-    public void setRestHighLevelClient(RestHighLevelClient restHighLevelClient) {
-        this.restHighLevelClient = restHighLevelClient;
-    }
-
-    private HttpHost createHttpHost(String uriStr) {
-        URI uri = URI.create(uriStr);
-        if (!org.springframework.util.StringUtils.hasLength(uri.getUserInfo())) {
-            return HttpHost.create(uri.toString());
-        }
-        try {
-            return HttpHost.create(new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(),
-                                           uri.getQuery(), uri.getFragment()).toString());
-        } catch (URISyntaxException ex) {
-            throw new IllegalStateException(ex);
-        }
-    }
 }

+ 9 - 7
client-adapter/es8x/src/main/java/com/alibaba/otter/canal/client/adapter/es8x/ES8xAdapter.java

@@ -1,5 +1,14 @@
 package com.alibaba.otter.canal.client.adapter.es8x;
 
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.sql.DataSource;
+
+import org.elasticsearch.action.search.SearchResponse;
+
 import com.alibaba.otter.canal.client.adapter.es.core.ESAdapter;
 import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es8x.etl.ESEtlService;
@@ -9,13 +18,6 @@ import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.EtlResult;
 import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
-import org.elasticsearch.action.search.SearchResponse;
-
-import javax.sql.DataSource;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
 
 /**
  * ES 8.x 外部适配器

+ 2 - 5
client-adapter/es8x/src/main/java/com/alibaba/otter/canal/client/adapter/es8x/support/ES8xTemplate.java

@@ -39,13 +39,10 @@ public class ES8xTemplate implements ESTemplate {
         .getLogger(ESTemplate.class);
 
     private static final int                                  MAX_BATCH_SIZE = 1000;
-
-    private ESConnection                                      esConnection;
-
-    private ESBulkRequest                                     esBulkRequest;
-
     // es 字段类型本地缓存
     private static ConcurrentMap<String, Map<String, String>> esFieldTypes   = new ConcurrentHashMap<>();
+    private ESConnection                                      esConnection;
+    private ESBulkRequest                                     esBulkRequest;
 
     public ES8xTemplate(ESConnection esConnection){
         this.esConnection = esConnection;

+ 51 - 51
client-adapter/es8x/src/main/java/com/alibaba/otter/canal/client/adapter/es8x/support/ESConnection.java

@@ -147,6 +147,57 @@ public class ESConnection {
         return mappingMetaData;
     }
 
+    public RestHighLevelClient getRestHighLevelClient() {
+        return restHighLevelClient;
+    }
+
+    public void setRestHighLevelClient(RestHighLevelClient restHighLevelClient) {
+        this.restHighLevelClient = restHighLevelClient;
+    }
+
+    private HttpHost createHttpHost(String uriStr) {
+        URI uri = URI.create(uriStr);
+        if (!org.springframework.util.StringUtils.hasLength(uri.getUserInfo())) {
+            return HttpHost.create(uri.toString());
+        }
+        try {
+            return HttpHost.create(new URI(uri
+                .getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment())
+                    .toString());
+        } catch (URISyntaxException ex) {
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    public static class ES8xBulkResponse implements ESBulkRequest.ESBulkResponse {
+
+        private BulkResponse bulkResponse;
+
+        public ES8xBulkResponse(BulkResponse bulkResponse){
+            this.bulkResponse = bulkResponse;
+        }
+
+        @Override
+        public boolean hasFailures() {
+            return bulkResponse.hasFailures();
+        }
+
+        @Override
+        public void processFailBulkResponse(String errorMsg) {
+            for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
+                if (!itemResponse.isFailed()) {
+                    continue;
+                }
+
+                if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
+                    logger.error(itemResponse.getFailureMessage());
+                } else {
+                    throw new RuntimeException(errorMsg + itemResponse.getFailureMessage());
+                }
+            }
+        }
+    }
+
     public class ES8xIndexRequest implements ESBulkRequest.ESIndexRequest {
 
         private IndexRequestBuilder indexRequestBuilder;
@@ -395,55 +446,4 @@ public class ESConnection {
             this.bulkRequest = bulkRequest;
         }
     }
-
-    public static class ES8xBulkResponse implements ESBulkRequest.ESBulkResponse {
-
-        private BulkResponse bulkResponse;
-
-        public ES8xBulkResponse(BulkResponse bulkResponse){
-            this.bulkResponse = bulkResponse;
-        }
-
-        @Override
-        public boolean hasFailures() {
-            return bulkResponse.hasFailures();
-        }
-
-        @Override
-        public void processFailBulkResponse(String errorMsg) {
-            for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
-                if (!itemResponse.isFailed()) {
-                    continue;
-                }
-
-                if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
-                    logger.error(itemResponse.getFailureMessage());
-                } else {
-                    throw new RuntimeException(errorMsg + itemResponse.getFailureMessage());
-                }
-            }
-        }
-    }
-
-    public RestHighLevelClient getRestHighLevelClient() {
-        return restHighLevelClient;
-    }
-
-    public void setRestHighLevelClient(RestHighLevelClient restHighLevelClient) {
-        this.restHighLevelClient = restHighLevelClient;
-    }
-
-    private HttpHost createHttpHost(String uriStr) {
-        URI uri = URI.create(uriStr);
-        if (!org.springframework.util.StringUtils.hasLength(uri.getUserInfo())) {
-            return HttpHost.create(uri.toString());
-        }
-        try {
-            return HttpHost.create(new URI(uri
-                .getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment())
-                    .toString());
-        } catch (URISyntaxException ex) {
-            throw new IllegalStateException(ex);
-        }
-    }
 }

+ 39 - 3
common/src/main/java/com/alibaba/otter/canal/common/utils/FileUtils.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.common.utils;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.charset.StandardCharsets;
@@ -88,8 +89,43 @@ public class FileUtils {
         return res.toString();
     }
 
-    public static void main(String[] args) {
-        String res = readFileFromOffset("test2.txt", 2, "UTF-8");
-        System.out.println(res);
+    /**
+     * 校验自定义的文件名,是否在允许的基目录范围内,如何合法就返回全路径,否则就直接报错
+     *
+     * @param baseDir
+     * @param destination
+     * @return
+     */
+    public static String validateFileName(String baseDir, String destination) {
+        try {
+            // 验证 destination 是否在允许的基目录范围内
+            String basePath = new File(baseDir).getCanonicalPath();
+            String fullPath = new File(basePath, destination).getCanonicalPath();
+
+            // 检查 fullPath 是否以 basePath 开头
+            if (!fullPath.startsWith(basePath + File.separator)) {
+                throw new IllegalArgumentException("Invalid destination path");
+            }
+
+            return fullPath;
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to read file", e);
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+        String fullPath = validateFileName("/tmp/", "1.txt");
+        System.out.println(fullPath);
+        System.out.println(org.apache.commons.io.FileUtils.readLines(new File(fullPath)));
+
+        fullPath = validateFileName("/tmp/", "test");
+        fullPath = validateFileName(fullPath,"1.txt");
+        System.out.println(fullPath);
+        System.out.println(org.apache.commons.io.FileUtils.readLines(new File(fullPath)));
+
+
+        fullPath = validateFileName("/tmp/", "../etc/hosts");
+        System.out.println(fullPath);
+        System.out.println(org.apache.commons.io.FileUtils.readLines(new File(fullPath)));
     }
 }

+ 11 - 2
deployer/src/main/java/com/alibaba/otter/canal/deployer/admin/CanalAdminController.java

@@ -1,6 +1,7 @@
 package com.alibaba.otter.canal.deployer.admin;
 
 import java.io.File;
+import java.io.IOException;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -203,7 +204,10 @@ public class CanalAdminController implements CanalAdmin {
 
     @Override
     public String listInstanceLog(String destination) {
-        Collection<File> files = org.apache.commons.io.FileUtils.listFiles(new File("../logs/" + destination + "/"),
+        // 校验destination
+        String desPath = FileUtils.validateFileName("../logs", destination);
+
+        Collection<File> files = org.apache.commons.io.FileUtils.listFiles(new File(desPath),
             TrueFileFilter.TRUE,
             TrueFileFilter.TRUE);
         List<String> names = files.stream().map(File::getName).collect(Collectors.toList());
@@ -215,7 +219,12 @@ public class CanalAdminController implements CanalAdmin {
         if (StringUtils.isEmpty(fileName)) {
             fileName = destination + ".log";
         }
-        return FileUtils.readFileFromOffset("../logs/" + destination + "/" + fileName, lines, "UTF-8");
+
+        // 分别校验destination和fileName目录
+        String desPath = FileUtils.validateFileName("../logs", destination);
+        String fullPath = FileUtils.validateFileName(desPath , fileName);
+
+        return FileUtils.readFileFromOffset(fullPath, lines, "UTF-8");
     }
 
     private InstanceAction getInstanceAction(String destination) {

+ 22 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta_DDL_Test.java

@@ -131,4 +131,26 @@ public class MemoryTableMeta_DDL_Test {
             Assert.assertTrue(field.isUnique());
         }
     }
+
+    @Test
+    public void test_polardbx_columnar_index () throws Throwable {
+        MemoryTableMeta memoryTableMeta = new MemoryTableMeta();
+        URL url = Thread.currentThread().getContextClassLoader().getResource("dummy.txt");
+        File dummyFile = new File(url.getFile());
+        File create = new File(dummyFile.getParent() + "/ddl", "ddl_test4.sql");
+        String sql = StringUtils.join(IOUtils.readLines(new FileInputStream(create)), "\n");
+        memoryTableMeta.apply(null, "test", sql, null);
+
+        List<String> tableNames = new ArrayList<>();
+        for (Schema schema : memoryTableMeta.getRepository().getSchemas()) {
+            tableNames.addAll(schema.showTables());
+        }
+
+        for (String table : tableNames) {
+            TableMeta sourceMeta = memoryTableMeta.find("test", table);
+            TableMeta.FieldMeta field = sourceMeta.getFieldMetaByName("address");
+            System.out.println(sourceMeta.toString());
+            Assert.assertTrue(field.isUnique());
+        }
+    }
 }

+ 1 - 0
parse/src/test/resources/ddl/ddl_test4.sql

@@ -0,0 +1 @@
+CREATE TABLE `test_columnar` (
   `id` bigint NOT NULL AUTO_INCREMENT,
   `user_id` bigint NOT NULL,
   `vault_id` bigint NOT NULL,
   `address` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NOT NULL DEFAULT '',
   `address_from` int NOT NULL DEFAULT '0' ,
   `alias` varchar(100) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT 'MyAddress' ,
   `balance` decimal(50, 0) NOT NULL DEFAULT '0' ,
   `coin_type` int NOT NULL DEFAULT '0' ,
   `height` bigint DEFAULT '0' ,
   `shard` smallint NOT NULL DEFAULT '0' ,
   `chain` int NOT NULL DEFAULT '0' ,
   `status` smallint NOT NULL DEFAULT '0' ,
   `account` int NOT NULL DEFAULT '0',
   `is_change` tinyint DEFAULT '0' ,
   `address_index` int DEFAULT '0' ,
   `redeem_script` varchar(512) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL,
   `nonce` bigint UNSIGNED NOT NULL DEFAULT '0',
   `is_sync` tinyint NOT NULL DEFAULT '0',
   `flag` tinyint NOT NULL DEFAULT '0',
   `wallet_type` varchar(10) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NOT NULL DEFAULT '',
   `created_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
   `modify_date` timestamp NULL DEFAULT NULL ,
   `address_type` tinyint NOT NULL DEFAULT '0',
   `risk_type` int NOT NULL DEFAULT '0' ,
   `balance_source` int NOT NULL DEFAULT '0',
   `tx_flag` tinyint NOT NULL DEFAULT '0' ,
   `risk_flag` smallint NOT NULL DEFAULT '0',
   `business_line` int NOT NULL DEFAULT '1',
   PRIMARY KEY (`id`),
   CLUSTERED COLUMNAR INDEX `cc_i_address` (`address`)
       PARTITION BY HASH(`address`)
       PARTITIONS 32,
   UNIQUE KEY `idx_ct_addr` USING BTREE (`user_id`, `coin_type`, `address`)
) ENGINE = InnoDB AUTO_INCREMENT = 128498037 DEFAULT CHARSET = utf8mb3 DEFAULT COLLATE = utf8mb3_bin ROW_FORMAT = DYNAMIC
PARTITION BY KEY(`address`)
PARTITIONS 32