Browse Source

Merge pull request #1193 from woshijay7788/feature_kafka_config

add max.request.size config
agapple 6 years ago
parent
commit
df4c66b9e2

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -41,6 +41,7 @@ public class CanalConstants {
     public static final String CANAL_MQ_RETRIES                  = ROOT + "." + "mq.retries";
     public static final String CANAL_MQ_BATCHSIZE                = ROOT + "." + "mq.batchSize";
     public static final String CANAL_MQ_LINGERMS                 = ROOT + "." + "mq.lingerMs";
+    public static final String CANAL_MQ_MAXREQUESTSIZE           = ROOT + "." + "mq.maxRequestSize";
     public static final String CANAL_MQ_BUFFERMEMORY             = ROOT + "." + "mq.bufferMemory";
     public static final String CANAL_MQ_CANALBATCHSIZE           = ROOT + "." + "mq.canalBatchSize";
     public static final String CANAL_MQ_CANALGETTIMEOUT          = ROOT + "." + "mq.canalGetTimeout";

+ 4 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -101,6 +101,10 @@ public class CanalLauncher {
         if (!StringUtils.isEmpty(lingerMs)) {
             mqProperties.setLingerMs(Integer.valueOf(lingerMs));
         }
+        String maxRequestSize = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_MAXREQUESTSIZE);
+        if (!StringUtils.isEmpty(maxRequestSize)) {
+            mqProperties.setMaxRequestSize(Integer.valueOf(maxRequestSize));
+        }
         String bufferMemory = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BUFFERMEMORY);
         if (!StringUtils.isEmpty(bufferMemory)) {
             mqProperties.setBufferMemory(Long.valueOf(bufferMemory));

+ 1 - 0
deployer/src/main/resources/canal.properties

@@ -104,6 +104,7 @@ canal.instance.global.spring.xml = classpath:spring/file-instance.xml
 canal.mq.servers = 127.0.0.1:6667
 canal.mq.retries = 0
 canal.mq.batchSize = 16384
+canal.mq.maxRequestSize = 1048576
 canal.mq.lingerMs = 1
 canal.mq.bufferMemory = 33554432
 canal.mq.canalBatchSize = 50

+ 8 - 0
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -14,6 +14,7 @@ public class MQProperties {
     private int     retries                = 0;
     private int     batchSize              = 16384;
     private int     lingerMs               = 1;
+    private int     maxRequestSize         = 1048576;
     private long    bufferMemory           = 33554432L;
     private boolean filterTransactionEntry = true;
     private String  producerGroup          = "Canal-Producer";
@@ -185,4 +186,11 @@ public class MQProperties {
     public void setAliyunSecretKey(String aliyunSecretKey) {
         this.aliyunSecretKey = aliyunSecretKey;
     }
+    public int getMaxRequestSize() {
+        return maxRequestSize;
+    }
+
+    public void setMaxRequestSize(int maxRequestSize) {
+        this.maxRequestSize = maxRequestSize;
+    }
 }

+ 1 - 0
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -43,6 +43,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
         properties.put("retries", kafkaProperties.getRetries());
         properties.put("batch.size", kafkaProperties.getBatchSize());
         properties.put("linger.ms", kafkaProperties.getLingerMs());
+        properties.put("max.request.size", kafkaProperties.getMaxRequestSize());
         properties.put("buffer.memory", kafkaProperties.getBufferMemory());
         properties.put("key.serializer", StringSerializer.class.getName());
         if (!kafkaProperties.getFlatMessage()) {