|
@@ -6,7 +6,6 @@ import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
-import com.alibaba.otter.canal.client.ConsumerBatchMessage;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
|
|
import org.apache.rocketmq.acl.common.SessionCredentials;
|
|
@@ -25,6 +24,7 @@ import org.slf4j.LoggerFactory;
|
|
|
import com.alibaba.fastjson2.JSON;
|
|
|
import com.alibaba.otter.canal.client.CanalMQConnector;
|
|
|
import com.alibaba.otter.canal.client.CanalMessageDeserializer;
|
|
|
+import com.alibaba.otter.canal.client.ConsumerBatchMessage;
|
|
|
import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
|
|
|
import com.alibaba.otter.canal.protocol.FlatMessage;
|
|
|
import com.alibaba.otter.canal.protocol.Message;
|
|
@@ -155,11 +155,10 @@ public class RocketMQCanalConnector implements CanalMQConnector {
|
|
|
}
|
|
|
});
|
|
|
rocketMQConsumer.start();
|
|
|
+ connected = true;
|
|
|
} catch (MQClientException ex) {
|
|
|
- connected = false;
|
|
|
- logger.error("Start RocketMQ consumer error", ex);
|
|
|
+ throw new RuntimeException("Start RocketMQ consumer error", ex);
|
|
|
}
|
|
|
- connected = true;
|
|
|
}
|
|
|
|
|
|
private boolean process(List<MessageExt> messageExts) {
|