|
@@ -139,7 +139,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
|
|
|
private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message)
|
|
|
throws Exception {
|
|
|
if (!kafkaProperties.getFlatMessage()) {
|
|
|
- List<ProducerRecord<String, Message>> records = new ArrayList<>();
|
|
|
+ List<ProducerRecord<String, Message>> records = new ArrayList<ProducerRecord<String, Message>>();
|
|
|
if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
|
|
|
Message[] messages = MQMessageUtils.messagePartition(message,
|
|
|
canalDestination.getPartitionsNum(),
|
|
@@ -148,23 +148,23 @@ public class CanalKafkaProducer implements CanalMQProducer {
|
|
|
for (int i = 0; i < length; i++) {
|
|
|
Message messagePartition = messages[i];
|
|
|
if (messagePartition != null) {
|
|
|
- records.add(new ProducerRecord<>(topicName, i, null, messagePartition));
|
|
|
+ records.add(new ProducerRecord<String, Message>(topicName, i, null, messagePartition));
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
|
|
|
- records.add(new ProducerRecord<>(topicName, partition, null, message));
|
|
|
+ records.add(new ProducerRecord<String, Message>(topicName, partition, null, message));
|
|
|
}
|
|
|
|
|
|
- if (!records.isEmpty()) {
|
|
|
- for (ProducerRecord<String, Message> record : records) {
|
|
|
- producer.send(record).get();
|
|
|
- }
|
|
|
-
|
|
|
- if (logger.isDebugEnabled()) {
|
|
|
- logger.debug("Send message to kafka topic: [{}], packet: {}", topicName, message.toString());
|
|
|
- }
|
|
|
- }
|
|
|
+ if (!records.isEmpty()) {
|
|
|
+ for (ProducerRecord<String, Message> record : records) {
|
|
|
+ producer.send(record).get();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.debug("Send message to kafka topic: [{}], packet: {}", topicName, message.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
} else {
|
|
|
// 发送扁平数据json
|
|
|
List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
|