Browse Source

当MQ配置为Kafka时,支持配置Kerberos认证 (#1895)

BillowX 6 years ago
parent
commit
74489d1bba

+ 44 - 43
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -10,55 +10,56 @@ import java.text.MessageFormat;
  */
 public class CanalConstants {
 
-    public static final String MDC_DESTINATION                   = "destination";
-    public static final String ROOT                              = "canal";
-    public static final String CANAL_ID                          = ROOT + "." + "id";
-    public static final String CANAL_IP                          = ROOT + "." + "ip";
-    public static final String CANAL_PORT                        = ROOT + "." + "port";
-    public static final String CANAL_METRICS_PULL_PORT           = ROOT + "." + "metrics.pull.port";
-    public static final String CANAL_ZKSERVERS                   = ROOT + "." + "zkServers";
-    public static final String CANAL_WITHOUT_NETTY               = ROOT + "." + "withoutNetty";
+    public static final String MDC_DESTINATION                       = "destination";
+    public static final String ROOT                                  = "canal";
+    public static final String CANAL_ID                              = ROOT + "." + "id";
+    public static final String CANAL_IP                              = ROOT + "." + "ip";
+    public static final String CANAL_PORT                            = ROOT + "." + "port";
+    public static final String CANAL_METRICS_PULL_PORT               = ROOT + "." + "metrics.pull.port";
+    public static final String CANAL_ZKSERVERS                       = ROOT + "." + "zkServers";
+    public static final String CANAL_WITHOUT_NETTY                   = ROOT + "." + "withoutNetty";
 
-    public static final String CANAL_DESTINATIONS                = ROOT + "." + "destinations";
-    public static final String CANAL_AUTO_SCAN                   = ROOT + "." + "auto.scan";
-    public static final String CANAL_AUTO_SCAN_INTERVAL          = ROOT + "." + "auto.scan.interval";
-    public static final String CANAL_CONF_DIR                    = ROOT + "." + "conf.dir";
-    public static final String CANAL_SERVER_MODE                 = ROOT + "." + "serverMode";
+    public static final String CANAL_DESTINATIONS                    = ROOT + "." + "destinations";
+    public static final String CANAL_AUTO_SCAN                       = ROOT + "." + "auto.scan";
+    public static final String CANAL_AUTO_SCAN_INTERVAL              = ROOT + "." + "auto.scan.interval";
+    public static final String CANAL_CONF_DIR                        = ROOT + "." + "conf.dir";
+    public static final String CANAL_SERVER_MODE                     = ROOT + "." + "serverMode";
 
-    public static final String CANAL_DESTINATION_SPLIT           = ",";
-    public static final String GLOBAL_NAME                       = "global";
+    public static final String CANAL_DESTINATION_SPLIT               = ",";
+    public static final String GLOBAL_NAME                           = "global";
 
-    public static final String INSTANCE_MODE_TEMPLATE            = ROOT + "." + "instance.{0}.mode";
-    public static final String INSTANCE_LAZY_TEMPLATE            = ROOT + "." + "instance.{0}.lazy";
-    public static final String INSTANCE_MANAGER_ADDRESS_TEMPLATE = ROOT + "." + "instance.{0}.manager.address";
-    public static final String INSTANCE_SPRING_XML_TEMPLATE      = ROOT + "." + "instance.{0}.spring.xml";
+    public static final String INSTANCE_MODE_TEMPLATE                = ROOT + "." + "instance.{0}.mode";
+    public static final String INSTANCE_LAZY_TEMPLATE                = ROOT + "." + "instance.{0}.lazy";
+    public static final String INSTANCE_MANAGER_ADDRESS_TEMPLATE     = ROOT + "." + "instance.{0}.manager.address";
+    public static final String INSTANCE_SPRING_XML_TEMPLATE          = ROOT + "." + "instance.{0}.spring.xml";
 
-    public static final String CANAL_DESTINATION_PROPERTY        = ROOT + ".instance.destination";
-
-    public static final String CANAL_SOCKETCHANNEL               = ROOT + "." + "socketChannel";
-
-    public static final String CANAL_MQ_SERVERS                  = ROOT + "." + "mq.servers";
-    public static final String CANAL_MQ_RETRIES                  = ROOT + "." + "mq.retries";
-    public static final String CANAL_MQ_BATCHSIZE                = ROOT + "." + "mq.batchSize";
-    public static final String CANAL_MQ_LINGERMS                 = ROOT + "." + "mq.lingerMs";
-    public static final String CANAL_MQ_MAXREQUESTSIZE           = ROOT + "." + "mq.maxRequestSize";
-    public static final String CANAL_MQ_BUFFERMEMORY             = ROOT + "." + "mq.bufferMemory";
-    public static final String CANAL_MQ_CANALBATCHSIZE           = ROOT + "." + "mq.canalBatchSize";
-    public static final String CANAL_MQ_CANALGETTIMEOUT          = ROOT + "." + "mq.canalGetTimeout";
-    public static final String CANAL_MQ_FLATMESSAGE              = ROOT + "." + "mq.flatMessage";
-    public static final String CANAL_MQ_COMPRESSION_TYPE         = ROOT + "." + "mq.compressionType";
-    public static final String CANAL_MQ_ACKS                     = ROOT + "." + "mq.acks";
-    public static final String CANAL_MQ_TRANSACTION              = ROOT + "." + "mq.transaction";
-    public static final String CANAL_MQ_PRODUCERGROUP            = ROOT + "." + "mq.producerGroup";
-    public static final String CANAL_ALIYUN_ACCESSKEY            = ROOT + "." + "aliyun.accessKey";
-    public static final String CANAL_ALIYUN_SECRETKEY            = ROOT + "." + "aliyun.secretKey";
-    public static final String CANAL_MQ_PROPERTIES               = ROOT + "." + "mq.properties";
-    public static final String CANAL_MQ_ENABLE_MESSAGE_TRACE     = ROOT + "." + "mq.enableMessageTrace";
-    public static final String CANAL_MQ_ACCESS_CHANNEL           = ROOT + "." + "mq.accessChannel";
-    public static final String CANAL_MQ_CUSTOMIZED_TRACE_TOPIC   = ROOT + "." + "mq.customizedTraceTopic";
-    public static final String CANAL_MQ_NAMESPACE                = ROOT + "." + "mq.namespace";
+    public static final String CANAL_DESTINATION_PROPERTY            = ROOT + ".instance.destination";
 
+    public static final String CANAL_SOCKETCHANNEL                   = ROOT + "." + "socketChannel";
 
+    public static final String CANAL_MQ_SERVERS                      = ROOT + "." + "mq.servers";
+    public static final String CANAL_MQ_RETRIES                      = ROOT + "." + "mq.retries";
+    public static final String CANAL_MQ_BATCHSIZE                    = ROOT + "." + "mq.batchSize";
+    public static final String CANAL_MQ_LINGERMS                     = ROOT + "." + "mq.lingerMs";
+    public static final String CANAL_MQ_MAXREQUESTSIZE               = ROOT + "." + "mq.maxRequestSize";
+    public static final String CANAL_MQ_BUFFERMEMORY                 = ROOT + "." + "mq.bufferMemory";
+    public static final String CANAL_MQ_CANALBATCHSIZE               = ROOT + "." + "mq.canalBatchSize";
+    public static final String CANAL_MQ_CANALGETTIMEOUT              = ROOT + "." + "mq.canalGetTimeout";
+    public static final String CANAL_MQ_FLATMESSAGE                  = ROOT + "." + "mq.flatMessage";
+    public static final String CANAL_MQ_COMPRESSION_TYPE             = ROOT + "." + "mq.compressionType";
+    public static final String CANAL_MQ_ACKS                         = ROOT + "." + "mq.acks";
+    public static final String CANAL_MQ_TRANSACTION                  = ROOT + "." + "mq.transaction";
+    public static final String CANAL_MQ_PRODUCERGROUP                = ROOT + "." + "mq.producerGroup";
+    public static final String CANAL_ALIYUN_ACCESSKEY                = ROOT + "." + "aliyun.accessKey";
+    public static final String CANAL_ALIYUN_SECRETKEY                = ROOT + "." + "aliyun.secretKey";
+    public static final String CANAL_MQ_PROPERTIES                   = ROOT + "." + "mq.properties";
+    public static final String CANAL_MQ_ENABLE_MESSAGE_TRACE         = ROOT + "." + "mq.enableMessageTrace";
+    public static final String CANAL_MQ_ACCESS_CHANNEL               = ROOT + "." + "mq.accessChannel";
+    public static final String CANAL_MQ_CUSTOMIZED_TRACE_TOPIC       = ROOT + "." + "mq.customizedTraceTopic";
+    public static final String CANAL_MQ_NAMESPACE                    = ROOT + "." + "mq.namespace";
+    public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE        = ROOT + "." + "mq.kafka.kerberos.enable";
+    public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH  = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
+    public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH  = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";
 
     public static String getInstanceModeKey(String destination) {
         return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);

+ 24 - 10
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java

@@ -1,15 +1,5 @@
 package com.alibaba.otter.canal.deployer;
 
-import java.io.File;
-import java.io.FileFilter;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
 import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
@@ -18,6 +8,15 @@ import com.alibaba.otter.canal.spi.CanalMQProducer;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
 
 /**
  * Canal server 启动类
@@ -227,6 +226,21 @@ public class CanalStater {
             mqProperties.setNamespace(namespace);
         }
 
+        String kafkaKerberosEnable = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
+        if (!StringUtils.isEmpty(kafkaKerberosEnable)) {
+            mqProperties.setKerberosEnable(Boolean.valueOf(kafkaKerberosEnable));
+        }
+
+        String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH);
+        if (!StringUtils.isEmpty(kafkaKerberosKrb5Filepath)) {
+            mqProperties.setKerberosKrb5FilePath(kafkaKerberosKrb5Filepath);
+        }
+
+        String kafkaKerberosJaasFilepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH);
+        if (!StringUtils.isEmpty(kafkaKerberosJaasFilepath)) {
+            mqProperties.setKerberosJaasFilePath(kafkaKerberosJaasFilepath);
+        }
+
         for (Object key : properties.keySet()) {
             key = StringUtils.trim(key.toString());
             if (((String) key).startsWith(CanalConstants.CANAL_MQ_PROPERTIES)) {

+ 7 - 0
deployer/src/main/resources/canal.properties

@@ -124,3 +124,10 @@ canal.mq.accessChannel = local
 # aliyun mq namespace
 #canal.mq.namespace =
 
+##################################################
+#########     Kafka Kerberos Info    #############
+##################################################
+canal.mq.kafka.kerberos.enable = false
+canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
+canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
+

+ 31 - 0
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -31,6 +31,10 @@ public class MQProperties {
     private String     accessChannel          = null;
     private String     customizedTraceTopic   = null;
     private String     namespace              = "";
+    private boolean    kerberosEnable         = false;           //kafka集群是否启动Kerberos认证
+    private String     kerberosKrb5FilePath   = "";              //启动Kerberos认证时配置为krb5.conf文件的路径
+    private String     kerberosJaasFilePath   = "";              //启动Kerberos认证时配置为jaas.conf文件的路径
+
     public static class CanalDestination {
 
         private String  canalDestination;
@@ -257,6 +261,30 @@ public class MQProperties {
         this.namespace = namespace;
     }
 
+    public boolean isKerberosEnable() {
+        return kerberosEnable;
+    }
+
+    public void setKerberosEnable(boolean kerberosEnable) {
+        this.kerberosEnable = kerberosEnable;
+    }
+
+    public String getKerberosKrb5FilePath() {
+        return kerberosKrb5FilePath;
+    }
+
+    public void setKerberosKrb5FilePath(String kerberosKrb5FilePath) {
+        this.kerberosKrb5FilePath = kerberosKrb5FilePath;
+    }
+
+    public String getKerberosJaasFilePath() {
+        return kerberosJaasFilePath;
+    }
+
+    public void setKerberosJaasFilePath(String kerberosJaasFilePath) {
+        this.kerberosJaasFilePath = kerberosJaasFilePath;
+    }
+
     @Override public String toString() {
         return "MQProperties{" +
             "servers='" + servers + '\'' +
@@ -280,6 +308,9 @@ public class MQProperties {
             ", accessChannel='" + accessChannel + '\'' +
             ", customizedTraceTopic='" + customizedTraceTopic + '\'' +
             ", namespace='" + namespace + '\'' +
+            ", kerberosEnable='" + kerberosEnable + '\'' +
+            ", kerberosKrb5FilePath='" + kerberosKrb5FilePath + '\'' +
+            ", kerberosJaasFilePath='" + kerberosJaasFilePath + '\'' +
             '}';
     }
 }

+ 33 - 13
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,11 +1,12 @@
 package com.alibaba.otter.canal.kafka;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.MQMessageUtils;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -14,13 +15,12 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.alibaba.otter.canal.common.MQMessageUtils;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 
 /**
  * kafka producer 主操作类
@@ -59,6 +59,26 @@ public class CanalKafkaProducer implements CanalMQProducer {
         } else {
             properties.put("retries", kafkaProperties.getRetries());
         }
+
+        if (kafkaProperties.isKerberosEnable()){
+            File krb5File = new File(kafkaProperties.getKerberosKrb5FilePath());
+            File jaasFile = new File(kafkaProperties.getKerberosJaasFilePath());
+            if(krb5File.exists() && jaasFile.exists()){
+                //配置kerberos认证,需要使用绝对路径
+                System.setProperty("java.security.krb5.conf",
+                        krb5File.getAbsolutePath());
+                System.setProperty("java.security.auth.login.config",
+                        jaasFile.getAbsolutePath());
+                System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
+                properties.put("security.protocol", "SASL_PLAINTEXT");
+                properties.put("sasl.kerberos.service.name", "kafka");
+            }else{
+                String errorMsg = "ERROR # The kafka kerberos configuration file does not exist! please check it";
+                logger.error(errorMsg);
+                throw new RuntimeException(errorMsg);
+            }
+        }
+
         if (!kafkaProperties.getFlatMessage()) {
             properties.put("value.serializer", MessageSerializer.class.getName());
             producer = new KafkaProducer<String, Message>(properties);