Bläddra i källkod

kafka 客户端 example 完善

rewerma 7 år sedan
förälder
incheckning
9964bb75b1

+ 31 - 29
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/KafkaCanalClientExample.java

@@ -86,40 +86,42 @@ public class KafkaCanalClientExample {
     }
 
     private void process() {
-        while (running) {
-            try {
-                connector.subscribe();
-                while (running) {
+        try {
+            connector.subscribe();
+            while (running) {
+                try {
+                    Message message = connector.getWithoutAck(1L, TimeUnit.SECONDS); //获取message
+                    if (message == null) {
+                        continue;
+                    }
+                    long batchId = message.getId();
+                    int size = message.getEntries().size();
+                    if (batchId == -1 || size == 0) {
+                        // try {
+                        // Thread.sleep(1000);
+                        // } catch (InterruptedException e) {
+                        // }
+                    } else {
+                        // printSummary(message, batchId, size);
+                        // printEntry(message.getEntries());
+                        logger.info(message.toString());
+                    }
+
+                    connector.ack(); // 提交确认
+                } catch (WakeupException e) {
                     try {
-                        Message message = connector.getWithoutAck(1L, TimeUnit.SECONDS); //获取message
-                        if (message == null) {
-                            continue;
-                        }
-                        long batchId = message.getId();
-                        int size = message.getEntries().size();
-                        if (batchId == -1 || size == 0) {
-                            // try {
-                            // Thread.sleep(1000);
-                            // } catch (InterruptedException e) {
-                            // }
-                        } else {
-                            // printSummary(message, batchId, size);
-                            // printEntry(message.getEntries());
-                            logger.info(message.toString());
-                        }
-
-                        connector.ack(); // 提交确认
-                    } catch (WakeupException e) {
+                        Thread.sleep(500); //延时确保running状态的改变
+                    } catch (InterruptedException ie) {
                         //ignore
-                    } catch (Exception e) {
-                        logger.error(e.getMessage(), e);
                     }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
                 }
-            } catch (WakeupException e) {
-                //ignore
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
             }
+        } catch (WakeupException e) {
+            //ignore
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
         }
     }
 }