Browse Source

增加kafka get的超时时间

mcy 6 years ago
parent
commit
81c216f1c1

+ 5 - 1
deployer/src/main/resources/kafka.yml

@@ -3,7 +3,11 @@ retries: 0
 batchSize: 16384
 lingerMs: 1
 bufferMemory: 33554432
-canalBatchSize: 50 # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
+
+# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
+canalBatchSize: 50
+# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
+canalGetTimeout: 100
 
 canalDestinations:
   - canalDestination: example

+ 11 - 1
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaStarter.java

@@ -4,6 +4,7 @@ import java.io.FileInputStream;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -117,7 +118,16 @@ public class CanalKafkaStarter implements CanalServerStarter {
                 logger.info("## the canal consumer {} is running now ......", destination.getCanalDestination());
 
                 while (running) {
-                    Message message = server.getWithoutAck(clientIdentity, kafkaProperties.getCanalBatchSize()); // 获取指定数量的数据
+                    Message message;
+                    if (kafkaProperties.getCanalGetTimeout() != null) {
+                        message = server.getWithoutAck(clientIdentity,
+                            kafkaProperties.getCanalBatchSize(),
+                            kafkaProperties.getCanalGetTimeout(),
+                            TimeUnit.MILLISECONDS);
+                    } else {
+                        message = server.getWithoutAck(clientIdentity, kafkaProperties.getCanalBatchSize());
+                    }
+
                     final long batchId = message.getId();
                     try {
                         int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();

+ 9 - 0
server/src/main/java/com/alibaba/otter/canal/kafka/KafkaProperties.java

@@ -20,6 +20,7 @@ public class KafkaProperties {
     private long                   bufferMemory           = 33554432L;
     private boolean                filterTransactionEntry = true;
     private int                    canalBatchSize         = 50;
+    private Long                   canalGetTimeout;
 
     private List<CanalDestination> canalDestinations      = new ArrayList<CanalDestination>();
 
@@ -151,6 +152,14 @@ public class KafkaProperties {
         this.canalBatchSize = canalBatchSize;
     }
 
+    public Long getCanalGetTimeout() {
+        return canalGetTimeout;
+    }
+
+    public void setCanalGetTimeout(Long canalGetTimeout) {
+        this.canalGetTimeout = canalGetTimeout;
+    }
+
     public List<CanalDestination> getCanalDestinations() {
         return canalDestinations;
     }