|
@@ -25,97 +25,98 @@ import com.google.common.collect.Lists;
|
|
|
* 注意点:
|
|
|
* 1. 相比于canal {@linkplain SimpleCanalConnector}, 这里get和ack操作不能有并发, 必须是一个线程执行get后,内存里执行完毕ack后再取下一个get
|
|
|
* </pre>
|
|
|
+ *
|
|
|
* todo 重复消费的概率相当高。一次批处理中,只要有一个消息处理失败,则该批次全部重试
|
|
|
*
|
|
|
* @since 1.1.1
|
|
|
*/
|
|
|
public class PulsarMQCanalConnector implements CanalMQConnector {
|
|
|
|
|
|
- private static final Logger logger = LoggerFactory.getLogger(PulsarMQCanalConnector.class);
|
|
|
+ private static final Logger logger = LoggerFactory
|
|
|
+ .getLogger(PulsarMQCanalConnector.class);
|
|
|
|
|
|
private volatile Messages<byte[]> lastGetBatchMessage;
|
|
|
|
|
|
/**
|
|
|
* 连接pulsar客户端
|
|
|
*/
|
|
|
- private PulsarClient pulsarClient;
|
|
|
+ private PulsarClient pulsarClient;
|
|
|
/**
|
|
|
* 消费者
|
|
|
*/
|
|
|
- private Consumer<byte[]> consumer;
|
|
|
+ private Consumer<byte[]> consumer;
|
|
|
/**
|
|
|
* 是否扁平化Canal消息内容
|
|
|
*/
|
|
|
- private boolean isFlatMessage = false;
|
|
|
+ private boolean isFlatMessage = false;
|
|
|
/**
|
|
|
* 主题名称
|
|
|
*/
|
|
|
- private String topic;
|
|
|
+ private String topic;
|
|
|
/**
|
|
|
* 环境连接URL
|
|
|
*/
|
|
|
- private String serviceUrl;
|
|
|
+ private String serviceUrl;
|
|
|
/**
|
|
|
* 角色认证token
|
|
|
*/
|
|
|
- private String roleToken;
|
|
|
+ private String roleToken;
|
|
|
|
|
|
/**
|
|
|
* listener name
|
|
|
*/
|
|
|
- private String listenerName;
|
|
|
+ private String listenerName;
|
|
|
|
|
|
/**
|
|
|
* 订阅客户端名称
|
|
|
*/
|
|
|
- private String subscriptName;
|
|
|
+ private String subscriptName;
|
|
|
/**
|
|
|
* 每次批量获取数据的最大条目数,默认30
|
|
|
*/
|
|
|
- private int batchSize = 30;
|
|
|
+ private int batchSize = 30;
|
|
|
/**
|
|
|
- * 与{@code batchSize}一起决定批量获取的数据大小
|
|
|
- * 当:
|
|
|
+ * 与{@code batchSize}一起决定批量获取的数据大小 当:
|
|
|
* <p>
|
|
|
* 1. {@code batchSize} 条消息未消费时<br/>
|
|
|
* 2. 距上一次批量消费时间达到{@code batchTimeoutSeconds}秒时
|
|
|
* </p>
|
|
|
* 任一条件满足,即执行批量消费
|
|
|
*/
|
|
|
- private int getBatchTimeoutSeconds = 30;
|
|
|
+ private int getBatchTimeoutSeconds = 30;
|
|
|
/**
|
|
|
* 批量处理消息时,一次批量处理的超时时间秒数
|
|
|
* <p>
|
|
|
* 该时间应该根据{@code batchSize}和{@code batchTimeoutSeconds}合理设置
|
|
|
* </p>
|
|
|
*/
|
|
|
- private int batchProcessTimeoutSeconds = 60;
|
|
|
+ private int batchProcessTimeoutSeconds = 60;
|
|
|
/**
|
|
|
* 消费失败后的重试秒数,默认60秒
|
|
|
*/
|
|
|
- private int redeliveryDelaySeconds = 60;
|
|
|
+ private int redeliveryDelaySeconds = 60;
|
|
|
/**
|
|
|
* 当客户端接收到消息,30秒还没有返回ack给服务端时,ack超时,会重新消费该消息
|
|
|
*/
|
|
|
- private int ackTimeoutSeconds = 30;
|
|
|
+ private int ackTimeoutSeconds = 30;
|
|
|
/**
|
|
|
* 是否开启消息失败重试功能,默认开启
|
|
|
*/
|
|
|
- private boolean isRetry = true;
|
|
|
+ private boolean isRetry = true;
|
|
|
/**
|
|
|
* <p>
|
|
|
* true重试(-RETRY)和死信队列(-DLQ)后缀为大写,有些地方创建的为小写,需确保正确
|
|
|
* </p>
|
|
|
*/
|
|
|
- private boolean isRetryDLQUpperCase = false;
|
|
|
+ private boolean isRetryDLQUpperCase = false;
|
|
|
/**
|
|
|
* 最大重试次数
|
|
|
*/
|
|
|
- private int maxRedeliveryCount = 128;
|
|
|
+ private int maxRedeliveryCount = 128;
|
|
|
/**
|
|
|
* 连接标识位,在连接或关闭连接后改变值
|
|
|
*/
|
|
|
- private boolean connected = false;
|
|
|
+ private boolean connected = false;
|
|
|
|
|
|
/**
|
|
|
* 除必要参数外,其他参数使用默认值
|
|
@@ -125,21 +126,21 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
|
|
|
* </p>
|
|
|
*
|
|
|
* @param isFlatMessage true使用扁平消息
|
|
|
- * @param serviceUrl pulsar服务连接地址,通常为:pulsar:host:ip或http://host:ip
|
|
|
- * @param roleToken 有对应topic的消费者权限的角色token
|
|
|
- * @param topic 订阅主题
|
|
|
+ * @param serviceUrl pulsar服务连接地址,通常为:pulsar:host:ip或http://host:ip
|
|
|
+ * @param roleToken 有对应topic的消费者权限的角色token
|
|
|
+ * @param topic 订阅主题
|
|
|
* @param subscriptName 订阅和客户端名称,同一个订阅名视为同一个消费实例
|
|
|
* @date 2021/9/18 08:54
|
|
|
* @author chad
|
|
|
* @since 1 by chad at 2021/9/18 完善
|
|
|
*/
|
|
|
- public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic
|
|
|
- , String subscriptName) {
|
|
|
- this(isFlatMessage, serviceUrl, roleToken, topic, subscriptName,null);
|
|
|
+ public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic,
|
|
|
+ String subscriptName){
|
|
|
+ this(isFlatMessage, serviceUrl, roleToken, topic, subscriptName, null);
|
|
|
}
|
|
|
|
|
|
- public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic
|
|
|
- , String subscriptName, String listenerName) {
|
|
|
+ public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic,
|
|
|
+ String subscriptName, String listenerName){
|
|
|
this.isFlatMessage = isFlatMessage;
|
|
|
this.serviceUrl = serviceUrl;
|
|
|
this.roleToken = roleToken;
|
|
@@ -158,19 +159,31 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
|
|
|
* @author chad
|
|
|
* @since 1 by chad at 2021/9/18 完善
|
|
|
*/
|
|
|
- public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic
|
|
|
- , String subscriptName, int batchSize, int getBatchTimeoutSeconds, int batchProcessTimeoutSeconds
|
|
|
- , int redeliveryDelaySeconds, int ackTimeoutSeconds, boolean isRetry, boolean isRetryDLQUpperCase
|
|
|
- , int maxRedeliveryCount) {
|
|
|
- this(isFlatMessage, serviceUrl, roleToken, topic, subscriptName, batchSize, getBatchTimeoutSeconds
|
|
|
- , batchProcessTimeoutSeconds, redeliveryDelaySeconds, ackTimeoutSeconds, isRetry, isRetryDLQUpperCase
|
|
|
- , maxRedeliveryCount, null);
|
|
|
+ public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic,
|
|
|
+ String subscriptName, int batchSize, int getBatchTimeoutSeconds,
|
|
|
+ int batchProcessTimeoutSeconds, int redeliveryDelaySeconds, int ackTimeoutSeconds,
|
|
|
+ boolean isRetry, boolean isRetryDLQUpperCase, int maxRedeliveryCount){
|
|
|
+ this(isFlatMessage,
|
|
|
+ serviceUrl,
|
|
|
+ roleToken,
|
|
|
+ topic,
|
|
|
+ subscriptName,
|
|
|
+ batchSize,
|
|
|
+ getBatchTimeoutSeconds,
|
|
|
+ batchProcessTimeoutSeconds,
|
|
|
+ redeliveryDelaySeconds,
|
|
|
+ ackTimeoutSeconds,
|
|
|
+ isRetry,
|
|
|
+ isRetryDLQUpperCase,
|
|
|
+ maxRedeliveryCount,
|
|
|
+ null);
|
|
|
}
|
|
|
|
|
|
- public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic
|
|
|
- , String subscriptName, int batchSize, int getBatchTimeoutSeconds, int batchProcessTimeoutSeconds
|
|
|
- , int redeliveryDelaySeconds, int ackTimeoutSeconds, boolean isRetry, boolean isRetryDLQUpperCase
|
|
|
- , int maxRedeliveryCount, String listenerName) {
|
|
|
+ public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic,
|
|
|
+ String subscriptName, int batchSize, int getBatchTimeoutSeconds,
|
|
|
+ int batchProcessTimeoutSeconds, int redeliveryDelaySeconds, int ackTimeoutSeconds,
|
|
|
+ boolean isRetry, boolean isRetryDLQUpperCase, int maxRedeliveryCount,
|
|
|
+ String listenerName){
|
|
|
this.isFlatMessage = isFlatMessage;
|
|
|
this.serviceUrl = serviceUrl;
|
|
|
this.roleToken = roleToken;
|
|
@@ -195,13 +208,12 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
|
|
|
// 连接创建客户端
|
|
|
try {
|
|
|
ClientBuilder builder = PulsarClient.builder()
|
|
|
- .serviceUrl(serviceUrl)
|
|
|
- .authentication(AuthenticationFactory.token(roleToken));
|
|
|
+ .serviceUrl(serviceUrl)
|
|
|
+ .authentication(AuthenticationFactory.token(roleToken));
|
|
|
if (StringUtils.isNotEmpty(listenerName)) {
|
|
|
builder.listenerName(listenerName);
|
|
|
}
|
|
|
- pulsarClient = builder
|
|
|
- .build();
|
|
|
+ pulsarClient = builder.build();
|
|
|
} catch (PulsarClientException e) {
|
|
|
throw new RuntimeException(e);
|
|
|
}
|
|
@@ -250,15 +262,14 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
|
|
|
builder.subscriptionType(SubscriptionType.Failover);
|
|
|
|
|
|
builder
|
|
|
- // 调用consumer.negativeAcknowledge(message) (即nack)来表示消费失败的消息
|
|
|
- // 在指定的时间进行重新消费,默认是1分钟。
|
|
|
- .negativeAckRedeliveryDelay(this.redeliveryDelaySeconds, TimeUnit.SECONDS)
|
|
|
- .subscriptionName(this.subscriptName)
|
|
|
- ;
|
|
|
+ // 调用consumer.negativeAcknowledge(message) (即nack)来表示消费失败的消息
|
|
|
+ // 在指定的时间进行重新消费,默认是1分钟。
|
|
|
+ .negativeAckRedeliveryDelay(this.redeliveryDelaySeconds, TimeUnit.SECONDS)
|
|
|
+ .subscriptionName(this.subscriptName);
|
|
|
if (this.isRetry) {
|
|
|
DeadLetterPolicy.DeadLetterPolicyBuilder dlqBuilder = DeadLetterPolicy.builder()
|
|
|
- // 最大重试次数
|
|
|
- .maxRedeliverCount(this.maxRedeliveryCount);
|
|
|
+ // 最大重试次数
|
|
|
+ .maxRedeliverCount(this.maxRedeliveryCount);
|
|
|
// 指定重试队列,不是多个或通配符topic才能判断重试队列
|
|
|
if (!MQUtil.isPatternTag(this.topic)) {
|
|
|
String retryTopic = this.topic + (this.isRetryDLQUpperCase ? "-RETRY" : "-retry");
|
|
@@ -267,19 +278,17 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
|
|
|
dlqBuilder.deadLetterTopic(dlqTopic);
|
|
|
}
|
|
|
|
|
|
- //默认关闭,如果需要重试则开启
|
|
|
- builder.enableRetry(true)
|
|
|
- .deadLetterPolicy(dlqBuilder.build());
|
|
|
+ // 默认关闭,如果需要重试则开启
|
|
|
+ builder.enableRetry(true).deadLetterPolicy(dlqBuilder.build());
|
|
|
}
|
|
|
|
|
|
// ack超时
|
|
|
builder.ackTimeout(this.ackTimeoutSeconds, TimeUnit.SECONDS);
|
|
|
|
|
|
// pulsar批量获取消息设置
|
|
|
- builder.batchReceivePolicy(new BatchReceivePolicy.Builder()
|
|
|
- .maxNumMessages(this.batchSize)
|
|
|
- .timeout(this.getBatchTimeoutSeconds, TimeUnit.SECONDS)
|
|
|
- .build());
|
|
|
+ builder.batchReceivePolicy(new BatchReceivePolicy.Builder().maxNumMessages(this.batchSize)
|
|
|
+ .timeout(this.getBatchTimeoutSeconds, TimeUnit.SECONDS)
|
|
|
+ .build());
|
|
|
|
|
|
try {
|
|
|
this.consumer = builder.subscribe();
|
|
@@ -289,7 +298,6 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
@Override
|
|
|
public void subscribe() throws CanalClientException {
|
|
|
this.subscribe(null);
|
|
@@ -310,7 +318,7 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
|
|
|
* 不关注业务执行结果,只要收到消息即认识消费成功,自动ack
|
|
|
*
|
|
|
* @param timeout 阻塞获取消息的超时时间
|
|
|
- * @param unit 时间单位
|
|
|
+ * @param unit 时间单位
|
|
|
* @return java.util.List<com.alibaba.otter.canal.protocol.Message>
|
|
|
* @date 2021/9/13 22:24
|
|
|
* @author chad
|
|
@@ -326,13 +334,14 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 关心业务执行结果,业务侧根据执行结果调用 {@link PulsarMQCanalConnector#ack()}或{@link PulsarMQCanalConnector#rollback()}
|
|
|
+ * 关心业务执行结果,业务侧根据执行结果调用
|
|
|
+ * {@link PulsarMQCanalConnector#ack()}或{@link PulsarMQCanalConnector#rollback()}
|
|
|
* <p>
|
|
|
* 本方法示支持多线程,在MQ保障顺序的前提下,也无法提供单Topic多线程
|
|
|
* </p>
|
|
|
*
|
|
|
* @param timeout 阻塞获取消息的超时时间
|
|
|
- * @param unit 时间单位
|
|
|
+ * @param unit 时间单位
|
|
|
* @return java.util.List<com.alibaba.otter.canal.protocol.Message>
|
|
|
* @date 2021/9/13 22:26
|
|
|
* @author chad
|
|
@@ -366,7 +375,8 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
|
|
|
* @return java.util.List<T>
|
|
|
* @date 2021/9/14 15:20
|
|
|
* @author chad
|
|
|
- * @since 1 by chad at 2021/9/14 供{@link PulsarMQCanalConnector#getListWithoutAck(Long, TimeUnit)}
|
|
|
+ * @since 1 by chad at 2021/9/14
|
|
|
+ * 供{@link PulsarMQCanalConnector#getListWithoutAck(Long, TimeUnit)}
|
|
|
* 和{@link PulsarMQCanalConnector#getFlatListWithoutAck(Long, TimeUnit)}调用
|
|
|
*/
|
|
|
private <T> List<T> getListWithoutAck() {
|