Browse Source

Merge branch 'master' into feature/ziyang_migrate_parser

agapple 1 year ago
parent
commit
a98549ee0d
19 changed files with 438 additions and 169 deletions
  1. 3 2
      client-adapter/es8x/src/main/java/com/alibaba/otter/canal/client/adapter/es8x/support/ES8xTemplate.java
  2. 52 2
      client-adapter/es8x/src/main/java/com/alibaba/otter/canal/client/adapter/es8x/support/ESConnection.java
  3. 2 1
      client-adapter/launcher/src/main/resources/application.yml
  4. 87 51
      client/src/main/java/com/alibaba/otter/canal/client/pulsarmq/PulsarMQCanalConnector.java
  5. 5 0
      connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java
  6. 13 0
      connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java
  7. 10 0
      connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java
  8. 10 0
      connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java
  9. 12 7
      connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java
  10. 57 60
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/JsonConversion.java
  11. 27 1
      dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/JsonConversion_Json_ValueTest.java
  12. 3 3
      deployer/src/main/resources/logback.xml
  13. 3 2
      docker/image/alidata/bin/main.sh
  14. 17 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/TableMeta.java
  15. 55 31
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java
  16. 43 8
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java
  17. 13 0
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/FastsqlSchemaTest.java
  18. 25 0
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta_DDL_Test.java
  19. 1 0
      parse/src/test/resources/ddl/ddl_create_function_index.sql

+ 3 - 2
client-adapter/es8x/src/main/java/com/alibaba/otter/canal/client/adapter/es8x/support/ES8xTemplate.java

@@ -65,8 +65,9 @@ public class ES8xTemplate implements ESTemplate {
         if (mapping.getId() != null) {
             String parentVal = (String) esFieldData.remove("$parent_routing");
             if (mapping.isUpsert()) {
-                ESUpdateRequest updateRequest = esConnection.new ES8xUpdateRequest(mapping.getIndex(),
-                    pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
+                ESUpdateRequest updateRequest = esConnection.new ES8xUpdateRequest(mapping.getIndex(), pkVal.toString())
+                    .setDoc(esFieldData)
+                    .setDocAsUpsert(true);
                 if (StringUtils.isNotEmpty(parentVal)) {
                     updateRequest.setRouting(parentVal);
                 }

+ 52 - 2
client-adapter/es8x/src/main/java/com/alibaba/otter/canal/client/adapter/es8x/support/ESConnection.java

@@ -1,18 +1,29 @@
 package com.alibaba.otter.canal.client.adapter.es8x.support;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
 import java.util.Arrays;
 import java.util.Map;
 
+import javax.net.ssl.SSLContext;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -51,6 +62,47 @@ public class ESConnection {
     private RestHighLevelClient restHighLevelClient;
 
     public ESConnection(String[] hosts, Map<String, String> properties) throws UnknownHostException{
+        String caPath = properties.get("security.ca.path");
+        if (StringUtils.isNotEmpty(caPath)) {
+            connectEsWithCa(hosts, properties, caPath);
+        } else {
+            connectEsWithoutCa(hosts, properties);
+        }
+    }
+
+    private void connectEsWithCa(String[] hosts, Map<String, String> properties, String caPath) {
+        Path caCertificatePath = Paths.get(caPath);
+        try (InputStream is = Files.newInputStream(caCertificatePath)) {
+            CertificateFactory factory = CertificateFactory.getInstance("X.509");
+            Certificate trustedCa = factory.generateCertificate(is);
+            KeyStore trustStore = KeyStore.getInstance("pkcs12");
+            trustStore.load(null, null);
+            trustStore.setCertificateEntry("ca", trustedCa);
+            SSLContextBuilder sslContextBuilder = SSLContexts.custom().loadTrustMaterial(trustStore, null);
+            final SSLContext sslContext = sslContextBuilder.build();
+
+            HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
+            RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
+            String nameAndPwd = properties.get("security.auth");
+            if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
+                String[] nameAndPwdArr = nameAndPwd.split(":");
+                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+                credentialsProvider.setCredentials(AuthScope.ANY,
+                    new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
+                restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
+                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                    return httpClientBuilder.setSSLContext(sslContext);
+                });
+            }
+            restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build())
+                .setApiCompatibilityMode(true)
+                .build();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void connectEsWithoutCa(String[] hosts, Map<String, String> properties) {
         HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
         RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
         String nameAndPwd = properties.get("security.auth");
@@ -280,9 +332,7 @@ public class ESConnection {
         private BulkRequest        bulkRequest;
 
         public ES8xBulkRequest(){
-
             bulkRequest = new BulkRequest();
-
         }
 
         public void resetBulk() {

+ 2 - 1
client-adapter/launcher/src/main/resources/application.yml

@@ -91,6 +91,7 @@ canal.conf:
 #        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
 #        properties:
 #          mode: transport # or rest
+#          # security.ca.path: /etc/es8/ca.crt
 #          # security.auth: test:123456 #  only used for rest mode
 #          cluster.name: elasticsearch
 #      - name: kudu
@@ -113,4 +114,4 @@ canal.conf:
 #          jdbc.password: 123456
 #          batchSize: 3000
 #          scheduleTime: 600   # second unit
-#          threads: 3          # parallel threads
+#          threads: 3          # parallel threads

+ 87 - 51
client/src/main/java/com/alibaba/otter/canal/client/pulsarmq/PulsarMQCanalConnector.java

@@ -25,91 +25,98 @@ import com.google.common.collect.Lists;
  * 注意点:
  * 1. 相比于canal {@linkplain SimpleCanalConnector}, 这里get和ack操作不能有并发, 必须是一个线程执行get后,内存里执行完毕ack后再取下一个get
  * </pre>
+ * 
  * todo 重复消费的概率相当高。一次批处理中,只要有一个消息处理失败,则该批次全部重试
  *
  * @since 1.1.1
  */
 public class PulsarMQCanalConnector implements CanalMQConnector {
 
-    private static final Logger logger = LoggerFactory.getLogger(PulsarMQCanalConnector.class);
+    private static final Logger       logger                     = LoggerFactory
+        .getLogger(PulsarMQCanalConnector.class);
 
     private volatile Messages<byte[]> lastGetBatchMessage;
 
     /**
      * 连接pulsar客户端
      */
-    private PulsarClient pulsarClient;
+    private PulsarClient              pulsarClient;
     /**
      * 消费者
      */
-    private Consumer<byte[]> consumer;
+    private Consumer<byte[]>          consumer;
     /**
      * 是否扁平化Canal消息内容
      */
-    private boolean isFlatMessage = false;
+    private boolean                   isFlatMessage              = false;
     /**
      * 主题名称
      */
-    private String topic;
+    private String                    topic;
     /**
      * 环境连接URL
      */
-    private String serviceUrl;
+    private String                    serviceUrl;
     /**
      * 角色认证token
      */
-    private String roleToken;
+    private String                    roleToken;
+
+    /**
+     * listener name
+     */
+    private String                    listenerName;
+
     /**
      * 订阅客户端名称
      */
-    private String subscriptName;
+    private String                    subscriptName;
     /**
      * 每次批量获取数据的最大条目数,默认30
      */
-    private int batchSize = 30;
+    private int                       batchSize                  = 30;
     /**
-     * 与{@code batchSize}一起决定批量获取的数据大小
-     * 当:
+     * 与{@code batchSize}一起决定批量获取的数据大小 当:
      * <p>
      * 1. {@code batchSize} 条消息未消费时<br/>
      * 2. 距上一次批量消费时间达到{@code batchTimeoutSeconds}秒时
      * </p>
      * 任一条件满足,即执行批量消费
      */
-    private int getBatchTimeoutSeconds = 30;
+    private int                       getBatchTimeoutSeconds     = 30;
     /**
      * 批量处理消息时,一次批量处理的超时时间秒数
      * <p>
      * 该时间应该根据{@code batchSize}和{@code batchTimeoutSeconds}合理设置
      * </p>
      */
-    private int batchProcessTimeoutSeconds = 60;
+    private int                       batchProcessTimeoutSeconds = 60;
     /**
      * 消费失败后的重试秒数,默认60秒
      */
-    private int redeliveryDelaySeconds = 60;
+    private int                       redeliveryDelaySeconds     = 60;
     /**
      * 当客户端接收到消息,30秒还没有返回ack给服务端时,ack超时,会重新消费该消息
      */
-    private int ackTimeoutSeconds = 30;
+    private int                       ackTimeoutSeconds          = 30;
     /**
      * 是否开启消息失败重试功能,默认开启
      */
-    private boolean isRetry = true;
+    private boolean                   isRetry                    = true;
     /**
      * <p>
      * true重试(-RETRY)和死信队列(-DLQ)后缀为大写,有些地方创建的为小写,需确保正确
      * </p>
      */
-    private boolean isRetryDLQUpperCase = false;
+    private boolean                   isRetryDLQUpperCase        = false;
     /**
      * 最大重试次数
      */
-    private int maxRedeliveryCount = 128;
+    private int                       maxRedeliveryCount         = 128;
     /**
      * 连接标识位,在连接或关闭连接后改变值
      */
-    private boolean connected = false;
+    private boolean                   connected                  = false;
 
     /**
      * 除必要参数外,其他参数使用默认值
@@ -119,16 +126,21 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
      * </p>
      *
      * @param isFlatMessage true使用扁平消息
-     * @param serviceUrl    pulsar服务连接地址,通常为:pulsar:host:ip或http://host:ip
-     * @param roleToken     有对应topic的消费者权限的角色token
-     * @param topic         订阅主题
+     * @param serviceUrl pulsar服务连接地址,通常为:pulsar:host:ip或http://host:ip
+     * @param roleToken 有对应topic的消费者权限的角色token
+     * @param topic 订阅主题
      * @param subscriptName 订阅和客户端名称,同一个订阅名视为同一个消费实例
      * @date 2021/9/18 08:54
      * @author chad
      * @since 1 by chad at 2021/9/18 完善
      */
-    public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic
-            , String subscriptName) {
+    public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic,
+                                  String subscriptName){
+        this(isFlatMessage, serviceUrl, roleToken, topic, subscriptName, null);
+    }
+
+    public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic,
+                                  String subscriptName, String listenerName){
         this.isFlatMessage = isFlatMessage;
         this.serviceUrl = serviceUrl;
         this.roleToken = roleToken;
@@ -137,6 +149,7 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
         if (StringUtils.isEmpty(this.subscriptName)) {
             throw new RuntimeException("Pulsar Consumer subscriptName required");
         }
+        this.listenerName = listenerName;
     }
 
     /**
@@ -146,10 +159,31 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
      * @author chad
      * @since 1 by chad at 2021/9/18 完善
      */
-    public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic
-            , String subscriptName, int batchSize, int getBatchTimeoutSeconds, int batchProcessTimeoutSeconds
-            , int redeliveryDelaySeconds, int ackTimeoutSeconds, boolean isRetry, boolean isRetryDLQUpperCase
-            , int maxRedeliveryCount) {
+    public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic,
+                                  String subscriptName, int batchSize, int getBatchTimeoutSeconds,
+                                  int batchProcessTimeoutSeconds, int redeliveryDelaySeconds, int ackTimeoutSeconds,
+                                  boolean isRetry, boolean isRetryDLQUpperCase, int maxRedeliveryCount){
+        this(isFlatMessage,
+            serviceUrl,
+            roleToken,
+            topic,
+            subscriptName,
+            batchSize,
+            getBatchTimeoutSeconds,
+            batchProcessTimeoutSeconds,
+            redeliveryDelaySeconds,
+            ackTimeoutSeconds,
+            isRetry,
+            isRetryDLQUpperCase,
+            maxRedeliveryCount,
+            null);
+    }
+
+    public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic,
+                                  String subscriptName, int batchSize, int getBatchTimeoutSeconds,
+                                  int batchProcessTimeoutSeconds, int redeliveryDelaySeconds, int ackTimeoutSeconds,
+                                  boolean isRetry, boolean isRetryDLQUpperCase, int maxRedeliveryCount,
+                                  String listenerName){
         this.isFlatMessage = isFlatMessage;
         this.serviceUrl = serviceUrl;
         this.roleToken = roleToken;
@@ -166,16 +200,20 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
         this.isRetry = isRetry;
         this.isRetryDLQUpperCase = isRetryDLQUpperCase;
         this.maxRedeliveryCount = maxRedeliveryCount;
+        this.listenerName = listenerName;
     }
 
     @Override
     public void connect() throws CanalClientException {
         // 连接创建客户端
         try {
-            pulsarClient = PulsarClient.builder()
-                    .serviceUrl(serviceUrl)
-                    .authentication(AuthenticationFactory.token(roleToken))
-                    .build();
+            ClientBuilder builder = PulsarClient.builder()
+                .serviceUrl(serviceUrl)
+                .authentication(AuthenticationFactory.token(roleToken));
+            if (StringUtils.isNotEmpty(listenerName)) {
+                builder.listenerName(listenerName);
+            }
+            pulsarClient = builder.build();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
@@ -224,15 +262,14 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
         builder.subscriptionType(SubscriptionType.Failover);
 
         builder
-                // 调用consumer.negativeAcknowledge(message) (即nack)来表示消费失败的消息
-                // 在指定的时间进行重新消费,默认是1分钟。
-                .negativeAckRedeliveryDelay(this.redeliveryDelaySeconds, TimeUnit.SECONDS)
-                .subscriptionName(this.subscriptName)
-        ;
+            // 调用consumer.negativeAcknowledge(message) (即nack)来表示消费失败的消息
+            // 在指定的时间进行重新消费,默认是1分钟。
+            .negativeAckRedeliveryDelay(this.redeliveryDelaySeconds, TimeUnit.SECONDS)
+            .subscriptionName(this.subscriptName);
         if (this.isRetry) {
             DeadLetterPolicy.DeadLetterPolicyBuilder dlqBuilder = DeadLetterPolicy.builder()
-                    // 最大重试次数
-                    .maxRedeliverCount(this.maxRedeliveryCount);
+                // 最大重试次数
+                .maxRedeliverCount(this.maxRedeliveryCount);
             // 指定重试队列,不是多个或通配符topic才能判断重试队列
             if (!MQUtil.isPatternTag(this.topic)) {
                 String retryTopic = this.topic + (this.isRetryDLQUpperCase ? "-RETRY" : "-retry");
@@ -241,19 +278,17 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
                 dlqBuilder.deadLetterTopic(dlqTopic);
             }
 
-            //默认关闭,如果需要重试则开启
-            builder.enableRetry(true)
-                    .deadLetterPolicy(dlqBuilder.build());
+            // 默认关闭,如果需要重试则开启
+            builder.enableRetry(true).deadLetterPolicy(dlqBuilder.build());
         }
 
         // ack超时
         builder.ackTimeout(this.ackTimeoutSeconds, TimeUnit.SECONDS);
 
         // pulsar批量获取消息设置
-        builder.batchReceivePolicy(new BatchReceivePolicy.Builder()
-                .maxNumMessages(this.batchSize)
-                .timeout(this.getBatchTimeoutSeconds, TimeUnit.SECONDS)
-                .build());
+        builder.batchReceivePolicy(new BatchReceivePolicy.Builder().maxNumMessages(this.batchSize)
+            .timeout(this.getBatchTimeoutSeconds, TimeUnit.SECONDS)
+            .build());
 
         try {
             this.consumer = builder.subscribe();
@@ -263,7 +298,6 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
         }
     }
 
-
     @Override
     public void subscribe() throws CanalClientException {
         this.subscribe(null);
@@ -284,7 +318,7 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
      * 不关注业务执行结果,只要收到消息即认识消费成功,自动ack
      *
      * @param timeout 阻塞获取消息的超时时间
-     * @param unit    时间单位
+     * @param unit 时间单位
      * @return java.util.List<com.alibaba.otter.canal.protocol.Message>
      * @date 2021/9/13 22:24
      * @author chad
@@ -300,13 +334,14 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
     }
 
     /**
-     * 关心业务执行结果,业务侧根据执行结果调用 {@link PulsarMQCanalConnector#ack()}或{@link PulsarMQCanalConnector#rollback()}
+     * 关心业务执行结果,业务侧根据执行结果调用
+     * {@link PulsarMQCanalConnector#ack()}或{@link PulsarMQCanalConnector#rollback()}
      * <p>
      * 本方法示支持多线程,在MQ保障顺序的前提下,也无法提供单Topic多线程
      * </p>
      *
      * @param timeout 阻塞获取消息的超时时间
-     * @param unit    时间单位
+     * @param unit 时间单位
      * @return java.util.List<com.alibaba.otter.canal.protocol.Message>
      * @date 2021/9/13 22:26
      * @author chad
@@ -340,7 +375,8 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
      * @return java.util.List<T>
      * @date 2021/9/14 15:20
      * @author chad
-     * @since 1 by chad at 2021/9/14 供{@link PulsarMQCanalConnector#getListWithoutAck(Long, TimeUnit)}
+     * @since 1 by chad at 2021/9/14
+     * 供{@link PulsarMQCanalConnector#getListWithoutAck(Long, TimeUnit)}
      * 和{@link PulsarMQCanalConnector#getFlatListWithoutAck(Long, TimeUnit)}调用
      */
     private <T> List<T> getListWithoutAck() {

+ 5 - 0
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java

@@ -61,4 +61,9 @@ public class PulsarMQConstants {
      */
     public static final String PULSARMQ_ADMIN_SERVER_URL          = ROOT + "." + "adminServerUrl";
 
+    /**
+     * Pulsar admin服务器地址
+     */
+    public static final String PULSARMQ_LISTENER_NAME             = ROOT + "." + "listenerName";
+
 }

+ 13 - 0
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java

@@ -37,6 +37,11 @@ public class PulsarMQProducerConfig extends MQProperties {
      */
     private String adminServerUrl;
 
+    /**
+     * listener name
+     */
+    private String listenerName;
+
     public String getServerUrl() {
         return serverUrl;
     }
@@ -68,4 +73,12 @@ public class PulsarMQProducerConfig extends MQProperties {
     public void setAdminServerUrl(String adminServerUrl) {
         this.adminServerUrl = adminServerUrl;
     }
+
+    public String getListenerName() {
+        return listenerName;
+    }
+
+    public void setListenerName(String listenerName) {
+        this.listenerName = listenerName;
+    }
 }

+ 10 - 0
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java

@@ -54,6 +54,12 @@ public class CanalPulsarMQConsumer implements CanalMsgConsumer {
      * 角色认证token
      */
     private String                    roleToken;
+
+    /**
+     * listener name
+     */
+    private String                    listenerName;
+
     /**
      * 订阅客户端名称
      */
@@ -110,6 +116,7 @@ public class CanalPulsarMQConsumer implements CanalMsgConsumer {
         }
         this.serviceUrl = properties.getProperty(PulsarMQConstants.PULSARMQ_SERVER_URL);
         this.roleToken = properties.getProperty(PulsarMQConstants.PULSARMQ_ROLE_TOKEN);
+        this.listenerName = properties.getProperty(PulsarMQConstants.PULSARMQ_LISTENER_NAME);
         this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME);
         // 采用groupId作为subscriptName,避免所有的都是同一个订阅者名称
         if (StringUtils.isEmpty(this.subscriptName)) {
@@ -165,6 +172,9 @@ public class CanalPulsarMQConsumer implements CanalMsgConsumer {
             if (StringUtils.isNotEmpty(roleToken)) {
                 builder.authentication(AuthenticationFactory.token(roleToken));
             }
+            if (StringUtils.isNotEmpty(listenerName)) {
+                builder.authentication(AuthenticationFactory.token(listenerName));
+            }
             pulsarClient = builder.build();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);

+ 10 - 0
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java

@@ -73,6 +73,11 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
                 // 角色权限认证的token
                 builder.authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken()));
             }
+            if (StringUtils.isNotEmpty(pulsarMQProducerConfig.getListenerName())) {
+                // listener name
+                builder.listenerName(pulsarMQProducerConfig.getListenerName());
+            }
+
             client = builder.build();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
@@ -127,6 +132,11 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
         if (!StringUtils.isEmpty(adminServerUrl)) {
             tmpProperties.setAdminServerUrl(adminServerUrl);
         }
+        String listenerName = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_LISTENER_NAME);
+        if (!StringUtils.isEmpty(listenerName)) {
+            tmpProperties.setListenerName(listenerName);
+        }
+
         if (logger.isDebugEnabled()) {
             logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties));
         }

+ 12 - 7
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java

@@ -82,13 +82,18 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
         try {
             connect = factory.newConnection();
             channel = connect.createChannel();
-            channel.queueDeclare(rabbitMQProperties.getQueue(), true, false, false, null);
-            channel.exchangeDeclare(rabbitMQProperties
-                .getExchange(), rabbitMQProperties.getDeliveryMode(), true, false, false, null);
-            channel.queueBind(rabbitMQProperties.getQueue(),
-                rabbitMQProperties.getExchange(),
-                rabbitMQProperties.getRoutingKey());
-
+            String queue = rabbitMQProperties.getQueue();
+            String exchange = rabbitMQProperties.getExchange();
+            String deliveryMode = rabbitMQProperties.getDeliveryMode();
+            String routingKey = rabbitMQProperties.getRoutingKey();
+            if (!StringUtils.isEmpty(queue)) {
+                channel.queueDeclare(queue, true, false, false, null);
+            }
+            if (!StringUtils.isEmpty(queue) && !StringUtils.isEmpty(exchange) && !StringUtils.isEmpty(deliveryMode)
+                && !StringUtils.isEmpty(routingKey)) {
+                channel.exchangeDeclare(exchange, deliveryMode, true, false, false, null);
+                channel.queueBind(queue, exchange, routingKey);
+            }
         } catch (IOException | TimeoutException ex) {
             throw new CanalException("Start RabbitMQ producer error", ex);
         }

+ 57 - 60
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/JsonConversion.java

@@ -1,10 +1,8 @@
 package com.taobao.tddl.dbsync.binlog;
 
-import java.nio.charset.Charset;
+import static com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer.*;
 
-import static com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer.appendNumber2;
-import static com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer.appendNumber4;
-import static com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer.usecondsToStr;
+import java.nio.charset.Charset;
 
 /**
  * 处理下MySQL json二进制转化为可读的字符串
@@ -34,26 +32,25 @@ public class JsonConversion {
     public static final char JSONB_FALSE_LITERAL     = '\2';
 
     /*
-     * The size of offset or size fields in the small and the large storage
-     * format for JSON objects and JSON arrays.
+     * The size of offset or size fields in the small and the large storage format
+     * for JSON objects and JSON arrays.
      */
     public static final int  SMALL_OFFSET_SIZE       = 2;
     public static final int  LARGE_OFFSET_SIZE       = 4;
 
     /*
-     * The size of key entries for objects when using the small storage format
-     * or the large storage format. In the small format it is 4 bytes (2 bytes
-     * for key length and 2 bytes for key offset). In the large format it is 6
-     * (2 bytes for length, 4 bytes for offset).
+     * The size of key entries for objects when using the small storage format or
+     * the large storage format. In the small format it is 4 bytes (2 bytes for key
+     * length and 2 bytes for key offset). In the large format it is 6 (2 bytes for
+     * length, 4 bytes for offset).
      */
     public static final int  KEY_ENTRY_SIZE_SMALL    = (2 + SMALL_OFFSET_SIZE);
     public static final int  KEY_ENTRY_SIZE_LARGE    = (2 + LARGE_OFFSET_SIZE);
 
     /*
-     * The size of value entries for objects or arrays. When using the small
-     * storage format, the entry size is 3 (1 byte for type, 2 bytes for
-     * offset). When using the large storage format, it is 5 (1 byte for type, 4
-     * bytes for offset).
+     * The size of value entries for objects or arrays. When using the small storage
+     * format, the entry size is 3 (1 byte for type, 2 bytes for offset). When using
+     * the large storage format, it is 5 (1 byte for type, 4 bytes for offset).
      */
     public static final int  VALUE_ENTRY_SIZE_SMALL  = (1 + SMALL_OFFSET_SIZE);
     public static final int  VALUE_ENTRY_SIZE_LARGE  = (1 + LARGE_OFFSET_SIZE);
@@ -160,13 +157,12 @@ public class JsonConversion {
                 if (len < n + str_len) {
                     throw new IllegalArgumentException("illegal json data");
                 }
-                return new Json_Value(Json_enum_type.STRING, buffer.rewind()
-                    .forward((int) n)
-                    .getFixString((int) str_len, charset));
+                return new Json_Value(Json_enum_type.STRING,
+                    buffer.rewind().forward((int) n).getFixString((int) str_len, charset));
             case JSONB_TYPE_OPAQUE:
                 /*
-                 * There should always be at least one byte, which tells the
-                 * field type of the opaque value.
+                 * There should always be at least one byte, which tells the field type of the
+                 * opaque value.
                  */
                 // The type is encoded as a uint8 that maps to an
                 // enum_field_types.
@@ -291,7 +287,7 @@ public class JsonConversion {
                         if (i > 0) {
                             buf.append(", ");
                         }
-                        buf.append('"').append(key(i, charset)).append('"');
+                        buf.append('"').append(escapse(key(i, charset))).append('"');
                         buf.append(": ");
                         element(i, charset).toJsonString(buf, charset);
                     }
@@ -368,45 +364,45 @@ public class JsonConversion {
                         buf.append('"').append(text).append('"');
                     } else if (m_field_type == LogEvent.MYSQL_TYPE_DATE || m_field_type == LogEvent.MYSQL_TYPE_DATETIME
                                || m_field_type == LogEvent.MYSQL_TYPE_TIMESTAMP) {
-                        long packed_value = m_data.getLong64();
-                        if (packed_value == 0) {
-                            text = "0000-00-00 00:00:00";
-                        } else {
-                            // 构造TimeStamp只处理到秒
-                            long ultime = Math.abs(packed_value);
-                            long intpart = ultime >> 24;
-                            int frac = (int) (ultime % (1L << 24));
-                            long ymd = intpart >> 17;
-                            long ym = ymd >> 5;
-                            long hms = intpart % (1 << 17);
-                            // text =
-                            // String.format("%04d-%02d-%02d %02d:%02d:%02d",
-                            // (int) (ym / 13),
-                            // (int) (ym % 13),
-                            // (int) (ymd % (1 << 5)),
-                            // (int) (hms >> 12),
-                            // (int) ((hms >> 6) % (1 << 6)),
-                            // (int) (hms % (1 << 6)));
-                            StringBuilder builder = new StringBuilder(26);
-                            appendNumber4(builder, (int) (ym / 13));
-                            builder.append('-');
-                            appendNumber2(builder, (int) (ym % 13));
-                            builder.append('-');
-                            appendNumber2(builder, (int) (ymd % (1 << 5)));
-                            builder.append(' ');
-                            appendNumber2(builder, (int) (hms >> 12));
-                            builder.append(':');
-                            appendNumber2(builder, (int) ((hms >> 6) % (1 << 6)));
-                            builder.append(':');
-                            appendNumber2(builder, (int) (hms % (1 << 6)));
-                            builder.append('.').append(usecondsToStr(frac, 6));
-                            text = builder.toString();
-                        }
-                        buf.append('"').append(text).append('"');
-                    } else {
-                        text = m_data.getFixString((int) m_length, charset);
-                        buf.append('"').append(escapse(text)).append('"');
-                    }
+                                   long packed_value = m_data.getLong64();
+                                   if (packed_value == 0) {
+                                       text = "0000-00-00 00:00:00";
+                                   } else {
+                                       // 构造TimeStamp只处理到秒
+                                       long ultime = Math.abs(packed_value);
+                                       long intpart = ultime >> 24;
+                                       int frac = (int) (ultime % (1L << 24));
+                                       long ymd = intpart >> 17;
+                                       long ym = ymd >> 5;
+                                       long hms = intpart % (1 << 17);
+                                       // text =
+                                       // String.format("%04d-%02d-%02d %02d:%02d:%02d",
+                                       // (int) (ym / 13),
+                                       // (int) (ym % 13),
+                                       // (int) (ymd % (1 << 5)),
+                                       // (int) (hms >> 12),
+                                       // (int) ((hms >> 6) % (1 << 6)),
+                                       // (int) (hms % (1 << 6)));
+                                       StringBuilder builder = new StringBuilder(26);
+                                       appendNumber4(builder, (int) (ym / 13));
+                                       builder.append('-');
+                                       appendNumber2(builder, (int) (ym % 13));
+                                       builder.append('-');
+                                       appendNumber2(builder, (int) (ymd % (1 << 5)));
+                                       builder.append(' ');
+                                       appendNumber2(builder, (int) (hms >> 12));
+                                       builder.append(':');
+                                       appendNumber2(builder, (int) ((hms >> 6) % (1 << 6)));
+                                       builder.append(':');
+                                       appendNumber2(builder, (int) (hms % (1 << 6)));
+                                       builder.append('.').append(usecondsToStr(frac, 6));
+                                       text = builder.toString();
+                                   }
+                                   buf.append('"').append(text).append('"');
+                               } else {
+                                   text = m_data.getFixString((int) m_length, charset);
+                                   buf.append('"').append(escapse(text)).append('"');
+                               }
 
                     break;
                 case STRING:
@@ -452,7 +448,8 @@ public class JsonConversion {
     }
 
     public static enum Json_enum_type {
-        OBJECT, ARRAY, STRING, INT, UINT, DOUBLE, LITERAL_NULL, LITERAL_TRUE, LITERAL_FALSE, OPAQUE, ERROR
+                                       OBJECT, ARRAY, STRING, INT, UINT, DOUBLE, LITERAL_NULL, LITERAL_TRUE,
+                                       LITERAL_FALSE, OPAQUE, ERROR
     }
 
 }

+ 27 - 1
dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/JsonConversion_Json_ValueTest.java

@@ -1,5 +1,7 @@
 package com.taobao.tddl.dbsync.binlog;
 
+import java.nio.charset.Charset;
+
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -48,7 +50,31 @@ public class JsonConversion_Json_ValueTest {
         // Act
         thrown.expect(IllegalArgumentException.class);
         JsonConversion.parse_value(type, buffer, len, charsetName);
-
         // Method is not expected to return due to exception thrown
     }
+
+    // test for 5127
+    @Test
+    public void testJsonKeyContainsSpecialCharacter() {
+
+        // {"internal_uri_rewrite": {"(.*)(/[^/]+\\.(mp|MP)4)$": "$1/mp4$2"}}
+        String jsonData = "{\"internal_uri_rewrite\": {\"(.*)(/[^/]+\\\\.(mp|MP)4)$\": \"$1/mp4$2\"}}";
+        byte[] data = new byte[] { 1, 0, 74, 0, 11, 0, 20, 0, 0, 31, 0, 105, 110, 116, 101, 114, 110, 97, 108, 95, 117,
+                                   114, 105, 95, 114, 101, 119, 114, 105, 116, 101, 1, 0, 43, 0, 11, 0, 23, 0, 12, 34,
+                                   0, 40, 46, 42, 41, 40, 47, 91, 94, 47, 93, 43, 92, 46, 40, 109, 112, 124, 77, 80, 41,
+                                   52, 41, 36, 8, 36, 49, 47, 109, 112, 52, 36, 50 };
+        final LogBuffer buffer = new LogBuffer(data, 0, 74);
+
+        Charset charset = Charset.forName("UTF-8");
+        Json_Value jsonValue = JsonConversion.parse_value(0, buffer, 74, charset);
+        StringBuilder builder = new StringBuilder();
+        jsonValue.toJsonString(builder, charset);
+
+        Assert.assertEquals(builder.toString(), jsonData);
+
+        Assert.assertEquals(jsonValue.key(0, charset), "internal_uri_rewrite");
+
+        Json_Value element = jsonValue.element(0, charset);
+        Assert.assertEquals(element.key(0, charset), "(.*)(/[^/]+\\.(mp|MP)4)$");
+    }
 }

+ 3 - 3
deployer/src/main/resources/logback.xml

@@ -23,7 +23,7 @@
 						<!-- or whenever the file size reaches 100MB -->
 						<maxFileSize>512MB</maxFileSize>
 					</timeBasedFileNamingAndTriggeringPolicy>
-					<maxHistory>60</maxHistory>
+					<maxHistory>14</maxHistory>
 				</rollingPolicy>
 				<encoder>
 					<pattern>
@@ -50,7 +50,7 @@
 						<!-- or whenever the file size reaches 100MB -->
 						<maxFileSize>32MB</maxFileSize>
 					</timeBasedFileNamingAndTriggeringPolicy>
-					<maxHistory>60</maxHistory>
+					<maxHistory>14</maxHistory>
 				</rollingPolicy>
 				<encoder>
 					<pattern>
@@ -71,7 +71,7 @@
 				<!-- or whenever the file size reaches 100MB -->
 				<maxFileSize>512MB</maxFileSize>
 			</timeBasedFileNamingAndTriggeringPolicy>
-			<maxHistory>60</maxHistory>
+			<maxHistory>14</maxHistory>
 		</rollingPolicy>
 		<encoder charset="UTF-8">
 			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n</pattern>

+ 3 - 2
docker/image/alidata/bin/main.sh

@@ -12,8 +12,9 @@ for e in $(ls /alidata/init/*) ; do
 done
 
 echo "==> INIT DEFAULT"
-systemctl start sshd
-systemctl start crond
+# disable systemctl
+# systemctl start sshd
+# systemctl start crond
 
 #echo "check hostname -i: `hostname -i`"
 #hti_num=`hostname -i|awk '{print NF}'`

+ 17 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/TableMeta.java

@@ -24,7 +24,7 @@ public class TableMeta {
     private String          schema;
     private String          table;
     private List<FieldMeta> fields = new ArrayList<>();
-    private String          ddl;                                          // 表结构的DDL语句
+    private String          ddl;                       // 表结构的DDL语句
 
     public TableMeta(){
 
@@ -74,6 +74,22 @@ public class TableMeta {
         throw new RuntimeException("unknow column : " + name);
     }
 
+    /**
+     * 尝试基于列名查找一下Filed信息,找不到时返回为null
+     * 
+     * @param name
+     * @return
+     */
+    public FieldMeta tryGetFieldMetaByName(String name) {
+        for (FieldMeta meta : fields) {
+            if (meta.getColumnName().equalsIgnoreCase(name)) {
+                return meta;
+            }
+        }
+
+        return null;
+    }
+
     public List<FieldMeta> getPrimaryFields() {
         List<FieldMeta> primarys = new ArrayList<>();
         for (FieldMeta meta : fields) {

+ 55 - 31
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -13,7 +13,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
-
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
 import com.alibaba.polardbx.druid.sql.repository.Schema;
 import org.apache.commons.beanutils.BeanUtils;
@@ -26,6 +25,7 @@ import org.slf4j.MDC;
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONObject;
 import com.alibaba.otter.canal.filter.CanalEventFilter;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.TableMeta;
@@ -247,30 +247,47 @@ public class DatabaseTableMeta implements TableMetaTSDB {
     }
 
     private boolean applyHistoryToDB(EntryPosition position, String schema, String ddl, String extra) {
-        Map<String, String> content = new HashMap<>();
-        content.put("destination", destination);
-        content.put("binlogFile", position.getJournalName());
-        content.put("binlogOffest", String.valueOf(position.getPosition()));
-        content.put("binlogMasterId", String.valueOf(position.getServerId()));
-        content.put("binlogTimestamp", String.valueOf(position.getTimestamp()));
-        content.put("useSchema", schema);
-        if (content.isEmpty()) {
-            throw new RuntimeException("apply failed caused by content is empty in applyHistoryToDB");
-        }
-        // 待补充
+        // Map<String, String> content = new HashMap<>();
+        // content.put("destination", destination);
+        // content.put("binlogFile", position.getJournalName());
+        // content.put("binlogOffest", String.valueOf(position.getPosition()));
+        // content.put("binlogMasterId", String.valueOf(position.getServerId()));
+        // content.put("binlogTimestamp", String.valueOf(position.getTimestamp()));
+        // content.put("useSchema", schema);
+        //
+        // if (content.isEmpty()) {
+        // throw new RuntimeException("apply failed caused by content is empty in
+        // applyHistoryToDB");
+        // }
+        // // 待补充
+        // List<DdlResult> ddlResults = DruidDdlParser.parse(ddl, schema);
+        // if (ddlResults.size() > 0) {
+        // DdlResult ddlResult = ddlResults.get(0);
+        // content.put("sqlSchema", ddlResult.getSchemaName());
+        // content.put("sqlTable", ddlResult.getTableName());
+        // content.put("sqlType", ddlResult.getType().name());
+        // content.put("sqlText", ddl);
+        // content.put("extra", extra);
+        // }
+        // BeanUtils.populate(metaDO, content);
+
+        MetaHistoryDO metaDO = new MetaHistoryDO();
+        metaDO.setDestination(destination);
+        metaDO.setBinlogFile(position.getJournalName());
+        metaDO.setBinlogOffest(position.getPosition());
+        metaDO.setBinlogMasterId(String.valueOf(position.getServerId()));
+        metaDO.setBinlogTimestamp(position.getTimestamp());
+        metaDO.setUseSchema(schema);
         List<DdlResult> ddlResults = DruidDdlParser.parse(ddl, schema);
         if (ddlResults.size() > 0) {
             DdlResult ddlResult = ddlResults.get(0);
-            content.put("sqlSchema", ddlResult.getSchemaName());
-            content.put("sqlTable", ddlResult.getTableName());
-            content.put("sqlType", ddlResult.getType().name());
-            content.put("sqlText", ddl);
-            content.put("extra", extra);
+            metaDO.setSqlSchema(ddlResult.getSchemaName());
+            metaDO.setSqlTable(ddlResult.getTableName());
+            metaDO.setSqlType(ddlResult.getType().name());
+            metaDO.setSqlText(ddl);
+            metaDO.setExtra(extra);
         }
-
-        MetaHistoryDO metaDO = new MetaHistoryDO();
         try {
-            BeanUtils.populate(metaDO, content);
             // 会建立唯一约束,解决:
             // 1. 重复的binlog file+offest
             // 2. 重复的masterId+timestamp
@@ -328,20 +345,27 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         }
 
         if (compareAll) {
-            Map<String, String> content = new HashMap<>();
-            content.put("destination", destination);
-            content.put("binlogFile", position.getJournalName());
-            content.put("binlogOffest", String.valueOf(position.getPosition()));
-            content.put("binlogMasterId", String.valueOf(position.getServerId()));
-            content.put("binlogTimestamp", String.valueOf(position.getTimestamp()));
-            content.put("data", JSON.toJSONString(schemaDdls));
-            if (content.isEmpty()) {
-                throw new RuntimeException("apply failed caused by content is empty in applySnapshotToDB");
-            }
+            // Map<String, String> content = new HashMap<>();
+            // content.put("destination", destination);
+            // content.put("binlogFile", position.getJournalName());
+            // content.put("binlogOffest", String.valueOf(position.getPosition()));
+            // content.put("binlogMasterId", String.valueOf(position.getServerId()));
+            // content.put("binlogTimestamp", String.valueOf(position.getTimestamp()));
+            // content.put("data", JSON.toJSONString(schemaDdls));
+            // if (content.isEmpty()) {
+            // throw new RuntimeException("apply failed caused by content is empty in
+            // applySnapshotToDB");
+            // }
+            // BeanUtils.populate(snapshotDO, content);
 
             MetaSnapshotDO snapshotDO = new MetaSnapshotDO();
+            snapshotDO.setDestination(destination);
+            snapshotDO.setBinlogFile(position.getJournalName());
+            snapshotDO.setBinlogOffest(position.getPosition());
+            snapshotDO.setBinlogMasterId(String.valueOf(position.getServerId()));
+            snapshotDO.setBinlogTimestamp(position.getTimestamp());
+            snapshotDO.setExtra(JSON.toJSONString(schemaDdls));
             try {
-                BeanUtils.populate(snapshotDO, content);
                 metaSnapshotDAO.insert(snapshotDO);
             } catch (Throwable e) {
                 if (isUkDuplicateException(e)) {

+ 43 - 8
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java

@@ -1,9 +1,6 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 import com.alibaba.polardbx.druid.DbType;
@@ -38,7 +35,6 @@ import com.alibaba.polardbx.druid.util.JdbcConstants;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import com.alibaba.otter.canal.parse.inbound.TableMeta;
 import com.alibaba.otter.canal.parse.inbound.TableMeta.FieldMeta;
 import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DruidDdlParser;
@@ -272,14 +268,53 @@ public class MemoryTableMeta implements TableMetaTSDB {
         } else if (element instanceof MySqlUnique) {
             MySqlUnique column = (MySqlUnique) element;
             List<SQLSelectOrderByItem> uks = column.getColumns();
+            // https://github.com/alibaba/canal/issues/5094
+            // 处理一下函数索引
+            List<String> columnNames = new ArrayList<String>();
             for (SQLSelectOrderByItem uk : uks) {
-                String name = getSqlName(uk.getExpr());
-                FieldMeta field = tableMeta.getFieldMetaByName(name);
-                field.setUnique(true);
+                SQLExpr sqlName = uk.getExpr();
+                columnNames.addAll(getIndexColumnNames(sqlName));
+            }
+            // uniqe打标
+            for (String name : columnNames) {
+                FieldMeta field = tableMeta.tryGetFieldMetaByName(name);
+                if (field != null) {
+                    field.setUnique(true);
+                }
             }
         }
     }
 
+    private List<String> getIndexColumnNames(SQLExpr expr) {
+        if (expr instanceof SQLMethodInvokeExpr) {
+            // 需要递归处理下函数索引, 尽可能收集一下列名
+            // 常见的case:
+            // 1. upper(col)
+            // 2. left(upper(col) , 10)
+            // 3. col(10)
+            List<SQLExpr> indexExpres = ((SQLMethodInvokeExpr) expr).getArguments();
+            List<String> columnNames = new ArrayList<String>();
+            // 加上当前列
+            columnNames.add(getSqlName(expr));
+            // 处理函数索引列
+            for (SQLExpr exArgs : indexExpres) {
+                if (exArgs instanceof SQLMethodInvokeExpr) {
+                    // 加上当前列
+                    columnNames.add(getSqlName(exArgs));
+                    columnNames.addAll(getIndexColumnNames(exArgs));
+                } else {
+                    String columnName = getSqlName(exArgs);
+                    columnNames.add(columnName);
+                }
+            }
+
+            return columnNames;
+        } else {
+            String columnName = getSqlName(expr);
+            return Arrays.asList(columnName);
+        }
+    }
+
     private String getSqlName(SQLExpr sqlName) {
         if (sqlName == null) {
             return null;

+ 13 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/FastsqlSchemaTest.java

@@ -327,4 +327,17 @@ public class FastsqlSchemaTest {
         System.out.println(data.toString());
     }
 
+
+    @Test
+    public void test_function_index () throws Throwable {
+        SchemaRepository repository = new SchemaRepository(JdbcConstants.MYSQL);
+        repository.setDefaultSchema("test");
+        String sql = "CREATE TABLE test1 (\n" + "    id INT AUTO_INCREMENT PRIMARY KEY,\n"
+                     + "    owner_id INT NOT NULL,\n" + "    code VARCHAR(100) NOT NULL,\n"
+                     + "    UNIQUE KEY uk_owner_id_upper_code (owner_id, (upper(code)))\n" + ");";
+        repository.console(sql);
+        SchemaObject table = repository.findTable("test1");
+        Assert.assertTrue(table != null);
+    }
+
 }

+ 25 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta_DDL_Test.java

@@ -6,6 +6,9 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.alibaba.druid.sql.repository.SchemaObject;
+import com.alibaba.druid.sql.repository.SchemaRepository;
+import com.alibaba.druid.util.JdbcConstants;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.junit.Assert;
@@ -109,4 +112,26 @@ public class MemoryTableMeta_DDL_Test {
             System.out.println(sourceMeta.toString());
         }
     }
+
+    @Test
+    public void test_function_index () throws Throwable {
+        MemoryTableMeta memoryTableMeta = new MemoryTableMeta();
+        URL url = Thread.currentThread().getContextClassLoader().getResource("dummy.txt");
+        File dummyFile = new File(url.getFile());
+        File create = new File(dummyFile.getParent() + "/ddl", "ddl_create_function_index.sql");
+        String sql = StringUtils.join(IOUtils.readLines(new FileInputStream(create)), "\n");
+        memoryTableMeta.apply(null, "test", sql, null);
+
+        List<String> tableNames = new ArrayList<>();
+        for (Schema schema : memoryTableMeta.getRepository().getSchemas()) {
+            tableNames.addAll(schema.showTables());
+        }
+
+        for (String table : tableNames) {
+            TableMeta sourceMeta = memoryTableMeta.find("test", table);
+            TableMeta.FieldMeta field = sourceMeta.getFieldMetaByName("code");
+            System.out.println(sourceMeta.toString());
+            Assert.assertTrue(field.isUnique());
+        }
+    }
 }

+ 1 - 0
parse/src/test/resources/ddl/ddl_create_function_index.sql

@@ -0,0 +1 @@
+CREATE TABLE function_index_test (
   id INT AUTO_INCREMENT PRIMARY KEY,
   owner_id INT NOT NULL,
   code VARCHAR(100) NOT NULL,
   UNIQUE KEY uk_owner_id_upper_code (owner_id, (upper(code)))
)
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


CREATE TABLE function_index_test2 (
     id INT AUTO_INCREMENT PRIMARY KEY,
     owner_id INT NOT NULL,
     code VARCHAR(100) NOT NULL,
     UNIQUE KEY uk_owner_id_upper_code (owner_id, ((LEFT(UPPER(code), 10))))
)
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE function_index_test3 (
      id INT AUTO_INCREMENT PRIMARY KEY,
      owner_id INT NOT NULL,
      code VARCHAR(100) NOT NULL,
      UNIQUE KEY uk_owner_id_upper_code (owner_id , code(10))
)
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


CREATE TABLE function_index_test3 (
      id INT AUTO_INCREMENT PRIMARY KEY,
      owner_id INT NOT NULL,
      code VARCHAR(100) NOT NULL,
      UNIQUE KEY uk_owner_id_upper_code (code(10))
)
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;