Browse Source

Merge pull request #892 from rewerma/master

kafka client keep alive间隔时间
agapple 6 years ago
parent
commit
fd5cfe23db

+ 6 - 2
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java

@@ -143,9 +143,13 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                             });
 
                             // 间隔一段时间ack一次, 防止因超时未响应切换到另外台客户端
+                            long currentTS = System.currentTimeMillis();
                             while (executing.get()) {
-                                connector.ack();
-                                Thread.sleep(500);
+                                // 大于1分钟未消费完ack一次keep alive
+                                if (System.currentTimeMillis() - currentTS >  60000) {
+                                    connector.ack();
+                                    currentTS = System.currentTimeMillis();
+                                }
                             }
                         } else {
                             connector.ack();

+ 0 - 2
deployer/src/main/resources/kafka.yml

@@ -3,8 +3,6 @@ retries: 0
 batchSize: 16384
 lingerMs: 1
 bufferMemory: 33554432
-# canal的批次大小,单位 k,量大建议改为1M
-canalBatchSize: 50
 filterTransactionEntry: true
 
 canalDestinations:

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/kafka/KafkaProperties.java

@@ -19,7 +19,7 @@ public class KafkaProperties {
     private int                    lingerMs               = 1;
     private long                   bufferMemory           = 33554432L;
     private boolean                filterTransactionEntry = true;
-    private int                    canalBatchSize         = 5;
+    private int                    canalBatchSize         = 50;
 
     private List<CanalDestination> canalDestinations      = new ArrayList<CanalDestination>();