瀏覽代碼

kafka 客户端 example

rewerma 7 年之前
父節點
當前提交
ec072a45d8

+ 5 - 5
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/AbstractKafkaTest.java

@@ -2,12 +2,12 @@ package com.alibaba.otter.canal.kafka.client.running;
 
 import org.junit.Assert;
 
-public class AbstractKafkaTest {
+public abstract class AbstractKafkaTest {
 
-    protected String topic = "example";
-    protected Integer partition = null;
-    protected String groupId    = "g2";
-    protected String servers    = "slave1.test.apitops.com:6667,slave2.test.apitops.com:6667,slave3.test.apitops.com:6667";
+    public static String topic = "example";
+    public static Integer partition = null;
+    public static String groupId    = "g1";
+    public static String servers    = "slave1.test.apitops.com:6667,slave2.test.apitops.com:6667,slave3.test.apitops.com:6667";
 
     public void sleep(long time) {
         try {

+ 36 - 3
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/KafkaCanalClientExample.java

@@ -8,12 +8,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
+import java.util.concurrent.TimeUnit;
+
 public class KafkaCanalClientExample {
     protected final static Logger logger = LoggerFactory.getLogger(KafkaCanalClientExample.class);
 
     private KafkaCanalConnector connector;
 
-    private volatile boolean running = false;
+    private static volatile boolean running = false;
 
     private Thread thread = null;
 
@@ -27,6 +29,34 @@ public class KafkaCanalClientExample {
         connector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId);
     }
 
+    public static void main(String[] args) {
+        try {
+            final KafkaCanalClientExample kafkaCanalClientExample = new KafkaCanalClientExample(AbstractKafkaTest.servers,
+                    AbstractKafkaTest.topic, AbstractKafkaTest.partition, AbstractKafkaTest.groupId);
+            logger.info("## start the kafka consumer: {}-{}", AbstractKafkaTest.topic, AbstractKafkaTest.groupId);
+            kafkaCanalClientExample.start();
+            logger.info("## the canal kafka consumer is running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+
+                public void run() {
+                    try {
+                        logger.info("## stop the kafka consumer");
+                        kafkaCanalClientExample.stop();
+                    } catch (Throwable e) {
+                        logger.warn("##something goes wrong when stopping kafka consumer:", e);
+                    } finally {
+                        logger.info("## kafka consumer is down.");
+                    }
+                }
+
+            });
+            while (running) ;
+        } catch (Throwable e) {
+            logger.error("## Something goes wrong when starting up the kafka consumer:", e);
+            System.exit(0);
+        }
+    }
+
     public void start() {
         Assert.notNull(connector, "connector is null");
         thread = new Thread(new Runnable() {
@@ -35,8 +65,8 @@ public class KafkaCanalClientExample {
                 process();
             }
         });
-
         thread.setUncaughtExceptionHandler(handler);
+        thread.start();
         running = true;
     }
 
@@ -61,7 +91,10 @@ public class KafkaCanalClientExample {
                 connector.subscribe();
                 while (running) {
                     try {
-                        Message message = connector.getWithoutAck(); //获取message
+                        Message message = connector.getWithoutAck(1L, TimeUnit.SECONDS); //获取message
+                        if (message == null) {
+                            continue;
+                        }
                         long batchId = message.getId();
                         int size = message.getEntries().size();
                         if (batchId == -1 || size == 0) {