Преглед на файлове

fix序列化的一些问题, kafka生产端统一使用异步

mcy преди 6 години
родител
ревизия
7005c11ac2

+ 7 - 8
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -180,8 +180,8 @@ public class SimpleCanalConnector implements CanalConnector {
 
 
             Ack ackBody = Ack.parseFrom(ack.getBody());
             Ack ackBody = Ack.parseFrom(ack.getBody());
             if (ackBody.getErrorCode() > 0) {
             if (ackBody.getErrorCode() > 0) {
-                throw new CanalClientException("something goes wrong when doing authentication: "
-                                               + ackBody.getErrorMessage());
+                throw new CanalClientException(
+                    "something goes wrong when doing authentication: " + ackBody.getErrorMessage());
             }
             }
 
 
             connected = true;
             connected = true;
@@ -323,7 +323,9 @@ public class SimpleCanalConnector implements CanalConnector {
         Packet p = Packet.parseFrom(data);
         Packet p = Packet.parseFrom(data);
         switch (p.getType()) {
         switch (p.getType()) {
             case MESSAGES: {
             case MESSAGES: {
-                if (!p.getCompression().equals(Compression.NONE)) {
+                // if (!p.getCompression().equals(Compression.NONE)) {
+                if (!p.getCompression().equals(Compression.NONE)
+                    && !p.getCompression().equals(Compression.COMPRESSIONCOMPATIBLEPROTO2)) {
                     throw new CanalClientException("compression is not supported in this connector");
                     throw new CanalClientException("compression is not supported in this connector");
                 }
                 }
 
 
@@ -360,11 +362,8 @@ public class SimpleCanalConnector implements CanalConnector {
             .setBatchId(batchId)
             .setBatchId(batchId)
             .build();
             .build();
         try {
         try {
-            writeWithHeader(Packet.newBuilder()
-                .setType(PacketType.CLIENTACK)
-                .setBody(ca.toByteString())
-                .build()
-                .toByteArray());
+            writeWithHeader(
+                Packet.newBuilder().setType(PacketType.CLIENTACK).setBody(ca.toByteString()).build().toByteArray());
         } catch (IOException e) {
         } catch (IOException e) {
             throw new CanalClientException(e);
             throw new CanalClientException(e);
         }
         }

+ 2 - 1
client/src/main/java/com/alibaba/otter/canal/client/kafka/MessageDeserializer.java

@@ -36,7 +36,8 @@ public class MessageDeserializer implements Deserializer<Message> {
                 CanalPacket.Packet p = CanalPacket.Packet.parseFrom(data);
                 CanalPacket.Packet p = CanalPacket.Packet.parseFrom(data);
                 switch (p.getType()) {
                 switch (p.getType()) {
                     case MESSAGES: {
                     case MESSAGES: {
-                        if (!p.getCompression().equals(CanalPacket.Compression.NONE)) {
+                        if (!p.getCompression().equals(CanalPacket.Compression.NONE)
+                                && !p.getCompression().equals(CanalPacket.Compression.COMPRESSIONCOMPATIBLEPROTO2)) {
                             throw new CanalClientException("compression is not supported in this connector");
                             throw new CanalClientException("compression is not supported in this connector");
                         }
                         }
 
 

+ 2 - 2
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -79,8 +79,8 @@ public class CanalKafkaProducer {
                 } else {
                 } else {
                     record = new ProducerRecord<String, Message>(topic.getTopic(), message);
                     record = new ProducerRecord<String, Message>(topic.getTopic(), message);
                 }
                 }
-                Future<RecordMetadata> future = producer.send(record);
-                future.get();
+
+                producer.send(record);
             } else {
             } else {
                 // 发送扁平数据json
                 // 发送扁平数据json
                 List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
                 List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);