|
@@ -121,7 +121,7 @@ public class CanalKafkaStarter {
|
|
Message message = server.getWithoutAck(clientIdentity, kafkaProperties.getCanalBatchSize()); // 获取指定数量的数据
|
|
Message message = server.getWithoutAck(clientIdentity, kafkaProperties.getCanalBatchSize()); // 获取指定数量的数据
|
|
long batchId = message.getId();
|
|
long batchId = message.getId();
|
|
try {
|
|
try {
|
|
- int size = message.getEntries().size();
|
|
|
|
|
|
+ int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
|
|
if (batchId != -1 && size != 0) {
|
|
if (batchId != -1 && size != 0) {
|
|
if (!StringUtils.isEmpty(destination.getTopic())) {
|
|
if (!StringUtils.isEmpty(destination.getTopic())) {
|
|
Topic topic = new Topic();
|
|
Topic topic = new Topic();
|