Explorar o código

fixed compiler

agapple %!s(int64=6) %!d(string=hai) anos
pai
achega
97640b717b

+ 12 - 12
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -139,7 +139,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
     private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message)
                                                                                                         throws Exception {
         if (!kafkaProperties.getFlatMessage()) {
-        	List<ProducerRecord<String, Message>> records = new ArrayList<>();
+            List<ProducerRecord<String, Message>> records = new ArrayList<ProducerRecord<String, Message>>();
             if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                 Message[] messages = MQMessageUtils.messagePartition(message,
                     canalDestination.getPartitionsNum(),
@@ -148,23 +148,23 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 for (int i = 0; i < length; i++) {
                     Message messagePartition = messages[i];
                     if (messagePartition != null) {
-                    	records.add(new ProducerRecord<>(topicName, i, null, messagePartition));
+                        records.add(new ProducerRecord<String, Message>(topicName, i, null, messagePartition));
                     }
                 }
             } else {
                 final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
-                records.add(new ProducerRecord<>(topicName, partition, null, message));
+                records.add(new ProducerRecord<String, Message>(topicName, partition, null, message));
             }
 
-			if (!records.isEmpty()) {
-				for (ProducerRecord<String, Message> record : records) {
-					producer.send(record).get();
-				}
-				
-				if (logger.isDebugEnabled()) {
-					logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, message.toString());
-				}
-			}
+            if (!records.isEmpty()) {
+                for (ProducerRecord<String, Message> record : records) {
+                    producer.send(record).get();
+                }
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, message.toString());
+                }
+            }
         } else {
             // 发送扁平数据json
             List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);