Browse Source

fixed issue #3829 , kafka connector support k8s env

jianghang.loujh 3 years ago
parent
commit
f26b84ffc2

+ 3 - 0
connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/consumer/CanalKafkaConsumer.java

@@ -9,6 +9,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import com.alibaba.otter.canal.common.utils.PropertiesUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -53,6 +54,8 @@ public class CanalKafkaConsumer implements CanalMsgConsumer {
             String k = (String) entry.getKey();
             Object v = entry.getValue();
             if (k.startsWith(PREFIX_KAFKA_CONFIG) && v != null) {
+                // check env config
+                v = PropertiesUtils.getProperty(properties, k);
                 kafkaProperties.put(k.substring(PREFIX_KAFKA_CONFIG.length()), v);
             }
         }

+ 2 - 0
connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java

@@ -102,6 +102,8 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
             String key = (String) entry.getKey();
             Object value = entry.getValue();
             if (key.startsWith(PREFIX_KAFKA_CONFIG) && value != null) {
+                // check env config
+                value = PropertiesUtils.getProperty(properties, key);
                 key = key.substring(PREFIX_KAFKA_CONFIG.length());
                 kafkaProperties.put(key, value);
             }