Browse Source

支持rocketmq的消息按照tag去发送 (#3438)

* 支持rocketmq的消息按照tag去发送
并对应修改了配置文件

* 将tag的设计转移到Rocketmq的单独配置项
renyansongno1 4 years ago
parent
commit
a7720b51e4

+ 1 - 0
connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/config/RocketMQConstants.java

@@ -17,6 +17,7 @@ public class RocketMQConstants {
     public static final String ROCKETMQ_NAMESRV_ADDR                 = ROOT + "." + "namesrv.addr";
     public static final String ROCKETMQ_RETRY_TIMES_WHEN_SEND_FAILED = ROOT + "." + "retry.times.when.send.failed";
     public static final String ROCKETMQ_VIP_CHANNEL_ENABLED          = ROOT + "." + "vip.channel.enabled";
+    public static final String ROCKETMQ_TAG                          = ROOT + "." + "tag";
 
     public static final String ROCKETMQ_ACCESS_CHANNEL               = ROOT + "." + "access.channel";
     public static final String ROCKETMQ_BATCH_SIZE                   = ROOT + "." + "batch.size";

+ 9 - 0
connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/config/RocketMQProducerConfig.java

@@ -17,6 +17,7 @@ public class RocketMQProducerConfig extends MQProperties {
     private String  namesrvAddr;
     private int     retryTimesWhenSendFailed = 0;
     private boolean vipChannelEnabled        = false;
+    private String  tag;
 
     public String getProducerGroup() {
         return producerGroup;
@@ -73,4 +74,12 @@ public class RocketMQProducerConfig extends MQProperties {
     public void setVipChannelEnabled(boolean vipChannelEnabled) {
         this.vipChannelEnabled = vipChannelEnabled;
     }
+
+    public String getTag() {
+        return tag;
+    }
+
+    public void setTag(String tag) {
+        this.tag = tag;
+    }
 }

+ 8 - 4
connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java

@@ -140,6 +140,10 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
         if (!StringUtils.isEmpty(vipChannelEnabled)) {
             rocketMQProperties.setVipChannelEnabled(Boolean.parseBoolean(vipChannelEnabled));
         }
+        String tag = properties.getProperty(RocketMQConstants.ROCKETMQ_TAG);
+        if (!StringUtils.isEmpty(tag)) {
+            rocketMQProperties.setTag(tag);
+        }
     }
 
     @Override
@@ -203,7 +207,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                     if (dataPartition != null) {
                         final int index = i;
                         template.submit(() -> {
-                            Message data = new Message(topicName, CanalMessageSerializerUtil.serializer(dataPartition,
+                            Message data = new Message(topicName, ((RocketMQProducerConfig)this.mqProperties).getTag(), CanalMessageSerializerUtil.serializer(dataPartition,
                                 mqProperties.isFilterTransactionEntry()));
                             sendMessage(data, index);
                         });
@@ -213,7 +217,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                 template.waitForResult();
             } else {
                 final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
-                Message data = new Message(topicName, CanalMessageSerializerUtil.serializer(message,
+                Message data = new Message(topicName, ((RocketMQProducerConfig)this.mqProperties).getTag(), CanalMessageSerializerUtil.serializer(message,
                     mqProperties.isFilterTransactionEntry()));
                 sendMessage(data, partition);
             }
@@ -250,7 +254,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                         final int index = i;
                         template.submit(() -> {
                             List<Message> messages = flatMessagePart.stream()
-                                .map(flatMessage -> new Message(topicName, JSON.toJSONBytes(flatMessage,
+                                .map(flatMessage -> new Message(topicName, ((RocketMQProducerConfig)this.mqProperties).getTag(), JSON.toJSONBytes(flatMessage,
                                     SerializerFeature.WriteMapNullValue)))
                                 .collect(Collectors.toList());
                             // 批量发送
@@ -264,7 +268,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
             } else {
                 final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
                 List<Message> messages = flatMessages.stream()
-                    .map(flatMessage -> new Message(topicName, JSON.toJSONBytes(flatMessage,
+                    .map(flatMessage -> new Message(topicName, ((RocketMQProducerConfig)this.mqProperties).getTag(), JSON.toJSONBytes(flatMessage,
                         SerializerFeature.WriteMapNullValue)))
                     .collect(Collectors.toList());
                 // 批量发送

+ 1 - 0
deployer/src/main/resources/canal.properties

@@ -157,6 +157,7 @@ rocketmq.namespace =
 rocketmq.namesrv.addr = 127.0.0.1:9876
 rocketmq.retry.times.when.send.failed = 0
 rocketmq.vip.channel.enabled = false
+rocketmq.tag = testTag
 
 ##################################################
 ######### 		    RabbitMQ	     #############