|
@@ -82,13 +82,18 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
try {
|
|
|
connect = factory.newConnection();
|
|
|
channel = connect.createChannel();
|
|
|
- channel.queueDeclare(rabbitMQProperties.getQueue(), true, false, false, null);
|
|
|
- channel.exchangeDeclare(rabbitMQProperties
|
|
|
- .getExchange(), rabbitMQProperties.getDeliveryMode(), true, false, false, null);
|
|
|
- channel.queueBind(rabbitMQProperties.getQueue(),
|
|
|
- rabbitMQProperties.getExchange(),
|
|
|
- rabbitMQProperties.getRoutingKey());
|
|
|
-
|
|
|
+ String queue = rabbitMQProperties.getQueue();
|
|
|
+ String exchange = rabbitMQProperties.getExchange();
|
|
|
+ String deliveryMode = rabbitMQProperties.getDeliveryMode();
|
|
|
+ String routingKey = rabbitMQProperties.getRoutingKey();
|
|
|
+ if (!StringUtils.isEmpty(queue)) {
|
|
|
+ channel.queueDeclare(queue, true, false, false, null);
|
|
|
+ }
|
|
|
+ if (!StringUtils.isEmpty(queue) && !StringUtils.isEmpty(exchange) && !StringUtils.isEmpty(deliveryMode)
|
|
|
+ && !StringUtils.isEmpty(routingKey)) {
|
|
|
+ channel.exchangeDeclare(exchange, deliveryMode, true, false, false, null);
|
|
|
+ channel.queueBind(queue, exchange, routingKey);
|
|
|
+ }
|
|
|
} catch (IOException | TimeoutException ex) {
|
|
|
throw new CanalException("Start RabbitMQ producer error", ex);
|
|
|
}
|