|
@@ -6,7 +6,6 @@ import java.util.Map;
|
|
|
import java.util.Properties;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
-import com.alibaba.otter.canal.common.utils.PropertiesUtils;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -15,6 +14,7 @@ import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.serializer.SerializerFeature;
|
|
|
import com.alibaba.otter.canal.common.CanalException;
|
|
|
import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
|
|
|
+import com.alibaba.otter.canal.common.utils.PropertiesUtils;
|
|
|
import com.alibaba.otter.canal.connector.core.producer.AbstractMQProducer;
|
|
|
import com.alibaba.otter.canal.connector.core.producer.MQDestination;
|
|
|
import com.alibaba.otter.canal.connector.core.producer.MQMessageUtils;
|
|
@@ -26,6 +26,7 @@ import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQConstants;
|
|
|
import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQProducerConfig;
|
|
|
import com.alibaba.otter.canal.protocol.FlatMessage;
|
|
|
import com.alibaba.otter.canal.protocol.Message;
|
|
|
+import com.rabbitmq.client.AlreadyClosedException;
|
|
|
import com.rabbitmq.client.Channel;
|
|
|
import com.rabbitmq.client.Connection;
|
|
|
import com.rabbitmq.client.ConnectionFactory;
|
|
@@ -177,6 +178,8 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
this.connect.close();
|
|
|
this.channel.close();
|
|
|
super.stop();
|
|
|
+ } catch (AlreadyClosedException ex) {
|
|
|
+ logger.error("Connection is already closed", ex);
|
|
|
} catch (IOException | TimeoutException ex) {
|
|
|
throw new CanalException("Stop RabbitMQ producer error", ex);
|
|
|
}
|