|
@@ -91,9 +91,9 @@ public class CanalKafkaStarter {
|
|
|
private static void worker(Topic topic) {
|
|
|
while (!running) ;
|
|
|
while (!CanalServerStarter.isRunning()) ; //等待server启动完成
|
|
|
- logger.info("## start the canal consumer: {}.", topic.getDestination());
|
|
|
+ logger.info("## start the canal consumer: {}.", topic.getCanalDestination());
|
|
|
CanalServerWithEmbedded server = CanalServerWithEmbedded.instance();
|
|
|
- ClientIdentity clientIdentity = new ClientIdentity(topic.getDestination(), (short) 1001, "");
|
|
|
+ ClientIdentity clientIdentity = new ClientIdentity(topic.getCanalDestination(), (short) 1001, "");
|
|
|
while (running) {
|
|
|
try {
|
|
|
if (!server.getCanalInstances().containsKey(clientIdentity.getDestination())) {
|
|
@@ -105,7 +105,7 @@ public class CanalKafkaStarter {
|
|
|
continue;
|
|
|
}
|
|
|
server.subscribe(clientIdentity);
|
|
|
- logger.info("## the canal consumer {} is running now ......", topic.getDestination());
|
|
|
+ logger.info("## the canal consumer {} is running now ......", topic.getCanalDestination());
|
|
|
|
|
|
while (running) {
|
|
|
Message message = server.getWithoutAck(clientIdentity, 5 * 1024); // 获取指定数量的数据
|