Bladeren bron

Merge pull request #1141 from Renkai/compression_type

Kafka增加参数acks和compressionType
agapple 6 jaren geleden
bovenliggende
commit
51a299259b

+ 2 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -45,6 +45,8 @@ public class CanalConstants {
     public static final String CANAL_MQ_CANALBATCHSIZE           = ROOT + "." + "mq.canalBatchSize";
     public static final String CANAL_MQ_CANALGETTIMEOUT          = ROOT + "." + "mq.canalGetTimeout";
     public static final String CANAL_MQ_FLATMESSAGE              = ROOT + "." + "mq.flatMessage";
+    public static final String CANAL_MQ_COMPRESSION_TYPE         = ROOT + "." + "mq.compressionType";
+    public static final String CANAL_MQ_ACKS                     = ROOT + "." + "mq.acks";
 
     public static String getInstanceModeKey(String destination) {
         return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);

+ 2 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -100,6 +100,8 @@ public class CanalLauncher {
             CanalConstants.CANAL_MQ_CANALGETTIMEOUT)));
         mqProperties.setFlatMessage(Boolean.valueOf(CanalController.getProperty(properties,
             CanalConstants.CANAL_MQ_FLATMESSAGE)));
+        mqProperties.setCompressionType(CanalController.getProperty(properties,CanalConstants.CANAL_MQ_COMPRESSION_TYPE));
+        mqProperties.setAcks(CanalController.getProperty(properties,CanalConstants.CANAL_MQ_ACKS));
         return mqProperties;
     }
 

+ 19 - 0
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -20,6 +20,8 @@ public class MQProperties {
     private int     canalBatchSize         = 50;
     private Long    canalGetTimeout        = 100L;
     private boolean flatMessage            = true;
+    private String compressionType         = "none";
+    private String acks                    = "all";
 
     public static class CanalDestination {
 
@@ -149,4 +151,21 @@ public class MQProperties {
     public void setProducerGroup(String producerGroup) {
         this.producerGroup = producerGroup;
     }
+
+    public String getCompressionType() {
+        return compressionType;
+    }
+
+    public void setCompressionType(String compressionType) {
+        this.compressionType = compressionType;
+    }
+
+    public String getAcks() {
+        return acks;
+    }
+
+    public void setAcks(String acks) {
+        this.acks = acks;
+    }
+
 }

+ 2 - 1
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -37,7 +37,8 @@ public class CanalKafkaProducer implements CanalMQProducer {
         this.kafkaProperties = kafkaProperties;
         Properties properties = new Properties();
         properties.put("bootstrap.servers", kafkaProperties.getServers());
-        properties.put("acks", "all");
+        properties.put("acks", kafkaProperties.getAcks());
+        properties.put("compression.type",kafkaProperties.getCompressionType());
         properties.put("retries", kafkaProperties.getRetries());
         properties.put("batch.size", kafkaProperties.getBatchSize());
         properties.put("linger.ms", kafkaProperties.getLingerMs());