Przeglądaj źródła

add compressionType,enableChunking for pulsar client , upgrade dependencies for security (#5191)

* add support for listener name which is described in https://pulsar.apache.org/docs/next/concepts-multiple-advertised-listeners/

* upgrade for CVE-2023-20883  CVE-2022-1471  CVE-2023-20860

* add   enableChunking

* downgrade  pulsar client version

* fix

* 增加压缩算法
JZ 9 miesięcy temu
rodzic
commit
fb3d2c6811

+ 7 - 2
admin/pom.xml

@@ -36,7 +36,7 @@
             <dependency>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-dependencies</artifactId>
-                <version>2.5.4</version>
+                <version>2.5.15</version>
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>
@@ -48,7 +48,12 @@
             <dependency>
                 <groupId>io.ebean</groupId>
                 <artifactId>ebean</artifactId>
-                <version>11.41.1</version>
+                <version>11.45.1</version>
+            </dependency>
+            <dependency>
+                <groupId>org.yaml</groupId>
+                <artifactId>snakeyaml</artifactId>
+                <version>2.0</version>
             </dependency>
         </dependencies>
     </dependencyManagement>

+ 1 - 1
client-adapter/launcher/pom.xml

@@ -17,7 +17,7 @@
             <dependency>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-dependencies</artifactId>
-                <version>2.5.4</version>
+                <version>2.5.15</version>
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>

+ 2 - 2
client-adapter/pom.xml

@@ -117,7 +117,7 @@
             <dependency>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot</artifactId>
-                <version>2.5.4</version>
+                <version>2.5.15</version>
             </dependency>
             <dependency>
                 <groupId>com.h2database</groupId>
@@ -238,7 +238,7 @@
             <dependency>
                 <groupId>org.yaml</groupId>
                 <artifactId>snakeyaml</artifactId>
-                <version>1.29</version>
+                <version>2.0</version>
             </dependency>
             <!-- test -->
             <dependency>

+ 11 - 1
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java

@@ -62,8 +62,18 @@ public class PulsarMQConstants {
     public static final String PULSARMQ_ADMIN_SERVER_URL          = ROOT + "." + "adminServerUrl";
 
     /**
-     * Pulsar admin服务器地址
+     * Pulsar 监听器名字
      */
     public static final String PULSARMQ_LISTENER_NAME             = ROOT + "." + "listenerName";
 
+
+    /**
+     * Pulsar 开启chunking
+     */
+    public static final String PULSARMQ_ENABLE_CHUNKING             = ROOT + "." + "enableChunking";
+
+    /**
+     * Pulsar 压缩算法
+     */
+    public static final String PULSARMQ_COMPRESSION_TYPE             = ROOT + "." + "compressionType";
 }

+ 24 - 0
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java

@@ -42,6 +42,16 @@ public class PulsarMQProducerConfig extends MQProperties {
      */
     private String listenerName;
 
+    /**
+     * enableChunking
+     */
+    private boolean enableChunking;
+
+    /**
+     * compressionType
+     */
+    private String compressionType;
+
     public String getServerUrl() {
         return serverUrl;
     }
@@ -81,4 +91,18 @@ public class PulsarMQProducerConfig extends MQProperties {
     public void setListenerName(String listenerName) {
         this.listenerName = listenerName;
     }
+
+    public void setEnableChunking(boolean enableChunking) {
+        this.enableChunking = enableChunking;
+    }
+    public boolean getEnableChunking() {
+        return this.enableChunking;
+    }
+
+    public void setCompressionType(String compressionType) {
+        this.compressionType = compressionType;
+    }
+    public String getCompressionType() {
+        return this.compressionType;
+    }
 }

+ 36 - 2
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java

@@ -137,6 +137,17 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
             tmpProperties.setListenerName(listenerName);
         }
 
+        String enableChunkingStr = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ENABLE_CHUNKING);
+        if (!StringUtils.isEmpty(enableChunkingStr)) {
+            tmpProperties.setEnableChunking(Boolean.parseBoolean(enableChunkingStr));
+        }
+
+        String compressionType = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_COMPRESSION_TYPE);
+        if (!StringUtils.isEmpty(compressionType)) {
+            tmpProperties.setCompressionType(compressionType);
+        }
+
+
         if (logger.isDebugEnabled()) {
             logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties));
         }
@@ -408,11 +419,34 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
                     }
 
                     // 创建指定topic的生产者
-                    producer = client.newProducer()
-                        .topic(fullTopic)
+                    ProducerBuilder producerBuilder = client.newProducer();
+                    if(pulsarMQProperties.getEnableChunking()){
+                        producerBuilder.enableChunking(true);
+                        producerBuilder.enableBatching(false);
+                    }
+
+                    if(!StringUtils.isEmpty(pulsarMQProperties.getCompressionType())) {
+                        switch(pulsarMQProperties.getCompressionType().toLowerCase()) {
+                            case "lz4":
+                                producerBuilder.compressionType(CompressionType.LZ4);
+                                break;
+                            case "zlib":
+                                producerBuilder.compressionType(CompressionType.ZLIB);
+                                break;
+                            case "zstd":
+                                producerBuilder.compressionType(CompressionType.ZSTD);
+                                break;
+                            case "snappy":
+                                producerBuilder.compressionType(CompressionType.SNAPPY);
+                                break;
+                        }
+                    }
+
+                    producer = producerBuilder.topic(fullTopic)
                         // 指定路由器
                         .messageRouter(new MessageRouterImpl(topic))
                         .create();
+
                     // 放入缓存
                     PRODUCERS.put(topic, producer);
                 }

+ 3 - 3
pom.xml

@@ -99,13 +99,13 @@
         <java_target_version>1.8</java_target_version>
         <file_encoding>UTF-8</file_encoding>
         <javadoc_skip>true</javadoc_skip>
-        <spring_version>5.3.9</spring_version>
+        <spring_version>5.3.26</spring_version>
         <log4j_version>2.17.0</log4j_version>
-        <rocketmq_version>4.8.0</rocketmq_version>
+        <rocketmq_version>4.9.8</rocketmq_version>
         <rabbitmq_version>5.18.0</rabbitmq_version>
         <mq_amqp_client>1.0.3</mq_amqp_client>
         <kafka_version>2.4.0</kafka_version>
-        <pulsar_version>2.8.1</pulsar_version>
+        <pulsar_version>2.11.4</pulsar_version>
         <mysql_driver_version>5.1.48</mysql_driver_version>
         <maven-jacoco-plugin.version>0.8.3</maven-jacoco-plugin.version>
         <maven-surefire.version>2.22.1</maven-surefire.version>