|
@@ -72,12 +72,23 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
try {
|
|
|
connect = factory.newConnection();
|
|
|
channel = connect.createChannel();
|
|
|
- channel.queueDeclare(rabbitMQProperties.getQueue(), true, false, false, null);
|
|
|
- channel.exchangeDeclare(rabbitMQProperties
|
|
|
- .getExchange(), rabbitMQProperties.getDeliveryMode(), true, false, false, null);
|
|
|
- channel.queueBind(rabbitMQProperties.getQueue(),
|
|
|
- rabbitMQProperties.getExchange(),
|
|
|
- rabbitMQProperties.getRoutingKey());
|
|
|
+ String queue=rabbitMQProperties.getQueue();
|
|
|
+ String exchange=rabbitMQProperties.getExchange();
|
|
|
+ String deliveryMode=rabbitMQProperties.getDeliveryMode();
|
|
|
+ String routingKey=rabbitMQProperties.getRoutingKey();
|
|
|
+ if(!StringUtils.isEmpty(queue))
|
|
|
+ {
|
|
|
+ channel.queueDeclare(queue, true, false, false, null);
|
|
|
+ }
|
|
|
+ if(!StringUtils.isEmpty(queue)
|
|
|
+ &&!StringUtils.isEmpty(exchange)
|
|
|
+ &&!StringUtils.isEmpty(deliveryMode)
|
|
|
+ &&!StringUtils.isEmpty(routingKey))
|
|
|
+ {
|
|
|
+ channel.exchangeDeclare(exchange, deliveryMode, true, false, false, null);
|
|
|
+ channel.queueBind(queue,exchange,routingKey);
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
} catch (IOException | TimeoutException ex) {
|
|
|
throw new CanalException("Start RabbitMQ producer error", ex);
|