Browse Source

增加canal-kafka-client的running状态监控

rewerma cách đây 6 năm
mục cha
commit
f2e3ff2dae

+ 7 - 1
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -76,7 +76,7 @@ public class SimpleCanalConnector implements CanalConnector {
     private Object               readDataLock          = new Object();
     private Object               writeDataLock         = new Object();
 
-    private boolean              running               = false;
+    private volatile boolean     running               = false;
 
     public SimpleCanalConnector(SocketAddress address, String username, String password, String destination){
         this(address, username, password, destination, 60000, 60 * 60 * 1000);
@@ -289,6 +289,9 @@ public class SimpleCanalConnector implements CanalConnector {
 
     public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
         waitClientRunning();
+        if (!running) {
+            return null;
+        }
         try {
             int size = (batchSize <= 0) ? 1000 : batchSize;
             long time = (timeout == null || timeout < 0) ? -1 : timeout; // -1代表不做timeout控制
@@ -342,6 +345,9 @@ public class SimpleCanalConnector implements CanalConnector {
 
     public void ack(long batchId) throws CanalClientException {
         waitClientRunning();
+        if (!running) {
+            return;
+        }
         ClientAck ca = ClientAck.newBuilder()
             .setDestination(clientIdentity.getDestination())
             .setClientId(String.valueOf(clientIdentity.getClientId()))

+ 132 - 18
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnector.java

@@ -1,15 +1,21 @@
 package com.alibaba.otter.canal.kafka.client;
 
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
+import com.alibaba.otter.canal.common.utils.AddressUtils;
+import com.alibaba.otter.canal.common.utils.BooleanMutex;
+import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
+import com.alibaba.otter.canal.kafka.client.running.ClientRunningData;
+import com.alibaba.otter.canal.kafka.client.running.ClientRunningListener;
+import com.alibaba.otter.canal.kafka.client.running.ClientRunningMonitor;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
-import com.alibaba.otter.canal.protocol.Message;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 /**
  * canal kafka 数据操作客户端
@@ -20,14 +26,16 @@ import com.alibaba.otter.canal.protocol.Message;
 public class KafkaCanalConnector {
 
     private KafkaConsumer<String, Message> kafkaConsumer;
-
-    private String                         topic;
-
-    private Integer                        partition;
-
-    private Properties                     properties;
-
-    public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId){
+    private String topic;
+    private Integer partition;
+    private Properties properties;
+    private ClientRunningMonitor runningMonitor;  // 运行控制
+    private ZkClientx zkClientx;
+    private BooleanMutex mutex = new BooleanMutex(false);
+    private volatile boolean connected = false;
+    private volatile boolean running = false;
+
+    public KafkaCanalConnector(String zkServers, String servers, String topic, Integer partition, String groupId) {
         this.topic = topic;
         this.partition = partition;
 
@@ -36,13 +44,35 @@ public class KafkaCanalConnector {
         properties.put("group.id", groupId);
         properties.put("enable.auto.commit", false);
         properties.put("auto.commit.interval.ms", "1000");
-        properties.put("auto.offset.reset", "latest"); // earliest
-                                                       // //如果没有offset则从最后的offset开始读
+        properties.put("auto.offset.reset", "latest"); //如果没有offset则从最后的offset开始读
         properties.put("request.timeout.ms", "40000"); // 必须大于session.timeout.ms的设置
         properties.put("session.timeout.ms", "30000"); // 默认为30秒
         properties.put("max.poll.records", "1"); // 所以一次只取一条数据
         properties.put("key.deserializer", StringDeserializer.class.getName());
         properties.put("value.deserializer", MessageDeserializer.class.getName());
+
+        if (zkServers != null) {
+            zkClientx = new ZkClientx(zkServers);
+
+            ClientRunningData clientData = new ClientRunningData();
+            clientData.setGroupId(groupId);
+            clientData.setAddress(AddressUtils.getHostIp());
+
+            runningMonitor = new ClientRunningMonitor();
+            runningMonitor.setTopic(topic);
+            runningMonitor.setZkClient(zkClientx);
+            runningMonitor.setClientData(clientData);
+            runningMonitor.setListener(new ClientRunningListener() {
+                public void processActiveEnter() {
+                    mutex.set(true);
+                }
+
+                public void processActiveExit() {
+                    mutex.set(false);
+                }
+            });
+        }
+
     }
 
     /**
@@ -57,20 +87,75 @@ public class KafkaCanalConnector {
         properties.put("session.timeout.ms", String.valueOf(t));
     }
 
+    /**
+     * 打开连接
+     */
+    public void connect() {
+        if (connected) {
+            return;
+        }
+
+        if (runningMonitor != null) {
+            if (!runningMonitor.isStart()) {
+                runningMonitor.start();
+            }
+        }
+
+        connected = true;
+
+        if (kafkaConsumer == null) {
+            kafkaConsumer = new KafkaConsumer<String, Message>(properties);
+        }
+    }
+
     /**
      * 关闭链接
      */
-    public void close() {
+    public void disconnnect() {
         kafkaConsumer.close();
+
+        connected = false;
+        if (runningMonitor.isStart()) {
+            runningMonitor.stop();
+        }
+    }
+
+    private void waitClientRunning() {
+        try {
+            if (zkClientx != null) {
+                if (!connected) {// 未调用connect
+                    throw new CanalClientException("should connect first");
+                }
+
+                running = true;
+                mutex.get();// 阻塞等待
+            } else {
+                // 单机模式直接设置为running
+                running = true;
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new CanalClientException(e);
+        }
+    }
+
+    public boolean checkValid() {
+        if (zkClientx != null) {
+            return mutex.state();
+        } else {
+            return true;// 默认都放过
+        }
     }
 
     /**
      * 订阅topic
      */
     public void subscribe() {
-        if (kafkaConsumer == null) {
-            kafkaConsumer = new KafkaConsumer<String, Message>(properties);
+        waitClientRunning();
+        if (!running) {
+            return;
         }
+
         if (partition == null) {
             kafkaConsumer.subscribe(Collections.singletonList(topic));
         } else {
@@ -83,6 +168,11 @@ public class KafkaCanalConnector {
      * 取消订阅
      */
     public void unsubscribe() {
+        waitClientRunning();
+        if (!running) {
+            return;
+        }
+
         kafkaConsumer.unsubscribe();
     }
 
@@ -96,6 +186,11 @@ public class KafkaCanalConnector {
     }
 
     public Message get(Long timeout, TimeUnit unit) {
+        waitClientRunning();
+        if (!running) {
+            return null;
+        }
+
         Message message = getWithoutAck(timeout, unit);
         this.ack();
         return message;
@@ -111,6 +206,11 @@ public class KafkaCanalConnector {
      * @return
      */
     public Message getWithoutAck(Long timeout, TimeUnit unit) {
+        waitClientRunning();
+        if (!running) {
+            return null;
+        }
+
         ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout)); // 基于配置,最多只能poll到一条数据
 
         if (!records.isEmpty()) {
@@ -123,6 +223,20 @@ public class KafkaCanalConnector {
      * 提交offset,如果超过 session.timeout.ms 设置的时间没有ack则会抛出异常,ack失败
      */
     public void ack() {
+        waitClientRunning();
+        if (!running) {
+            return;
+        }
+
         kafkaConsumer.commitSync();
     }
+
+    public void stopRunning() {
+        if (running) {
+            running = false; // 设置为非running状态
+            if (!mutex.state()) {
+                mutex.set(true); // 中断阻塞
+            }
+        }
+    }
 }

+ 31 - 4
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnectors.java

@@ -9,7 +9,7 @@ package com.alibaba.otter.canal.kafka.client;
 public class KafkaCanalConnectors {
 
     /**
-     * 创建kafka客户端链接
+     * 创建kafka客户端链接,独立运行不注册zk信息
      *
      * @param servers
      * @param topic
@@ -18,11 +18,11 @@ public class KafkaCanalConnectors {
      * @return
      */
     public static KafkaCanalConnector newKafkaConnector(String servers, String topic, Integer partition, String groupId) {
-        return new KafkaCanalConnector(servers, topic, partition, groupId);
+        return new KafkaCanalConnector(null, servers, topic, partition, groupId);
     }
 
     /**
-     * 创建kafka客户端链接
+     * 创建kafka客户端链接,独立运行不注册zk信息
      *
      * @param servers
      * @param topic
@@ -30,6 +30,33 @@ public class KafkaCanalConnectors {
      * @return
      */
     public static KafkaCanalConnector newKafkaConnector(String servers, String topic, String groupId) {
-        return new KafkaCanalConnector(servers, topic, null, groupId);
+        return new KafkaCanalConnector(null, servers, topic, null, groupId);
+    }
+
+    /**
+     * 创建kafka客户端链接
+     *
+     * @param zkServers
+     * @param servers
+     * @param topic
+     * @param partition
+     * @param groupId
+     * @return
+     */
+    public static KafkaCanalConnector newKafkaConnector(String zkServers, String servers, String topic, Integer partition, String groupId) {
+        return new KafkaCanalConnector(zkServers, servers, topic, partition, groupId);
+    }
+
+    /**
+     * 创建kafka客户端链接
+     *
+     * @param zkServers
+     * @param servers
+     * @param topic
+     * @param groupId
+     * @return
+     */
+    public static KafkaCanalConnector newKafkaConnector(String zkServers, String servers, String topic, String groupId) {
+        return new KafkaCanalConnector(zkServers, servers, topic, null, groupId);
     }
 }

+ 39 - 0
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/running/ClientRunningData.java

@@ -0,0 +1,39 @@
+package com.alibaba.otter.canal.kafka.client.running;
+
+/**
+ * client running状态信息
+ *
+ * @author machengyuan 2018-06-20 下午04:10:12
+ * @version 1.0.0
+ */
+public class ClientRunningData {
+
+    private String groupId;
+    private String address;
+    private boolean active = true;
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public String getAddress() {
+        return address;
+    }
+
+    public void setAddress(String address) {
+        this.address = address;
+    }
+
+    public boolean isActive() {
+        return active;
+    }
+
+    public void setActive(boolean active) {
+        this.active = active;
+    }
+
+}

+ 21 - 0
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/running/ClientRunningListener.java

@@ -0,0 +1,21 @@
+package com.alibaba.otter.canal.kafka.client.running;
+
+/**
+ * client running状态信息
+ *
+ * @author machengyuan 2018-06-20 下午04:10:12
+ * @version 1.0.0
+ */
+public interface ClientRunningListener {
+
+    /**
+     * 触发现在轮到自己做为active
+     */
+    public void processActiveEnter();
+
+    /**
+     * 触发一下当前active模式失败
+     */
+    public void processActiveExit();
+
+}

+ 275 - 0
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/running/ClientRunningMonitor.java

@@ -0,0 +1,275 @@
+package com.alibaba.otter.canal.kafka.client.running;
+
+import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
+import com.alibaba.otter.canal.common.utils.AddressUtils;
+import com.alibaba.otter.canal.common.utils.BooleanMutex;
+import com.alibaba.otter.canal.common.utils.JsonUtils;
+import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
+import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.text.MessageFormat;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * kafka client running状态信息
+ *
+ * @author machengyuan 2018-06-20 下午04:10:12
+ * @version 1.0.0
+ */
+public class ClientRunningMonitor extends AbstractCanalLifeCycle {
+
+    private static final String TOPIC_ROOT_NODE = ZookeeperPathUtils.CANAL_ROOT_NODE + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + "topics";
+
+    private static final String TOPIC_NODE = TOPIC_ROOT_NODE + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + "{0}";
+
+    private static final String TOPIC_CLIENTID_NODE = TOPIC_NODE + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + "{1}";
+
+    private static final String TOPIC_CLIENTID_RUNNING_NODE = TOPIC_CLIENTID_NODE + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + ZookeeperPathUtils.RUNNING_NODE;
+
+    private static String getTopicClientRunning(String topic, String groupId) {
+        return MessageFormat.format(TOPIC_CLIENTID_RUNNING_NODE, topic, groupId);
+    }
+
+    private static String getClientIdNodePath(String topic, String groupId) {
+        return MessageFormat.format(TOPIC_CLIENTID_NODE, topic, groupId);
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(ClientRunningMonitor.class);
+    private ZkClientx zkClient;
+    private String topic;
+    private ClientRunningData clientData;
+    private IZkDataListener dataListener;
+    private BooleanMutex mutex = new BooleanMutex(false);
+    private volatile boolean release = false;
+    private volatile ClientRunningData activeData;
+    private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
+    private ClientRunningListener listener;
+    private int delayTime = 5;
+
+    private static Integer virtualPort;
+
+    public ClientRunningMonitor() {
+        if (virtualPort == null) {
+            Random rand = new Random();
+            virtualPort = rand.nextInt(9000) + 1000;
+        }
+
+        dataListener = new IZkDataListener() {
+
+            public void handleDataChange(String dataPath, Object data) throws Exception {
+                MDC.put("kafkaTopic", topic);
+                ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
+                if (!isMine(runningData.getAddress())) {
+                    mutex.set(false);
+                }
+
+                if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
+                    release = true;
+                    releaseRunning();// 彻底释放mainstem
+                }
+
+                activeData = (ClientRunningData) runningData;
+            }
+
+            public void handleDataDeleted(String dataPath) throws Exception {
+                MDC.put("kafkaTopic", topic);
+
+                mutex.set(false);
+                // 触发一下退出,可能是人为干预的释放操作或者网络闪断引起的session expired timeout
+                processActiveExit();
+                if (!release && activeData != null && isMine(activeData.getAddress())) {
+                    // 如果上一次active的状态就是本机,则即时触发一下active抢占
+                    initRunning();
+                } else {
+                    // 否则就是等待delayTime,避免因网络瞬断或者zk异常,导致出现频繁的切换操作
+                    delayExector.schedule(new Runnable() {
+
+                        public void run() {
+                            initRunning();
+                        }
+                    }, delayTime, TimeUnit.SECONDS);
+                }
+            }
+
+        };
+
+    }
+
+
+    public void start() {
+        super.start();
+
+        String path = getTopicClientRunning(this.topic, clientData.getGroupId());
+
+        zkClient.subscribeDataChanges(path, dataListener);
+        initRunning();
+    }
+
+    public void stop() {
+        super.stop();
+        String path = getTopicClientRunning(this.topic, clientData.getGroupId());
+        zkClient.unsubscribeDataChanges(path, dataListener);
+        releaseRunning(); // 尝试一下release
+        //Fix issue #697
+        if (delayExector != null) {
+            delayExector.shutdown();
+        }
+    }
+
+    // 改动记录:
+    // 1,在方法上加synchronized关键字,保证同步顺序执行;
+    // 2,判断Zk上已经存在的activeData是否是本机,是的话把mutex重置为true,否则会导致死锁
+    // 3,增加异常处理,保证出现异常时,running节点能被删除,否则会导致死锁
+    public synchronized void initRunning() {
+        if (!isStart()) {
+            return;
+        }
+
+        String path = getTopicClientRunning(this.topic, clientData.getGroupId());
+        // 序列化
+        byte[] bytes = JsonUtils.marshalToByte(clientData);
+        try {
+            mutex.set(false);
+            zkClient.create(path, bytes, CreateMode.EPHEMERAL);
+            processActiveEnter();// 触发一下事件
+            activeData = clientData;
+            mutex.set(true);
+        } catch (ZkNodeExistsException e) {
+            bytes = zkClient.readData(path, true);
+            if (bytes == null) {// 如果不存在节点,立即尝试一次
+                initRunning();
+            } else {
+                activeData = JsonUtils.unmarshalFromByte(bytes, ClientRunningData.class);
+                // 如果发现已经存在,判断一下是否自己,避免活锁
+                if (activeData.getAddress().contains(":") && isMine(activeData.getAddress())) {
+                    mutex.set(true);
+                }
+            }
+        } catch (ZkNoNodeException e) {
+            zkClient.createPersistent(getClientIdNodePath(this.topic, clientData.getGroupId()),
+                    true); // 尝试创建父节点
+            initRunning();
+        } catch (Throwable t) {
+            logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].",
+                    topic),
+                    t);
+            // 出现任何异常尝试release
+            releaseRunning();
+            throw new CanalClientException("something goes wrong in initRunning method. ", t);
+        }
+    }
+
+    /**
+     * 阻塞等待自己成为active,如果自己成为active,立马返回
+     *
+     * @throws InterruptedException
+     */
+    public void waitForActive() throws InterruptedException {
+        initRunning();
+        mutex.get();
+    }
+
+    /**
+     * 检查当前的状态
+     */
+    public boolean check() {
+        String path = getTopicClientRunning(this.topic, clientData.getGroupId());
+        //ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
+        try {
+            byte[] bytes = zkClient.readData(path);
+            ClientRunningData eventData = JsonUtils.unmarshalFromByte(bytes, ClientRunningData.class);
+            activeData = eventData;// 更新下为最新值
+            // 检查下nid是否为自己
+            boolean result = isMine(activeData.getAddress());
+            if (!result) {
+                logger.warn("canal is running in [{}] , but not in [{}]",
+                        activeData.getAddress(),
+                        clientData.getAddress());
+            }
+            return result;
+        } catch (ZkNoNodeException e) {
+            logger.warn("canal is not run any in node");
+            return false;
+        } catch (ZkInterruptedException e) {
+            logger.warn("canal check is interrupt");
+            Thread.interrupted();// 清除interrupt标记
+            return check();
+        } catch (ZkException e) {
+            logger.warn("canal check is failed");
+            return false;
+        }
+    }
+
+    public boolean releaseRunning() {
+        if (check()) {
+            String path = getTopicClientRunning(this.topic, clientData.getGroupId());
+            zkClient.delete(path);
+            mutex.set(false);
+            processActiveExit();
+            return true;
+        }
+
+        return false;
+    }
+
+    // ====================== helper method ======================
+
+    private boolean isMine(String address) {
+        return address.equals(clientData.getAddress());
+    }
+
+    private void processActiveEnter() {
+        if (listener != null) {
+            // 触发回调
+            listener.processActiveEnter();
+            this.clientData.setAddress(/*address*/AddressUtils.getHostIp() + ":" + virtualPort);
+
+            String path = getTopicClientRunning(this.topic, clientData.getGroupId());
+            // 序列化
+            byte[] bytes = JsonUtils.marshalToByte(clientData);
+            zkClient.writeData(path, bytes);
+        }
+    }
+
+    private void processActiveExit() {
+        if (listener != null) {
+            listener.processActiveExit();
+        }
+    }
+
+    public void setListener(ClientRunningListener listener) {
+        this.listener = listener;
+    }
+
+    // ===================== setter / getter =======================
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setClientData(ClientRunningData clientData) {
+        this.clientData = clientData;
+    }
+
+    public void setDelayTime(int delayTime) {
+        this.delayTime = delayTime;
+    }
+
+    public void setZkClient(ZkClientx zkClient) {
+        this.zkClient = zkClient;
+    }
+
+}

+ 2 - 1
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/AbstractKafkaTest.java

@@ -12,8 +12,9 @@ public abstract class AbstractKafkaTest {
 
     public static String  topic     = "example";
     public static Integer partition = null;
-    public static String  groupId   = "g1";
+    public static String  groupId   = "g4";
     public static String  servers   = "slave1:6667,slave2:6667,slave3:6667";
+    public static String  zkServers = "slave1:2181,slave2:2181,slave3:2181";
 
     public void sleep(long time) {
         try {

+ 45 - 40
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/CanalKafkaClientExample.java

@@ -19,31 +19,32 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class CanalKafkaClientExample {
 
-    protected final static Logger           logger  = LoggerFactory.getLogger(CanalKafkaClientExample.class);
+    protected final static Logger logger = LoggerFactory.getLogger(CanalKafkaClientExample.class);
 
-    private KafkaCanalConnector             connector;
+    private KafkaCanalConnector connector;
 
-    private static volatile boolean         running = false;
+    private static volatile boolean running = false;
 
-    private Thread                          thread  = null;
+    private Thread thread = null;
 
     private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
 
-                                                        public void uncaughtException(Thread t, Throwable e) {
-                                                            logger.error("parse events has an error", e);
-                                                        }
-                                                    };
+        public void uncaughtException(Thread t, Throwable e) {
+            logger.error("parse events has an error", e);
+        }
+    };
 
-    public CanalKafkaClientExample(String servers, String topic, Integer partition, String groupId){
-        connector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId);
+    public CanalKafkaClientExample(String zkServers, String servers, String topic, Integer partition, String groupId) {
+        connector = KafkaCanalConnectors.newKafkaConnector(zkServers, servers, topic, partition, groupId);
     }
 
     public static void main(String[] args) {
         try {
-            final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(AbstractKafkaTest.servers,
-                AbstractKafkaTest.topic,
-                AbstractKafkaTest.partition,
-                AbstractKafkaTest.groupId);
+            final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(AbstractKafkaTest.zkServers,
+                    AbstractKafkaTest.servers,
+                    AbstractKafkaTest.topic,
+                    AbstractKafkaTest.partition,
+                    AbstractKafkaTest.groupId);
             logger.info("## start the kafka consumer: {}-{}", AbstractKafkaTest.topic, AbstractKafkaTest.groupId);
             kafkaCanalClientExample.start();
             logger.info("## the canal kafka consumer is running now ......");
@@ -86,6 +87,7 @@ public class CanalKafkaClientExample {
         if (!running) {
             return;
         }
+        connector.stopRunning();
         running = false;
         if (thread != null) {
             try {
@@ -99,34 +101,37 @@ public class CanalKafkaClientExample {
     private void process() {
         while (!running)
             ;
-        try {
-            connector.subscribe();
-            while (running) {
-                try {
-                    Message message = connector.getWithoutAck(1L, TimeUnit.SECONDS); // 获取message
-                    if (message == null) {
-                        continue;
-                    }
-                    long batchId = message.getId();
-                    int size = message.getEntries().size();
-                    if (batchId == -1 || size == 0) {
-                        // try {
-                        // Thread.sleep(1000);
-                        // } catch (InterruptedException e) {
-                        // }
-                    } else {
-                        // printSummary(message, batchId, size);
-                        // printEntry(message.getEntries());
-                        logger.info(message.toString());
+        while (running) {
+            try {
+                connector.connect();
+                connector.subscribe();
+                while (running) {
+                    try {
+                        Message message = connector.getWithoutAck(1L, TimeUnit.SECONDS); // 获取message
+                        if (message == null) {
+                            continue;
+                        }
+                        long batchId = message.getId();
+                        int size = message.getEntries().size();
+                        if (batchId == -1 || size == 0) {
+                            // try {
+                            // Thread.sleep(1000);
+                            // } catch (InterruptedException e) {
+                            // }
+                        } else {
+                            // printSummary(message, batchId, size);
+                            // printEntry(message.getEntries());
+                            logger.info(message.toString());
+                        }
+
+                        connector.ack(); // 提交确认
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
                     }
-
-                    connector.ack(); // 提交确认
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
                 }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
             }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
         }
 
         try {
@@ -134,6 +139,6 @@ public class CanalKafkaClientExample {
         } catch (WakeupException e) {
             // No-op. Continue process
         }
-        connector.close();
+        connector.disconnnect();
     }
 }

+ 2 - 1
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/KafkaClientRunningTest.java

@@ -35,6 +35,7 @@ public class KafkaClientRunningTest extends AbstractKafkaTest {
 
             @Override
             public void run() {
+                connector.connect();
                 connector.subscribe();
                 while (running) {
                     try {
@@ -48,7 +49,7 @@ public class KafkaClientRunningTest extends AbstractKafkaTest {
                     }
                 }
                 connector.unsubscribe();
-                connector.close();
+                connector.disconnnect();
             }
         });