jianghang.loujh 9 mesiacov pred
rodič
commit
3439c48948

+ 31 - 28
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AdapterProcessor.java

@@ -1,5 +1,16 @@
 package com.alibaba.otter.canal.adapter.launcher.loader;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
 import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
@@ -12,16 +23,6 @@ import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
 import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer;
 import com.alibaba.otter.canal.connector.core.spi.ExtensionLoader;
 import com.alibaba.otter.canal.connector.core.spi.ProxyCanalMsgConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /**
  * 适配处理器
@@ -31,27 +32,27 @@ import java.util.concurrent.TimeoutException;
  */
 public class AdapterProcessor {
 
-    private static final Logger logger = LoggerFactory.getLogger(AdapterProcessor.class);
+    private static final Logger             logger                    = LoggerFactory.getLogger(AdapterProcessor.class);
 
-    private static final String CONNECTOR_SPI_DIR = "/plugin";
-    private static final String CONNECTOR_STANDBY_SPI_DIR = "/canal-adapter/plugin";
+    private static final String             CONNECTOR_SPI_DIR         = "/plugin";
+    private static final String             CONNECTOR_STANDBY_SPI_DIR = "/canal-adapter/plugin";
 
-    private CanalMsgConsumer canalMsgConsumer;
+    private CanalMsgConsumer                canalMsgConsumer;
 
-    private String canalDestination;                                                           // canal实例
-    private String groupId = null;                                           // groupId
-    private List<List<OuterAdapter>> canalOuterAdapters;                                                         // 外部适配器
-    private CanalClientConfig canalClientConfig;                                                          // 配置
-    private ExecutorService groupInnerExecutorService;                                                  // 组内工作线程池
-    private volatile boolean running = false;                                          // 是否运行中
-    private Thread thread = null;
-    private Thread.UncaughtExceptionHandler handler = (t, e) -> logger
+    private String                          canalDestination;                                                           // canal实例
+    private String                          groupId                   = null;                                           // groupId
+    private List<List<OuterAdapter>>        canalOuterAdapters;                                                         // 外部适配器
+    private CanalClientConfig               canalClientConfig;                                                          // 配置
+    private ExecutorService                 groupInnerExecutorService;                                                  // 组内工作线程池
+    private volatile boolean                running                   = false;                                          // 是否运行中
+    private Thread                          thread                    = null;
+    private Thread.UncaughtExceptionHandler handler                   = (t, e) -> logger
         .error("parse events has an error", e);
 
-    private SyncSwitch syncSwitch;
+    private SyncSwitch                      syncSwitch;
 
     public AdapterProcessor(CanalClientConfig canalClientConfig, String destination, String groupId,
-        List<List<OuterAdapter>> canalOuterAdapters) {
+                            List<List<OuterAdapter>> canalOuterAdapters){
         this.canalClientConfig = canalClientConfig;
         this.canalDestination = destination;
         this.groupId = groupId;
@@ -62,10 +63,12 @@ public class AdapterProcessor {
 
         // load connector consumer
         ExtensionLoader<CanalMsgConsumer> loader = new ExtensionLoader<>(CanalMsgConsumer.class);
+        // see https://github.com/alibaba/canal/pull/5175
         String key = destination + "_" + groupId;
-        canalMsgConsumer = new ProxyCanalMsgConsumer(loader
-            .getExtension(canalClientConfig.getMode().toLowerCase(), key, CONNECTOR_SPI_DIR,
-                CONNECTOR_STANDBY_SPI_DIR));
+        canalMsgConsumer = new ProxyCanalMsgConsumer(loader.getExtension(canalClientConfig.getMode().toLowerCase(),
+            key,
+            CONNECTOR_SPI_DIR,
+            CONNECTOR_STANDBY_SPI_DIR));
 
         Properties properties = canalClientConfig.getConsumerProperties();
         properties.put(CanalConstants.CANAL_MQ_FLAT_MESSAGE, canalClientConfig.getFlatMessage());
@@ -170,7 +173,7 @@ public class AdapterProcessor {
         }
 
         int retry = canalClientConfig.getRetries() == null
-            || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
+                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         if (retry == -1) {
             // 重试次数-1代表异常时一直阻塞重试
             retry = Integer.MAX_VALUE;

+ 76 - 70
connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java

@@ -1,5 +1,29 @@
 package com.alibaba.otter.canal.connector.rocketmq.producer;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.AccessChannel;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONWriter;
 import com.alibaba.otter.canal.common.CanalException;
@@ -16,29 +40,6 @@ import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
 import com.alibaba.otter.canal.connector.rocketmq.config.RocketMQConstants;
 import com.alibaba.otter.canal.connector.rocketmq.config.RocketMQProducerConfig;
 import com.alibaba.otter.canal.protocol.FlatMessage;
-import org.apache.commons.lang.StringUtils;
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.SessionCredentials;
-import org.apache.rocketmq.client.AccessChannel;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
-import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * RocketMQ Producer SPI 实现
@@ -46,16 +47,18 @@ import java.util.stream.Collectors;
  * @author rewerma 2020-01-27
  * @version 1.0.0
  */
-@SPI("rocketmq") public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQProducer {
+@SPI("rocketmq")
+public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQProducer {
 
-    private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
+    private static final Logger  logger               = LoggerFactory.getLogger(CanalRocketMQProducer.class);
 
-    private              DefaultMQProducer  defaultMQProducer;
-    private static final String             CLOUD_ACCESS_CHANNEL = "cloud";
-    private static final String             NAMESPACE_SEPARATOR  = "%";
-    protected            ThreadPoolExecutor sendPartitionExecutor;
+    private DefaultMQProducer    defaultMQProducer;
+    private static final String  CLOUD_ACCESS_CHANNEL = "cloud";
+    private static final String  NAMESPACE_SEPARATOR  = "%";
+    protected ThreadPoolExecutor sendPartitionExecutor;
 
-    @Override public void init(Properties properties) {
+    @Override
+    public void init(Properties properties) {
         RocketMQProducerConfig rocketMQProperties = new RocketMQProducerConfig();
         this.mqProperties = rocketMQProperties;
         super.init(properties);
@@ -70,9 +73,9 @@ import java.util.stream.Collectors;
         }
 
         defaultMQProducer = new DefaultMQProducer(rocketMQProperties.getProducerGroup(),
-                rpcHook,
-                rocketMQProperties.isEnableMessageTrace(),
-                rocketMQProperties.getCustomizedTraceTopic());
+            rpcHook,
+            rocketMQProperties.isEnableMessageTrace(),
+            rocketMQProperties.getCustomizedTraceTopic());
         if (CLOUD_ACCESS_CHANNEL.equals(rocketMQProperties.getAccessChannel())) {
             defaultMQProducer.setAccessChannel(AccessChannel.CLOUD);
         }
@@ -91,12 +94,12 @@ import java.util.stream.Collectors;
 
         int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize();
         sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize,
-                parallelPartitionSendThreadSize,
-                0,
-                TimeUnit.SECONDS,
-                new ArrayBlockingQueue<>(parallelPartitionSendThreadSize * 2),
-                new NamedThreadFactory("MQ-Parallel-Sender-Partition"),
-                new ThreadPoolExecutor.CallerRunsPolicy());
+            parallelPartitionSendThreadSize,
+            0,
+            TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(parallelPartitionSendThreadSize * 2),
+            new NamedThreadFactory("MQ-Parallel-Sender-Partition"),
+            new ThreadPoolExecutor.CallerRunsPolicy());
     }
 
     private void loadRocketMQProperties(Properties properties) {
@@ -112,12 +115,12 @@ import java.util.stream.Collectors;
             rocketMQProperties.setProducerGroup(producerGroup);
         }
         String enableMessageTrace = PropertiesUtils.getProperty(properties,
-                RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
+            RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
         if (!StringUtils.isEmpty(enableMessageTrace)) {
             rocketMQProperties.setEnableMessageTrace(Boolean.parseBoolean(enableMessageTrace));
         }
         String customizedTraceTopic = PropertiesUtils.getProperty(properties,
-                RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
+            RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
         if (!StringUtils.isEmpty(customizedTraceTopic)) {
             rocketMQProperties.setCustomizedTraceTopic(customizedTraceTopic);
         }
@@ -134,7 +137,7 @@ import java.util.stream.Collectors;
             rocketMQProperties.setRetryTimesWhenSendFailed(Integer.parseInt(retry));
         }
         String vipChannelEnabled = PropertiesUtils.getProperty(properties,
-                RocketMQConstants.ROCKETMQ_VIP_CHANNEL_ENABLED);
+            RocketMQConstants.ROCKETMQ_VIP_CHANNEL_ENABLED);
         if (!StringUtils.isEmpty(vipChannelEnabled)) {
             rocketMQProperties.setVipChannelEnabled(Boolean.parseBoolean(vipChannelEnabled));
         }
@@ -144,15 +147,14 @@ import java.util.stream.Collectors;
         }
     }
 
-    @Override public void send(MQDestination destination, com.alibaba.otter.canal.protocol.Message message,
-                               Callback callback) {
+    @Override
+    public void send(MQDestination destination, com.alibaba.otter.canal.protocol.Message message, Callback callback) {
         ExecutorTemplate template = new ExecutorTemplate(sendExecutor);
         try {
             if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
                 // 动态topic
-                Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils.messageTopics(message,
-                        destination.getTopic(),
-                        destination.getDynamicTopic());
+                Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils
+                    .messageTopics(message, destination.getTopic(), destination.getDynamicTopic());
 
                 for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
                     String topicName = entry.getKey().replace('.', '_');
@@ -184,7 +186,7 @@ import java.util.stream.Collectors;
                      com.alibaba.otter.canal.protocol.Message message) {
         // 获取当前topic的分区数
         Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
-                destination.getDynamicTopicPartitionNum());
+            destination.getDynamicTopicPartitionNum());
 
         // 获取topic的队列数为分区数
         if (partitionNum == null) {
@@ -200,10 +202,10 @@ import java.util.stream.Collectors;
                 MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
                 // 串行分区
                 com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(datas,
-                        message.getId(),
-                        partitionNum,
-                        destination.getPartitionHash(),
-                        mqProperties.isDatabaseHash());
+                    message.getId(),
+                    partitionNum,
+                    destination.getPartitionHash(),
+                    mqProperties.isDatabaseHash());
                 int length = messages.length;
 
                 ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
@@ -213,9 +215,9 @@ import java.util.stream.Collectors;
                         final int index = i;
                         template.submit(() -> {
                             Message data = new Message(topicName,
-                                    ((RocketMQProducerConfig) this.mqProperties).getTag(),
-                                    CanalMessageSerializerUtil.serializer(dataPartition,
-                                            mqProperties.isFilterTransactionEntry()));
+                                ((RocketMQProducerConfig) this.mqProperties).getTag(),
+                                CanalMessageSerializerUtil.serializer(dataPartition,
+                                    mqProperties.isFilterTransactionEntry()));
                             sendMessage(data, index);
                         });
                     }
@@ -225,8 +227,8 @@ import java.util.stream.Collectors;
             } else {
                 final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
                 Message data = new Message(topicName,
-                        ((RocketMQProducerConfig) this.mqProperties).getTag(),
-                        CanalMessageSerializerUtil.serializer(message, mqProperties.isFilterTransactionEntry()));
+                    ((RocketMQProducerConfig) this.mqProperties).getTag(),
+                    CanalMessageSerializerUtil.serializer(message, mqProperties.isFilterTransactionEntry()));
                 sendMessage(data, partition);
             }
         } else {
@@ -243,9 +245,9 @@ import java.util.stream.Collectors;
 
                 for (FlatMessage flatMessage : flatMessages) {
                     FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
-                            partitionNum,
-                            destination.getPartitionHash(),
-                            mqProperties.isDatabaseHash());
+                        partitionNum,
+                        destination.getPartitionHash(),
+                        mqProperties.isDatabaseHash());
                     int length = partitionFlatMessage.length;
                     for (int i = 0; i < length; i++) {
                         // 增加null判断,issue #3267
@@ -262,10 +264,12 @@ import java.util.stream.Collectors;
                         final int index = i;
                         template.submit(() -> {
                             List<Message> messages = flatMessagePart.stream()
-                                    .map(flatMessage -> new Message(topicName,
-                                            ((RocketMQProducerConfig) this.mqProperties).getTag(),
-                                            JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls, JSONWriter.Feature.LargeObject)))
-                                    .collect(Collectors.toList());
+                                .map(flatMessage -> new Message(topicName,
+                                    ((RocketMQProducerConfig) this.mqProperties).getTag(),
+                                    JSON.toJSONBytes(flatMessage,
+                                        JSONWriter.Feature.WriteNulls,
+                                        JSONWriter.Feature.LargeObject)))
+                                .collect(Collectors.toList());
                             // 批量发送
                             sendMessage(messages, index);
                         });
@@ -277,10 +281,10 @@ import java.util.stream.Collectors;
             } else {
                 final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
                 List<Message> messages = flatMessages.stream()
-                        .map(flatMessage -> new Message(topicName,
-                                ((RocketMQProducerConfig) this.mqProperties).getTag(),
-                                JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls,JSONWriter.Feature.LargeObject)))
-                        .collect(Collectors.toList());
+                    .map(flatMessage -> new Message(topicName,
+                        ((RocketMQProducerConfig) this.mqProperties).getTag(),
+                        JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls, JSONWriter.Feature.LargeObject)))
+                    .collect(Collectors.toList());
                 // 批量发送
                 sendMessage(messages, partition);
             }
@@ -305,7 +309,8 @@ import java.util.stream.Collectors;
         }
     }
 
-    @SuppressWarnings("deprecation") private void sendMessage(List<Message> messages, int partition) {
+    @SuppressWarnings("deprecation")
+    private void sendMessage(List<Message> messages, int partition) {
         if (messages.isEmpty()) {
             return;
         }
@@ -351,7 +356,8 @@ import java.util.stream.Collectors;
         }
     }
 
-    @Override public void stop() {
+    @Override
+    public void stop() {
         logger.info("## Stop RocketMQ producer##");
         this.defaultMQProducer.shutdown();
         if (sendPartitionExecutor != null) {