瀏覽代碼

fixed pull #5191 , reformat

jianghang.loujh 9 月之前
父節點
當前提交
fea4d8210a

+ 2 - 3
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java

@@ -66,14 +66,13 @@ public class PulsarMQConstants {
      */
     public static final String PULSARMQ_LISTENER_NAME             = ROOT + "." + "listenerName";
 
-
     /**
      * Pulsar 开启chunking
      */
-    public static final String PULSARMQ_ENABLE_CHUNKING             = ROOT + "." + "enableChunking";
+    public static final String PULSARMQ_ENABLE_CHUNKING           = ROOT + "." + "enableChunking";
 
     /**
      * Pulsar 压缩算法
      */
-    public static final String PULSARMQ_COMPRESSION_TYPE             = ROOT + "." + "compressionType";
+    public static final String PULSARMQ_COMPRESSION_TYPE          = ROOT + "." + "compressionType";
 }

+ 8 - 6
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java

@@ -18,7 +18,7 @@ public class PulsarMQProducerConfig extends MQProperties {
      * httpUrl: http://localhost:8080
      * </p>
      */
-    private String serverUrl;
+    private String  serverUrl;
     /**
      * pulsar topic前缀
      * <p>
@@ -27,20 +27,20 @@ public class PulsarMQProducerConfig extends MQProperties {
      * 在发送消息时会自动拼接上
      * </p>
      */
-    private String topicTenantPrefix;
+    private String  topicTenantPrefix;
     /**
      * 生产者角色权限,请确保该角色有canal使用的所有topic生产者权限(最低要求)
      */
-    private String roleToken;
+    private String  roleToken;
     /**
      * admin服务器地址
      */
-    private String adminServerUrl;
+    private String  adminServerUrl;
 
     /**
      * listener name
      */
-    private String listenerName;
+    private String  listenerName;
 
     /**
      * enableChunking
@@ -50,7 +50,7 @@ public class PulsarMQProducerConfig extends MQProperties {
     /**
      * compressionType
      */
-    private String compressionType;
+    private String  compressionType;
 
     public String getServerUrl() {
         return serverUrl;
@@ -95,6 +95,7 @@ public class PulsarMQProducerConfig extends MQProperties {
     public void setEnableChunking(boolean enableChunking) {
         this.enableChunking = enableChunking;
     }
+
     public boolean getEnableChunking() {
         return this.enableChunking;
     }
@@ -102,6 +103,7 @@ public class PulsarMQProducerConfig extends MQProperties {
     public void setCompressionType(String compressionType) {
         this.compressionType = compressionType;
     }
+
     public String getCompressionType() {
         return this.compressionType;
     }

+ 3 - 4
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java

@@ -147,7 +147,6 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
             tmpProperties.setCompressionType(compressionType);
         }
 
-
         if (logger.isDebugEnabled()) {
             logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties));
         }
@@ -420,13 +419,13 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
 
                     // 创建指定topic的生产者
                     ProducerBuilder producerBuilder = client.newProducer();
-                    if(pulsarMQProperties.getEnableChunking()){
+                    if (pulsarMQProperties.getEnableChunking()) {
                         producerBuilder.enableChunking(true);
                         producerBuilder.enableBatching(false);
                     }
 
-                    if(!StringUtils.isEmpty(pulsarMQProperties.getCompressionType())) {
-                        switch(pulsarMQProperties.getCompressionType().toLowerCase()) {
+                    if (!StringUtils.isEmpty(pulsarMQProperties.getCompressionType())) {
+                        switch (pulsarMQProperties.getCompressionType().toLowerCase()) {
                             case "lz4":
                                 producerBuilder.compressionType(CompressionType.LZ4);
                                 break;