|
@@ -1,5 +1,7 @@
|
|
|
package com.alibaba.otter.canal.client.rocketmq;
|
|
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.otter.canal.protocol.FlatMessage;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
@@ -24,21 +26,23 @@ import com.alibaba.otter.canal.protocol.exception.CanalClientException;
|
|
|
|
|
|
public class RocketMQCanalConnector implements CanalConnector {
|
|
|
|
|
|
- private static final Logger logger = LoggerFactory.getLogger(RocketMQCanalConnector.class);
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(RocketMQCanalConnector.class);
|
|
|
|
|
|
- private String nameServer;
|
|
|
- private String topic;
|
|
|
- private String groupName;
|
|
|
- private volatile boolean connected = false;
|
|
|
- private DefaultMQPushConsumer rocketMQConsumer;
|
|
|
- private BlockingQueue<ConsumerBatchMessage<Message>> messageBlockingQueue;
|
|
|
- Map<Long, ConsumerBatchMessage<Message>> messageCache;
|
|
|
- private long batchProcessTimeout = 10000;
|
|
|
+ private String nameServer;
|
|
|
+ private String topic;
|
|
|
+ private String groupName;
|
|
|
+ private volatile boolean connected = false;
|
|
|
+ private DefaultMQPushConsumer rocketMQConsumer;
|
|
|
+ private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;
|
|
|
+ Map<Long, ConsumerBatchMessage> messageCache;
|
|
|
+ private long batchProcessTimeout = 3000;
|
|
|
+ private boolean flatMessage;
|
|
|
|
|
|
- public RocketMQCanalConnector(String nameServer, String topic, String groupName){
|
|
|
+ public RocketMQCanalConnector(String nameServer, String topic, String groupName, boolean flatMessage) {
|
|
|
this.nameServer = nameServer;
|
|
|
this.topic = topic;
|
|
|
this.groupName = groupName;
|
|
|
+ this.flatMessage = flatMessage;
|
|
|
messageBlockingQueue = new LinkedBlockingQueue<>();
|
|
|
messageCache = new ConcurrentHashMap<>();
|
|
|
}
|
|
@@ -70,11 +74,12 @@ public class RocketMQCanalConnector implements CanalConnector {
|
|
|
if (rocketMQConsumer == null) {
|
|
|
this.connect();
|
|
|
}
|
|
|
- rocketMQConsumer.subscribe(topic, "*");
|
|
|
+ rocketMQConsumer.subscribe(this.topic, "*");
|
|
|
rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {
|
|
|
|
|
|
@Override
|
|
|
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {
|
|
|
+ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts,
|
|
|
+ ConsumeOrderlyContext context) {
|
|
|
context.setAutoCommit(true);
|
|
|
boolean isSuccess = process(messageExts);
|
|
|
if (isSuccess) {
|
|
@@ -93,17 +98,33 @@ public class RocketMQCanalConnector implements CanalConnector {
|
|
|
}
|
|
|
|
|
|
private boolean process(List<MessageExt> messageExts) {
|
|
|
- BlockingQueue<Message> messageList = new LinkedBlockingQueue<>();
|
|
|
+ logger.info("Get Message:{}", messageExts);
|
|
|
+ BlockingQueue messageList = new LinkedBlockingQueue<>();
|
|
|
for (MessageExt messageExt : messageExts) {
|
|
|
byte[] data = messageExt.getBody();
|
|
|
- Message message = CanalMessageDeserializer.deserializer(data);
|
|
|
- try {
|
|
|
- messageList.put(message);
|
|
|
- } catch (InterruptedException ex) {
|
|
|
- logger.error("Add message error");
|
|
|
+ if (data != null){
|
|
|
+ try {
|
|
|
+ if (!flatMessage) {
|
|
|
+ Message message = CanalMessageDeserializer.deserializer(data);
|
|
|
+ messageList.put(message);
|
|
|
+ } else {
|
|
|
+ FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);
|
|
|
+ messageList.put(flatMessage);
|
|
|
+ }
|
|
|
+ } catch (Exception ex) {
|
|
|
+ logger.error("Add message error", ex);
|
|
|
+ throw new CanalClientException(ex);
|
|
|
+ }
|
|
|
+ }else{
|
|
|
+ logger.warn("Received message data is null");
|
|
|
}
|
|
|
}
|
|
|
- ConsumerBatchMessage<Message> batchMessage = new ConsumerBatchMessage<>(messageList);
|
|
|
+ ConsumerBatchMessage batchMessage;
|
|
|
+ if (!flatMessage) {
|
|
|
+ batchMessage = new ConsumerBatchMessage<Message>(messageList);
|
|
|
+ } else {
|
|
|
+ batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);
|
|
|
+ }
|
|
|
try {
|
|
|
messageBlockingQueue.put(batchMessage);
|
|
|
} catch (InterruptedException e) {
|
|
@@ -131,17 +152,28 @@ public class RocketMQCanalConnector implements CanalConnector {
|
|
|
this.rocketMQConsumer.unsubscribe(this.topic);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 暂时不支持batchSize 参数
|
|
|
+ *
|
|
|
+ * @param batchSize 暂时不支持
|
|
|
+ * @return
|
|
|
+ * @throws CanalClientException
|
|
|
+ */
|
|
|
@Override
|
|
|
public Message get(int batchSize) throws CanalClientException {
|
|
|
Message message = getWithoutAck(batchSize);
|
|
|
- ack(message.getId());
|
|
|
+ if (message != null) {
|
|
|
+ ack(message.getId());
|
|
|
+ }
|
|
|
return message;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
|
|
|
Message message = getWithoutAck(batchSize, timeout, unit);
|
|
|
- ack(message.getId());
|
|
|
+ if (message != null) {
|
|
|
+ ack(message.getId());
|
|
|
+ }
|
|
|
return message;
|
|
|
}
|
|
|
|
|
@@ -155,6 +187,23 @@ public class RocketMQCanalConnector implements CanalConnector {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
+ private FlatMessage getFlatMessage(ConsumerBatchMessage consumerBatchMessage) {
|
|
|
+ BlockingQueue<FlatMessage> messageList = consumerBatchMessage.getData();
|
|
|
+ if (messageList != null & messageList.size() > 0) {
|
|
|
+ FlatMessage message = messageList.poll();
|
|
|
+ messageCache.put(message.getId(), consumerBatchMessage);
|
|
|
+ return message;
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 暂时不支持该参数设置
|
|
|
+ *
|
|
|
+ * @param batchSize
|
|
|
+ * @return
|
|
|
+ * @throws CanalClientException
|
|
|
+ */
|
|
|
@Override
|
|
|
public Message getWithoutAck(int batchSize) throws CanalClientException {
|
|
|
ConsumerBatchMessage batchMessage = messageBlockingQueue.poll();
|
|
@@ -168,11 +217,51 @@ public class RocketMQCanalConnector implements CanalConnector {
|
|
|
public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
|
|
|
try {
|
|
|
ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
|
|
|
- return getMessage(batchMessage);
|
|
|
+ if (batchMessage != null) {
|
|
|
+ return getMessage(batchMessage);
|
|
|
+ }
|
|
|
} catch (InterruptedException ex) {
|
|
|
logger.warn("Get message timeout", ex);
|
|
|
throw new CanalClientException("Failed to fetch the data after: " + timeout);
|
|
|
}
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public FlatMessage getFlatMessageWithoutAck() {
|
|
|
+ return getFlatMessageWithoutAck(null, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ public FlatMessage getFlatMessageWithoutAck(Long timeout,
|
|
|
+ TimeUnit unit) throws CanalClientException {
|
|
|
+ try {
|
|
|
+ ConsumerBatchMessage batchMessage = null;
|
|
|
+ if (timeout == null || timeout == 0) {
|
|
|
+ batchMessage = messageBlockingQueue.poll();
|
|
|
+ } else {
|
|
|
+ batchMessage = messageBlockingQueue.poll(timeout, unit);
|
|
|
+ }
|
|
|
+ if (batchMessage != null) {
|
|
|
+ return getFlatMessage(batchMessage);
|
|
|
+ }
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
+ logger.warn("Get flat message timeout", ex);
|
|
|
+ throw new CanalClientException("Failed to fetch the flat message data after: " + timeout);
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public FlatMessage getFlatMessage() throws CanalClientException {
|
|
|
+ FlatMessage message = getFlatMessageWithoutAck(null, null);
|
|
|
+ if (message != null) {
|
|
|
+ ack(message.getId());
|
|
|
+ }
|
|
|
+ return message;
|
|
|
+ }
|
|
|
+
|
|
|
+ public FlatMessage getFlatMessage(Long timeout, TimeUnit unit) throws CanalClientException {
|
|
|
+ FlatMessage message = getFlatMessageWithoutAck(timeout, unit);
|
|
|
+ ack(message.getId());
|
|
|
+ return message;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -180,6 +269,7 @@ public class RocketMQCanalConnector implements CanalConnector {
|
|
|
ConsumerBatchMessage batchMessage = messageCache.get(batchId);
|
|
|
if (batchMessage != null) {
|
|
|
batchMessage.ack();
|
|
|
+ messageCache.remove(batchId);
|
|
|
}
|
|
|
}
|
|
|
|