Browse Source

kafka client keep alive间隔时间

mcy 6 years ago
parent
commit
b5e05c8c77

+ 6 - 2
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java

@@ -143,9 +143,13 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                             });
 
                             // 间隔一段时间ack一次, 防止因超时未响应切换到另外台客户端
+                            long currentTS = System.currentTimeMillis();
                             while (executing.get()) {
-                                connector.ack();
-                                Thread.sleep(500);
+                                // 大于1分钟未消费完ack一次keep alive
+                                if (System.currentTimeMillis() - currentTS >  60000) {
+                                    connector.ack();
+                                    currentTS = System.currentTimeMillis();
+                                }
                             }
                         } else {
                             connector.ack();