瀏覽代碼

kafka 客户端测试类

rewerma 7 年之前
父節點
當前提交
b96e4dd23a

+ 3 - 7
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnector.java

@@ -1,7 +1,6 @@
 package com.alibaba.otter.canal.kafka.client;
 
 import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
@@ -73,16 +72,13 @@ public class KafkaCanalConnector {
     }
 
     /**
-     * 订阅topic,不能重复订阅
+     * 订阅topic
      */
     public void subscribe() {
         try {
-            if (kafkaConsumer != null) {
-                throw new CanalClientException("resubscribe error");
+            if (kafkaConsumer == null) {
+                kafkaConsumer = new KafkaConsumer<String, Message>(properties);
             }
-
-            kafkaConsumer = new KafkaConsumer<String, Message>(properties);
-
             if (partition == null) {
                 kafkaConsumer.subscribe(Collections.singletonList(topic));
             } else {

+ 1 - 1
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/AbstractKafkaTest.java

@@ -6,7 +6,7 @@ public class AbstractKafkaTest {
 
     protected String topic = "example";
     protected Integer partition = null;
-    protected String groupId    = "g1";
+    protected String groupId    = "g2";
     protected String servers    = "slave1.test.apitops.com:6667,slave2.test.apitops.com:6667,slave3.test.apitops.com:6667";
 
     public void sleep(long time) {

+ 21 - 9
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/ClientRunningTest.java

@@ -3,12 +3,17 @@ package com.alibaba.otter.canal.kafka.client.running;
 import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
 import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
 import com.alibaba.otter.canal.protocol.Message;
+import org.apache.kafka.common.errors.WakeupException;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 public class ClientRunningTest extends AbstractKafkaTest {
+    private Logger logger = LoggerFactory.getLogger(ClientRunningTest.class);
 
     private boolean running = true;
 
@@ -16,26 +21,33 @@ public class ClientRunningTest extends AbstractKafkaTest {
     public void testKafkaConsumer() {
         final ExecutorService executor = Executors.newFixedThreadPool(1);
 
-        final KafkaCanalConnector kafkaCanalConnector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId);
-        kafkaCanalConnector.subscribe();
+        final KafkaCanalConnector connector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId);
+        connector.subscribe();
 
         executor.submit(new Runnable() {
             @Override
             public void run() {
                 while (running) {
-                    Message message = kafkaCanalConnector.getWithoutAck();
-                    if (message != null) {
-                        System.out.println(message);
-                        sleep(40000);
+                    try {
+                        Message message = connector.getWithoutAck(3L, TimeUnit.SECONDS);
+                        if (message != null) {
+                            System.out.println(message);
+                        }
+                        connector.ack();
+                    } catch (WakeupException e) {
+                        //ignore
                     }
-                    kafkaCanalConnector.ack();
                 }
             }
         });
 
-        sleep(120000);
+        sleep(60000);
+        connector.disconnect();
         running = false;
-        kafkaCanalConnector.disconnect();
+        executor.shutdown();
+        logger.info("shutdown completed");
+
+        sleep(10000);
     }
 
 }

+ 92 - 0
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/KafkaCanalClientExample.java

@@ -0,0 +1,92 @@
+package com.alibaba.otter.canal.kafka.client.running;
+
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
+import com.alibaba.otter.canal.protocol.Message;
+import org.apache.kafka.common.errors.WakeupException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+public class KafkaCanalClientExample {
+    protected final static Logger logger = LoggerFactory.getLogger(KafkaCanalClientExample.class);
+
+    private KafkaCanalConnector connector;
+
+    private volatile boolean running = false;
+
+    private Thread thread = null;
+
+    private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+        public void uncaughtException(Thread t, Throwable e) {
+            logger.error("parse events has an error", e);
+        }
+    };
+
+    public KafkaCanalClientExample(String servers, String topic, Integer partition, String groupId) {
+        connector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId);
+    }
+
+    public void start() {
+        Assert.notNull(connector, "connector is null");
+        thread = new Thread(new Runnable() {
+
+            public void run() {
+                process();
+            }
+        });
+
+        thread.setUncaughtExceptionHandler(handler);
+        running = true;
+    }
+
+    public void stop() {
+        if (!running) {
+            return;
+        }
+        connector.disconnect();
+        running = false;
+        if (thread != null) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+    private void process() {
+        while (running) {
+            try {
+                connector.subscribe();
+                while (running) {
+                    try {
+                        Message message = connector.getWithoutAck(); //获取message
+                        long batchId = message.getId();
+                        int size = message.getEntries().size();
+                        if (batchId == -1 || size == 0) {
+                            // try {
+                            // Thread.sleep(1000);
+                            // } catch (InterruptedException e) {
+                            // }
+                        } else {
+                            // printSummary(message, batchId, size);
+                            // printEntry(message.getEntries());
+                            logger.info(message.toString());
+                        }
+
+                        connector.ack(); // 提交确认
+                    } catch (WakeupException e) {
+                        //ignore
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
+                    }
+                }
+            } catch (WakeupException e) {
+                //ignore
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+    }
+}