Browse Source

fixed test

七锋 6 years ago
parent
commit
eb2a83b975

+ 1 - 2
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -13,7 +13,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 import com.alibaba.fastjson.JSON;
-import com.alibaba.otter.canal.client.CanalMessageDeserializer;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 
@@ -50,7 +49,7 @@ public class KafkaCanalConnector {
         properties.put("max.poll.records", "100");
         properties.put("key.deserializer", StringDeserializer.class.getName());
         if (!flatMessage) {
-            properties.put("value.deserializer", CanalMessageDeserializer.class.getName());
+            properties.put("value.deserializer", MessageDeserializer.class.getName());
         } else {
             properties.put("value.deserializer", StringDeserializer.class.getName());
         }

+ 31 - 0
client/src/main/java/com/alibaba/otter/canal/client/kafka/MessageDeserializer.java

@@ -0,0 +1,31 @@
+package com.alibaba.otter.canal.client.kafka;
+
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import com.alibaba.otter.canal.client.CanalMessageDeserializer;
+import com.alibaba.otter.canal.protocol.Message;
+
+/**
+ * Kafka Message类的反序列化
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public class MessageDeserializer implements Deserializer<Message> {
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+    }
+
+    @Override
+    public Message deserialize(String topic1, byte[] data) {
+        return CanalMessageDeserializer.deserializer(data);
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}

+ 4 - 4
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java

@@ -1082,10 +1082,10 @@ public final class RowsLogBuffer {
                 buffer.fillBytes(binary, 0, len);
 
                 /* Warning unsupport cloumn type */
-                logger.warn(String.format("!! Unsupport column type MYSQL_TYPE_GEOMETRY: meta=%d (%04X), len = %d",
-                    meta,
-                    meta,
-                    len));
+                // logger.warn(String.format("!! Unsupport column type MYSQL_TYPE_GEOMETRY: meta=%d (%04X), len = %d",
+                // meta,
+                // meta,
+                // len));
                 javaType = Types.BINARY;
                 value = binary;
                 length = len;

+ 10 - 6
example/src/main/java/com/alibaba/otter/canal/example/SimpleCanalClientPermanceTest.java

@@ -1,6 +1,8 @@
 package com.alibaba.otter.canal.example;
+
 import java.net.InetSocketAddress;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.CanalConnectors;
@@ -44,7 +46,7 @@ public class SimpleCanalClientPermanceTest {
             connector.connect();
             connector.subscribe();
             while (true) {
-                Message message = connector.getWithoutAck(batchSize);
+                Message message = connector.getWithoutAck(batchSize, 100L, TimeUnit.MILLISECONDS);
                 long batchId = message.getId();
                 int size = message.getRawEntries().size();
                 sum += size;
@@ -53,11 +55,13 @@ public class SimpleCanalClientPermanceTest {
                 queue.add(batchId);
                 if (count % 10 == 0) {
                     end = System.currentTimeMillis();
-                    long tps = (perSum * 1000) / (end - start);
-                    System.out.println(" total : " + sum + " , current : " + perSum + " , cost : " + (end - start)
-                                       + " , tps : " + tps);
-                    start = end;
-                    perSum = 0;
+                    if (end - start != 0) {
+                        long tps = (perSum * 1000) / (end - start);
+                        System.out.println(" total : " + sum + " , current : " + perSum + " , cost : " + (end - start)
+                                           + " , tps : " + tps);
+                        start = end;
+                        perSum = 0;
+                    }
                 }
             }
         } catch (Throwable e) {

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -50,7 +50,7 @@ public class SessionHandler extends SimpleChannelHandler {
         this.embeddedServer = embeddedServer;
     }
 
-    @SuppressWarnings({ "deprecation", "deprecation" })
+    @SuppressWarnings({ "deprecation" })
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
         logger.info("message receives in session handler...");
         long start = System.nanoTime();

+ 1 - 0
server/src/test/java/com/alibaba/otter/canal/server/ProtocolTest.java

@@ -56,6 +56,7 @@ public class ProtocolTest {
         }
     }
 
+    @SuppressWarnings("deprecation")
     private byte[] buildData(Message message) throws IOException {
         List<ByteString> rowEntries = message.getRawEntries();
         // message size