浏览代码

Feat, support SSL(TLS), improve SHA2 password. (#5147)

* feat #4489, mysql SSL

* feat #4489, mysql SHA2 and SSL

* merge master, fix conflict.

* improve SHA2 auth plugin, fix SSL status print.

* SSL key store, use url rather than path, keep same as JDBC driver.
dingxiaobo 10 月之前
父节点
当前提交
5c61f2fb56
共有 28 个文件被更改,包括 956 次插入112 次删除
  1. 24 1
      deployer/src/main/resources/spring/default-instance.xml
  2. 0 1
      deployer/src/main/resources/spring/file-instance.xml
  3. 0 2
      deployer/src/main/resources/spring/group-instance.xml
  4. 24 1
      deployer/src/main/resources/spring/memory-instance.xml
  5. 122 60
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java
  6. 65 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/ClientAuthenticationSHA2Packet.java
  7. 41 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/SslRequestCommandPacket.java
  8. 10 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java
  9. 304 2
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannelPool.java
  10. 34 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java
  11. 98 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/ssl/SslInfo.java
  12. 26 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/ssl/SslMode.java
  13. 7 6
      instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java
  14. 163 17
      instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java
  15. 0 1
      instance/spring/src/test/resources/spring/default-instance.xml
  16. 0 1
      instance/spring/src/test/resources/spring/file-instance.xml
  17. 0 2
      instance/spring/src/test/resources/spring/group-instance.xml
  18. 0 1
      instance/spring/src/test/resources/spring/memory-instance.xml
  19. 0 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java
  20. 0 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java
  21. 18 3
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  22. 2 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  23. 0 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogEventParserProxy.java
  24. 18 1
      parse/src/main/java/com/alibaba/otter/canal/parse/support/AuthenticationInfo.java
  25. 0 1
      server/src/test/java/com/alibaba/otter/canal/server/CanalServerTest.java
  26. 0 1
      server/src/test/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded_FileModeTest.java
  27. 0 1
      server/src/test/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded_StandaloneTest.java
  28. 0 1
      server/src/test/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded_StandbyTest.java

+ 24 - 1
deployer/src/main/resources/spring/default-instance.xml

@@ -111,7 +111,6 @@
 		<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
 
 		<!-- 解析编码 -->
-		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
 
 		<!-- 解析位点记录 -->
@@ -140,6 +139,18 @@
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
+				<property name="sslInfo">
+					<bean class="com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo">
+						<property name="sslMode" value="${canal.instance.master.sslMode:DISABLED}"/>
+						<property name="tlsVersions" value="${canal.instance.master.tlsVersions:}"/>
+						<property name="trustCertificateKeyStoreType" value="${canal.instance.master.trustCertificateKeyStoreType:}"/>
+						<property name="trustCertificateKeyStoreUrl" value="${canal.instance.master.trustCertificateKeyStoreUrl:}"/>
+						<property name="trustCertificateKeyStorePassword" value="${canal.instance.master.trustCertificateKeyStorePassword:}"/>
+						<property name="clientCertificateKeyStoreType" value="${canal.instance.master.clientCertificateKeyStoreType:}"/>
+						<property name="clientCertificateKeyStoreUrl" value="${canal.instance.master.clientCertificateKeyStoreUrl:}"/>
+						<property name="clientCertificateKeyStorePassword" value="${canal.instance.master.clientCertificateKeyStorePassword:}"/>
+					</bean>
+				</property>
 			</bean>
 		</property>
 		<property name="standbyInfo">
@@ -150,6 +161,18 @@
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
+				<property name="sslInfo">
+					<bean class="com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo">
+						<property name="sslMode" value="${canal.instance.standby.sslMode:DISABLED}"/>
+						<property name="tlsVersions" value="${canal.instance.standby.tlsVersions:}"/>
+						<property name="trustCertificateKeyStoreType" value="${canal.instance.standby.trustCertificateKeyStoreType:}"/>
+						<property name="trustCertificateKeyStoreUrl" value="${canal.instance.standby.trustCertificateKeyStoreUrl:}"/>
+						<property name="trustCertificateKeyStorePassword" value="${canal.instance.standby.trustCertificateKeyStorePassword:}"/>
+						<property name="clientCertificateKeyStoreType" value="${canal.instance.standby.clientCertificateKeyStoreType:}"/>
+						<property name="clientCertificateKeyStoreUrl" value="${canal.instance.standby.clientCertificateKeyStoreUrl:}"/>
+						<property name="clientCertificateKeyStorePassword" value="${canal.instance.standby.clientCertificateKeyStorePassword:}"/>
+					</bean>
+				</property>
 			</bean>
 		</property>
 

+ 0 - 1
deployer/src/main/resources/spring/file-instance.xml

@@ -97,7 +97,6 @@
 		<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
 
 		<!-- 解析编码 -->
-		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
 
 		<!-- 解析位点记录 -->

+ 0 - 2
deployer/src/main/resources/spring/group-instance.xml

@@ -103,7 +103,6 @@
 		<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
 
 		<!-- 解析编码 -->
-		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
 
 		<!-- 解析位点记录 -->
@@ -211,7 +210,6 @@
 		<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
 
 		<!-- 解析编码 -->
-		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
 
 		<!-- 解析位点记录 -->

+ 24 - 1
deployer/src/main/resources/spring/memory-instance.xml

@@ -94,7 +94,6 @@
 		<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
 
 		<!-- 解析编码 -->
-		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
 
 		<!-- 解析位点记录 -->
@@ -114,6 +113,18 @@
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
+				<property name="sslInfo">
+					<bean class="com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo">
+						<property name="sslMode" value="${canal.instance.master.sslMode:DISABLED}"/>
+						<property name="tlsVersions" value="${canal.instance.master.tlsVersions:}"/>
+						<property name="trustCertificateKeyStoreType" value="${canal.instance.master.trustCertificateKeyStoreType:}"/>
+						<property name="trustCertificateKeyStoreUrl" value="${canal.instance.master.trustCertificateKeyStoreUrl:}"/>
+						<property name="trustCertificateKeyStorePassword" value="${canal.instance.master.trustCertificateKeyStorePassword:}"/>
+						<property name="clientCertificateKeyStoreType" value="${canal.instance.master.clientCertificateKeyStoreType:}"/>
+						<property name="clientCertificateKeyStoreUrl" value="${canal.instance.master.clientCertificateKeyStoreUrl:}"/>
+						<property name="clientCertificateKeyStorePassword" value="${canal.instance.master.clientCertificateKeyStorePassword:}"/>
+					</bean>
+				</property>
 			</bean>
 		</property>
 		<property name="standbyInfo">
@@ -124,6 +135,18 @@
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
+				<property name="sslInfo">
+					<bean class="com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo">
+						<property name="sslMode" value="${canal.instance.standby.sslMode:DISABLED}"/>
+						<property name="tlsVersions" value="${canal.instance.standby.tlsVersions:}"/>
+						<property name="trustCertificateKeyStoreType" value="${canal.instance.standby.trustCertificateKeyStoreType:}"/>
+						<property name="trustCertificateKeyStoreUrl" value="${canal.instance.standby.trustCertificateKeyStoreUrl:}"/>
+						<property name="trustCertificateKeyStorePassword" value="${canal.instance.standby.trustCertificateKeyStorePassword:}"/>
+						<property name="clientCertificateKeyStoreType" value="${canal.instance.standby.clientCertificateKeyStoreType:}"/>
+						<property name="clientCertificateKeyStoreUrl" value="${canal.instance.standby.clientCertificateKeyStoreUrl:}"/>
+						<property name="clientCertificateKeyStorePassword" value="${canal.instance.standby.clientCertificateKeyStorePassword:}"/>
+					</bean>
+				</property>
 			</bean>
 		</property>
 

+ 122 - 60
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -12,13 +12,18 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.AuthSwitchResponsePacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.ClientAuthenticationPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.client.ClientAuthenticationSHA2Packet;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.QuitCommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.client.SslRequestCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.*;
 import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannelPool;
+import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo;
+import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslMode;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.MSC;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.MySQLPasswordEncrypter;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
+import static com.alibaba.otter.canal.parse.driver.mysql.packets.Capability.CLIENT_SSL;
 
 /**
  * 基于mysql socket协议的链接实现
@@ -32,8 +37,8 @@ public class MysqlConnector {
     private InetSocketAddress   address;
     private String              username;
     private String              password;
+    private SslInfo  sslInfo;
 
-    private byte                charsetNumber     = 33;
     private String              defaultSchema;
     private int                 soTimeout         = 30 * 1000;
     private int                 connTimeout       = 5 * 1000;
@@ -62,20 +67,26 @@ public class MysqlConnector {
         this.password = password;
     }
 
-    public MysqlConnector(InetSocketAddress address, String username, String password, byte charsetNumber,
+    public MysqlConnector(InetSocketAddress address, String username, String password,
                           String defaultSchema){
         this(address, username, password);
 
-        this.charsetNumber = charsetNumber;
         this.defaultSchema = defaultSchema;
     }
 
+    public MysqlConnector(InetSocketAddress address, String username, String password,
+        String defaultSchema, SslInfo sslInfo) {
+        this(address, username, password, defaultSchema);
+        this.sslInfo = sslInfo;
+    }
+
     public void connect() throws IOException {
         if (connected.compareAndSet(false, true)) {
             try {
                 channel = SocketChannelPool.open(address);
                 logger.info("connect MysqlConnection to {}...", address);
                 negotiate(channel);
+                printSslStatus();
             } catch (Exception e) {
                 disconnect();
                 throw new IOException("connect " + this.address + " failure", e);
@@ -85,6 +96,27 @@ public class MysqlConnector {
         }
     }
 
+    private void printSslStatus() {
+        try {
+            MysqlQueryExecutor executor = new MysqlQueryExecutor(this);
+            ResultSetPacket result = executor.query("SHOW STATUS LIKE 'Ssl_version'");
+            String sslVersion = "";
+            if (result.getFieldValues() != null && result.getFieldValues().size() >= 2) {
+                sslVersion = result.getFieldValues().get(1);
+            }
+            result = executor.query("SHOW STATUS LIKE 'Ssl_cipher'");
+            String sslCipher = "";
+            if (result.getFieldValues() != null && result.getFieldValues().size() >= 2) {
+                sslCipher = result.getFieldValues().get(1);
+            }
+            logger.info("connect MysqlConnection in sslMode {}, Ssl_version:{}, Ssl_cipher:{}",
+                (sslInfo != null ? sslInfo.getSslMode() : SslMode.DISABLED), sslVersion, sslCipher);
+        } catch (Exception e) {
+            logger.info("Can't show SSL status, server may not standard MySQL server: {}", e.toString());
+            logger.debug("show SSL status exception", e);
+        }
+    }
+
     public void reconnect() throws IOException {
         disconnect();
         connect();
@@ -131,7 +163,6 @@ public class MysqlConnector {
 
     public MysqlConnector fork() {
         MysqlConnector connector = new MysqlConnector();
-        connector.setCharsetNumber(getCharsetNumber());
         connector.setDefaultSchema(getDefaultSchema());
         connector.setAddress(getAddress());
         connector.setPassword(password);
@@ -140,6 +171,7 @@ public class MysqlConnector {
         connector.setSendBufferSize(getSendBufferSize());
         connector.setSoTimeout(getSoTimeout());
         connector.setConnTimeout(connTimeout);
+        connector.setSslInfo(getSslInfo());
         return connector;
     }
 
@@ -170,29 +202,53 @@ public class MysqlConnector {
         }
         HandshakeInitializationPacket handshakePacket = new HandshakeInitializationPacket();
         handshakePacket.fromBytes(body);
+        SslMode sslMode = sslInfo != null ? sslInfo.getSslMode() : SslMode.DISABLED;
+        if (sslMode != SslMode.DISABLED) {
+            boolean serverSupportSsl = (handshakePacket.serverCapabilities & CLIENT_SSL) > 0;
+            if (!serverSupportSsl) {
+                throw new IOException("MySQL Server does not support SSL: " + address
+                    + " serverCapabilities: " + handshakePacket.serverCapabilities);
+            }
+            byte[] sslPacket = new SslRequestCommandPacket(handshakePacket.serverCharsetNumber).toBytes();
+            HeaderPacket sslHeader = new HeaderPacket();
+            sslHeader.setPacketBodyLength(sslPacket.length);
+            sslHeader.setPacketSequenceNumber((byte)(header.getPacketSequenceNumber() + 1));
+            header.setPacketSequenceNumber((byte)(header.getPacketSequenceNumber() + 1));
+            PacketManager.writePkg(channel, sslHeader.toBytes(), sslPacket);
+            channel = SocketChannelPool.connectSsl(channel, sslInfo);
+            this.channel = channel;
+        }
         if (handshakePacket.protocolVersion != MSC.DEFAULT_PROTOCOL_VERSION) {
             // HandshakeV9
-            auth323(channel, (byte) (header.getPacketSequenceNumber() + 1), handshakePacket.seed);
+            auth323(channel, (byte)(header.getPacketSequenceNumber() + 1), handshakePacket.seed);
             return;
         }
 
         connectionId = handshakePacket.threadId; // 记录一下connection
         serverVersion = handshakePacket.serverVersion; // 记录serverVersion
         logger.info("handshake initialization packet received, prepare the client authentication packet to send");
-        ClientAuthenticationPacket clientAuth = new ClientAuthenticationPacket();
-        clientAuth.setCharsetNumber(charsetNumber);
+        logger.info("auth plugin: {}", new String(handshakePacket.authPluginName));
+        boolean isSha2Password = false;
+        ClientAuthenticationPacket clientAuth;
+        if ("caching_sha2_password".equals(new String(handshakePacket.authPluginName))) {
+            clientAuth = new ClientAuthenticationSHA2Packet();
+            isSha2Password = true;
+        } else {
+            clientAuth = new ClientAuthenticationPacket();
+        }
+        clientAuth.setCharsetNumber(handshakePacket.serverCharsetNumber);
 
         clientAuth.setUsername(username);
         clientAuth.setPassword(password);
         clientAuth.setServerCapabilities(handshakePacket.serverCapabilities);
         clientAuth.setDatabaseName(defaultSchema);
         clientAuth.setScrumbleBuff(joinAndCreateScrumbleBuff(handshakePacket));
-        clientAuth.setAuthPluginName("mysql_native_password".getBytes());
+        clientAuth.setAuthPluginName(handshakePacket.authPluginName);
 
         byte[] clientAuthPkgBody = clientAuth.toBytes();
         HeaderPacket h = new HeaderPacket();
         h.setPacketBodyLength(clientAuthPkgBody.length);
-        h.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));
+        h.setPacketSequenceNumber((byte)(header.getPacketSequenceNumber() + 1));
 
         PacketManager.writePkg(channel, h.toBytes(), clientAuthPkgBody);
         logger.info("client authentication packet is sent out.");
@@ -205,57 +261,64 @@ public class MysqlConnector {
         assert body != null;
         byte marker = body[0];
         if (marker == -2 || marker == 1) {
-            byte[] authData = null;
-            String pluginName = null;
-            if (marker == 1) {
-                AuthSwitchRequestMoreData packet = new AuthSwitchRequestMoreData();
-                packet.fromBytes(body);
-                authData = packet.authData;
-            } else {
-                AuthSwitchRequestPacket packet = new AuthSwitchRequestPacket();
-                packet.fromBytes(body);
-                authData = packet.authData;
-                pluginName = packet.authName;
-                logger.info("auth switch pluginName is {}.", pluginName);
-            }
-
-            byte[] encryptedPassword = null;
-            if ("mysql_clear_password".equals(pluginName)) {
-                encryptedPassword = getPassword().getBytes();
-                header = authSwitchAfterAuth(encryptedPassword, header);
-                body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
-            } else if ("mysql_native_password".equals(pluginName)) {
-                try {
-                    encryptedPassword = MySQLPasswordEncrypter.scramble411(getPassword().getBytes(), authData);
-                } catch (NoSuchAlgorithmException e) {
-                    throw new RuntimeException("can't encrypt password that will be sent to MySQL server.", e);
-                }
-                header = authSwitchAfterAuth(encryptedPassword, header);
+            if (isSha2Password && body[1] == 3) {
+                // sha2 auth ok
+                logger.info("caching_sha2_password auth success.");
+                header = PacketManager.readHeader(channel, 4);
                 body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
-            } else if ("caching_sha2_password".equals(pluginName)) {
-                byte[] scramble = authData;
-                try {
-                    encryptedPassword = MySQLPasswordEncrypter.scrambleCachingSha2(getPassword().getBytes(), scramble);
-                } catch (DigestException e) {
-                    throw new RuntimeException("can't encrypt password that will be sent to MySQL server.", e);
+            } else {
+                byte[] authData = null;
+                String pluginName = null;
+                if (marker == 1) {
+                    AuthSwitchRequestMoreData packet = new AuthSwitchRequestMoreData();
+                    packet.fromBytes(body);
+                    authData = packet.authData;
+                } else {
+                    AuthSwitchRequestPacket packet = new AuthSwitchRequestPacket();
+                    packet.fromBytes(body);
+                    authData = packet.authData;
+                    pluginName = packet.authName;
+                    logger.info("auth switch pluginName is {}.", pluginName);
                 }
-                header = authSwitchAfterAuth(encryptedPassword, header);
-                body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
-                assert body != null;
-                if (body[0] == 0x01 && body[1] == 0x04) {
-                    // fixed issue https://github.com/alibaba/canal/pull/4767, support mysql 8.0.30+
-                    header = cachingSha2PasswordFullAuth(channel, header, getPassword().getBytes(), scramble);
+
+                byte[] encryptedPassword = null;
+                if ("mysql_clear_password".equals(pluginName)) {
+                    encryptedPassword = getPassword().getBytes();
+                    header = authSwitchAfterAuth(encryptedPassword, header);
                     body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
+                } else if ("mysql_native_password".equals(pluginName)) {
+                    try {
+                        encryptedPassword = MySQLPasswordEncrypter.scramble411(getPassword().getBytes(), authData);
+                    } catch (NoSuchAlgorithmException e) {
+                        throw new RuntimeException("can't encrypt password that will be sent to MySQL server.", e);
+                    }
+                    header = authSwitchAfterAuth(encryptedPassword, header);
+                    body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
+                } else if ("caching_sha2_password".equals(pluginName)) {
+                    byte[] scramble = authData;
+                    try {
+                        encryptedPassword = MySQLPasswordEncrypter.scrambleCachingSha2(getPassword().getBytes(),
+                            scramble);
+                    } catch (DigestException e) {
+                        throw new RuntimeException("can't encrypt password that will be sent to MySQL server.", e);
+                    }
+                    header = authSwitchAfterAuth(encryptedPassword, header);
+                    body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
+                    assert body != null;
+                    if (body[0] == 0x01 && body[1] == 0x04) {
+                        // fixed issue https://github.com/alibaba/canal/pull/4767, support mysql 8.0.30+
+                        header = cachingSha2PasswordFullAuth(channel, header, getPassword().getBytes(), scramble);
+                        body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
+                    } else {
+                        header = PacketManager.readHeader(channel, 4);
+                        body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
+                    }
                 } else {
-                    header = PacketManager.readHeader(channel, 4);
+                    header = authSwitchAfterAuth(encryptedPassword, header);
                     body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
                 }
-            } else {
-                header = authSwitchAfterAuth(encryptedPassword, header);
-                body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
             }
         }
-        assert body != null;
         if (body[0] < 0) {
             if (body[0] == -1) {
                 ErrorPacket err = new ErrorPacket();
@@ -380,14 +443,6 @@ public class MysqlConnector {
         this.username = username;
     }
 
-    public byte getCharsetNumber() {
-        return charsetNumber;
-    }
-
-    public void setCharsetNumber(byte charsetNumber) {
-        this.charsetNumber = charsetNumber;
-    }
-
     public String getDefaultSchema() {
         return defaultSchema;
     }
@@ -464,4 +519,11 @@ public class MysqlConnector {
         return serverVersion;
     }
 
+    public SslInfo getSslInfo() {
+        return sslInfo;
+    }
+
+    public void setSslInfo(SslInfo sslInfo) {
+        this.sslInfo = sslInfo;
+    }
 }

+ 65 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/ClientAuthenticationSHA2Packet.java

@@ -0,0 +1,65 @@
+package com.alibaba.otter.canal.parse.driver.mysql.packets.client;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.Capability;
+import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
+import com.alibaba.otter.canal.parse.driver.mysql.utils.MSC;
+import com.alibaba.otter.canal.parse.driver.mysql.utils.MySQLPasswordEncrypter;
+
+import org.apache.commons.lang.StringUtils;
+
+public class ClientAuthenticationSHA2Packet extends ClientAuthenticationPacket {
+
+    @Override
+    public byte[] toBytes() throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        // 1. write client_flags
+        int clientCapability = 0;
+        clientCapability |= Capability.CLIENT_LONG_PASSWORD;
+        clientCapability |= Capability.CLIENT_LONG_FLAG;
+        clientCapability |= Capability.CLIENT_PROTOCOL_41;
+        clientCapability |= Capability.CLIENT_TRANSACTIONS;
+        clientCapability |= Capability.CLIENT_SECURE_CONNECTION;
+        clientCapability |= Capability.CLIENT_MULTI_STATEMENTS;
+        clientCapability |= Capability.CLIENT_PLUGIN_AUTH;
+        clientCapability |= Capability.CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA;
+        if (getDatabaseName() != null) {
+            clientCapability |= Capability.CLIENT_CONNECT_WITH_DB;
+        }
+        ByteHelper.writeUnsignedIntLittleEndian(clientCapability, out);
+
+        // 2. write max_packet_size
+        ByteHelper.writeUnsignedIntLittleEndian(MSC.MAX_PACKET_LENGTH, out);
+        // 3. write charset_number
+        out.write(getCharsetNumber());
+        // 4. write (filler) always 0x00...
+        out.write(new byte[23]);
+        // 5. write (Null-Terminated String) user
+        ByteHelper.writeNullTerminatedString(getUsername(), out);
+        // 6. write (Length Coded Binary) scramble_buff (1 + x bytes)
+        if (StringUtils.isEmpty(getPassword())) {
+            out.write(0x00);
+        } else {
+            try {
+                byte[] encryptedPassword = MySQLPasswordEncrypter.scrambleCachingSha2(getPassword().getBytes(),
+                    getScrumbleBuff());
+                ByteHelper.writeBinaryCodedLengthBytes(encryptedPassword, out);
+            } catch (Exception e) {
+                throw new IOException("can't encrypt password that will be sent to MySQL server.", e);
+            }
+        }
+        // 7 . (Null-Terminated String) databasename (optional)
+        if (getDatabaseName() != null) {
+            ByteHelper.writeNullTerminatedString(getDatabaseName(), out);
+        }
+        // 8 . (Null-Terminated String) auth plugin name (optional)
+        if (getAuthPluginName() != null) {
+            ByteHelper.writeNullTerminated(getAuthPluginName(), out);
+        }
+        // end write
+        return out.toByteArray();
+    }
+
+}

+ 41 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/SslRequestCommandPacket.java

@@ -0,0 +1,41 @@
+package com.alibaba.otter.canal.parse.driver.mysql.packets.client;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.Capability;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.IPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
+
+/**
+ * @author 枭博
+ * @date 2024/05/14
+ */
+public class SslRequestCommandPacket implements IPacket {
+
+    private final int serverCharsetNumber;
+
+    public SslRequestCommandPacket(int serverCharsetNumber) {this.serverCharsetNumber = serverCharsetNumber;}
+
+    @Override
+    public void fromBytes(byte[] data) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte[] toBytes() throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        int clientCapabilities = Capability.CLIENT_LONG_FLAG
+            | Capability.CLIENT_PROTOCOL_41
+            | Capability.CLIENT_SECURE_CONNECTION
+            | Capability.CLIENT_PLUGIN_AUTH
+            | Capability.CLIENT_SSL;
+        ByteHelper.writeUnsignedIntLittleEndian(clientCapabilities, out);
+        ByteHelper.writeUnsignedIntLittleEndian(0, out);
+        out.write(serverCharsetNumber);
+        for (int i = 0; i < 23; i++) {
+            out.write(0);
+        }
+        return out.toByteArray();
+    }
+}

+ 10 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java

@@ -10,6 +10,8 @@ import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.nio.channels.ClosedByInterruptException;
 
+import javax.net.ssl.SSLSocket;
+
 /**
  * 使用BIO进行dump
  *
@@ -22,11 +24,13 @@ public class BioSocketChannel implements SocketChannel {
     private Socket       socket;
     private InputStream  input;
     private OutputStream output;
+    private final boolean ssl;
 
     BioSocketChannel(Socket socket) throws IOException{
         this.socket = socket;
         this.input = new BufferedInputStream(socket.getInputStream(), 16384);
         this.output = socket.getOutputStream();
+        this.ssl = (socket instanceof SSLSocket);
     }
 
     public void write(byte[]... buf) throws IOException {
@@ -133,6 +137,12 @@ public class BioSocketChannel implements SocketChannel {
         }
         return false;
     }
+    public boolean isSsl() {
+        return ssl;
+    }
+    public Socket getSocket() {
+        return socket;
+    }
 
     public SocketAddress getRemoteSocketAddress() {
         Socket socket = this.socket;

+ 304 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannelPool.java

@@ -1,22 +1,324 @@
 package com.alibaba.otter.canal.parse.driver.mysql.socket;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.Socket;
 import java.net.SocketAddress;
+import java.net.URL;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.KeyStore;
+import java.security.cert.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.naming.InvalidNameException;
+import javax.naming.ldap.LdapName;
+import javax.naming.ldap.Rdn;
+import javax.net.ssl.*;
+import javax.security.auth.x500.X500Principal;
+
+import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo;
+import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslMode;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author luoyaogui 实现channel的管理(监听连接、读数据、回收) 2016-12-28
  * @author chuanyi 2018-3-3 保留<code>open</code>减少文件变更数量
  */
 public abstract class BioSocketChannelPool {
+    private static final Logger logger = LoggerFactory.getLogger(BioSocketChannelPool.class);
 
     public static BioSocketChannel open(SocketAddress address) throws Exception {
-        Socket socket = new Socket();
+        Socket socket = createSocket(address);
+        return new BioSocketChannel(socket);
+    }
+
+    public static BioSocketChannel openSsl(Socket socket, SslInfo sslInfo) throws Exception {
+        SslMode sslMode = sslInfo.getSslMode();
+
+        switch (sslMode) {
+            case REQUIRED:
+            case PREFERRED:
+            case VERIFY_CA:
+            case VERIFY_IDENTITY:
+                SSLSocket sslSocket = createSslSocket(socket, sslInfo);
+                return new BioSocketChannel(sslSocket);
+            default:
+                throw new UnsupportedOperationException("Unsupported ssl mode: " + sslMode);
+        }
+    }
+
+    private static Socket createSocket(SocketAddress address) throws IOException {
+        Socket socket;
+        socket = new Socket();
         socket.setSoTimeout(BioSocketChannel.SO_TIMEOUT);
         socket.setTcpNoDelay(true);
         socket.setKeepAlive(true);
         socket.setReuseAddress(true);
         socket.connect(address, BioSocketChannel.DEFAULT_CONNECT_TIMEOUT);
-        return new BioSocketChannel(socket);
+        return socket;
+    }
+
+    /**
+     * from JDBC driver
+     *
+     * com.mysql.cj.protocol.ExportControlled#performTlsHandshake
+     * com.mysql.cj.protocol.ExportControlled#getSSLContext
+     *
+     * @param socket
+     * @param sslInfo
+     * @return
+     * @throws Exception
+     */
+    private static SSLSocket createSslSocket(Socket socket, SslInfo sslInfo) throws Exception {
+        SslMode sslMode = sslInfo.getSslMode();
+        boolean verifyServerCert = sslMode == SslMode.VERIFY_CA || sslMode == SslMode.VERIFY_IDENTITY;
+
+        String clientCertificateKeyStoreUrl = sslInfo.getClientCertificateKeyStoreUrl();
+        String clientCertificateKeyStoreType = sslInfo.getClientCertificateKeyStoreType() != null
+            ? sslInfo.getClientCertificateKeyStoreType()
+            : "JKS";
+        String clientCertificateKeyStorePassword = sslInfo.getClientCertificateKeyStorePassword();
+        String trustCertificateKeyStoreUrl = sslInfo.getTrustCertificateKeyStoreUrl();
+        String trustCertificateKeyStoreType = sslInfo.getTrustCertificateKeyStoreType() != null
+            ? sslInfo.getTrustCertificateKeyStoreType()
+            : "JKS";
+        String trustCertificateKeyStorePassword = sslInfo.getTrustCertificateKeyStorePassword();
+        boolean fallbackToDefaultTrustStore = true;
+        String hostName = sslMode == SslMode.VERIFY_IDENTITY ? socket.getInetAddress().getHostName() : null;
+
+        SSLContext sslContext = getSSLContext(clientCertificateKeyStoreUrl,
+            clientCertificateKeyStoreType,
+            clientCertificateKeyStorePassword,
+            trustCertificateKeyStoreUrl, trustCertificateKeyStoreType,
+            trustCertificateKeyStorePassword,
+            fallbackToDefaultTrustStore,
+            verifyServerCert, hostName);
+        SSLSocketFactory socketFactory = sslContext.getSocketFactory();
+
+        SSLSocket sslSocket = (SSLSocket)socketFactory.createSocket(
+            socket, socket.getInetAddress().getHostName(), socket.getPort(), true);
+
+        String[] protocolArr = null;
+        if (StringUtils.isNotEmpty(sslInfo.getTlsVersions())) {
+            protocolArr = StringUtils.split(sslInfo.getTlsVersions(), ",");
+        }
+        if (protocolArr == null || protocolArr.length == 0) {
+            protocolArr = new String[] {"TLSv1.2", "TLSv1.3"};
+        }
+        logger.info("SSL protocol: {}", StringUtils.join(protocolArr, ","));
+        sslSocket.setEnabledProtocols(protocolArr);
+        //sslSocket.setEnabledCipherSuites("TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDH_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDH_RSA_WITH_AES_256_GCM_SHA384,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_DHE_DSS_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDH_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_DSS_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384,TLS_RSA_WITH_AES_256_CBC_SHA256,TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA384,TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384,TLS_DHE_RSA_WITH_AES_256_CBC_SHA256,TLS_DHE_DSS_WITH_AES_256_CBC_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA,TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA,TLS_ECDH_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,TLS_RSA_WITH_AES_128_CBC_SHA256,TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256,TLS_ECDH_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_DSS_WITH_AES_128_CBC_SHA256,TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA,TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA,TLS_ECDH_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_DSS_WITH_AES_128_CBC_SHA".split(","));
+        sslSocket.startHandshake();
+        logger.info("SSL socket handshake success.");
+        return sslSocket;
+    }
+
+    private static SSLContext getSSLContext(String clientCertificateKeyStoreUrl, String clientCertificateKeyStoreType,
+        String clientCertificateKeyStorePassword,
+        String trustCertificateKeyStoreUrl, String trustCertificateKeyStoreType,
+        String trustCertificateKeyStorePassword,
+        boolean fallbackToDefaultTrustStore, boolean verifyServerCert, String hostName)
+        throws Exception {
+
+        KeyManager[] kms = null;
+        List<TrustManager> tms = new ArrayList<>();
+
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+
+        if (StringUtils.isNotEmpty(clientCertificateKeyStoreUrl)) {
+            InputStream ksIS = null;
+            try {
+                if (StringUtils.isNotEmpty(clientCertificateKeyStoreType)) {
+                    KeyStore clientKeyStore = KeyStore.getInstance(clientCertificateKeyStoreType);
+                    URL ksURL = new URL(clientCertificateKeyStoreUrl);
+                    char[] password = (clientCertificateKeyStorePassword == null) ? new char[0]
+                        : clientCertificateKeyStorePassword.toCharArray();
+                    ksIS = ksURL.openStream();
+                    clientKeyStore.load(ksIS, password);
+                    kmf.init(clientKeyStore, password);
+                    kms = kmf.getKeyManagers();
+                }
+            } finally {
+                if (ksIS != null) {
+                    try {
+                        ksIS.close();
+                    } catch (IOException e) {
+                        // can't close input stream, but keystore can be properly initialized so we shouldn't throw
+                        // this exception
+                    }
+                }
+            }
+        }
+
+        InputStream trustStoreIS = null;
+        try {
+            String trustStoreType = "";
+            char[] trustStorePassword = null;
+            KeyStore trustKeyStore = null;
+
+            if (StringUtils.isNotEmpty(trustCertificateKeyStoreUrl) && StringUtils.isNotEmpty(
+                trustCertificateKeyStoreType)) {
+                trustStoreType = trustCertificateKeyStoreType;
+                trustStorePassword = (trustCertificateKeyStorePassword == null) ? new char[0]
+                    : trustCertificateKeyStorePassword.toCharArray();
+                trustStoreIS = new URL(trustCertificateKeyStoreUrl).openStream();
+
+                trustKeyStore = KeyStore.getInstance(trustStoreType);
+                trustKeyStore.load(trustStoreIS, trustStorePassword);
+            }
+
+            if (trustKeyStore != null || fallbackToDefaultTrustStore) {
+                tmf.init(trustKeyStore);
+                // (trustKeyStore == null) initializes the TrustManagerFactory with the default truststore.
+
+                // building the customized list of TrustManagers from original one if it's available
+                TrustManager[] origTms = tmf.getTrustManagers();
+
+                for (TrustManager tm : origTms) {
+                    // wrap X509TrustManager or put original if non-X509 TrustManager
+                    tms.add(tm instanceof X509TrustManager ? new X509TrustManagerWrapper((X509TrustManager)tm,
+                        verifyServerCert, hostName) : tm);
+                }
+            }
+
+        } finally {
+            if (trustStoreIS != null) {
+                try {
+                    trustStoreIS.close();
+                } catch (IOException e) {
+                    // can't close input stream, but keystore can be properly initialized so we shouldn't throw this
+                    // exception
+                }
+            }
+        }
+
+        // if original TrustManagers are not available then putting one X509TrustManagerWrapper which take care only
+        // about expiration check
+        if (tms.size() == 0) {
+            tms.add(new X509TrustManagerWrapper(verifyServerCert, hostName));
+        }
+
+        SSLContext sslContext = SSLContext.getInstance("TLS");
+        sslContext.init(kms, tms.toArray(new TrustManager[tms.size()]), null);
+        return sslContext;
+    }
+
+    public static class X509TrustManagerWrapper implements X509TrustManager {
+
+        private X509TrustManager origTm = null;
+        private boolean verifyServerCert = false;
+        private String hostName = null;
+        private CertificateFactory certFactory = null;
+        private PKIXParameters validatorParams = null;
+        private CertPathValidator validator = null;
+
+        public X509TrustManagerWrapper(X509TrustManager tm, boolean verifyServerCertificate, String hostName)
+            throws CertificateException {
+            this.origTm = tm;
+            this.verifyServerCert = verifyServerCertificate;
+            this.hostName = hostName;
+
+            if (verifyServerCertificate) {
+                try {
+                    Set<TrustAnchor> anch = Arrays.stream(tm.getAcceptedIssuers()).map(c -> new TrustAnchor(c, null))
+                        .collect(
+                            Collectors.toSet());
+                    this.validatorParams = new PKIXParameters(anch);
+                    this.validatorParams.setRevocationEnabled(false);
+                    this.validator = CertPathValidator.getInstance("PKIX");
+                    this.certFactory = CertificateFactory.getInstance("X.509");
+                } catch (Exception e) {
+                    throw new CertificateException(e);
+                }
+            }
+
+        }
+
+        public X509TrustManagerWrapper(boolean verifyServerCertificate, String hostName) {
+            this.verifyServerCert = verifyServerCertificate;
+            this.hostName = hostName;
+        }
+
+        public X509Certificate[] getAcceptedIssuers() {
+            return this.origTm != null ? this.origTm.getAcceptedIssuers() : new X509Certificate[0];
+        }
+
+        public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+            for (int i = 0; i < chain.length; i++) {
+                chain[i].checkValidity();
+            }
+
+            if (this.validatorParams != null) {
+                X509CertSelector certSelect = new X509CertSelector();
+                certSelect.setSerialNumber(chain[0].getSerialNumber());
+
+                try {
+                    CertPath certPath = this.certFactory.generateCertPath(Arrays.asList(chain));
+                    // Validate against truststore
+                    CertPathValidatorResult result = this.validator.validate(certPath, this.validatorParams);
+                    // Check expiration for the CA used to validate this path
+                    ((PKIXCertPathValidatorResult)result).getTrustAnchor().getTrustedCert().checkValidity();
+
+                } catch (InvalidAlgorithmParameterException e) {
+                    throw new CertificateException(e);
+                } catch (CertPathValidatorException e) {
+                    throw new CertificateException(e);
+                }
+            }
+
+            if (this.verifyServerCert) {
+                if (this.origTm != null) {
+                    this.origTm.checkServerTrusted(chain, authType);
+                } else {
+                    throw new CertificateException(
+                        "Can't verify server certificate because no trust manager is found.");
+                }
+
+                // verify server certificate identity
+                if (this.hostName != null) {
+                    logger.info("verify hostName: {}", this.hostName);
+                    Set<String> expectHostNames = new HashSet<>();
+                    for (X509Certificate certificate : chain) {
+                        String dn = certificate.getSubjectX500Principal().getName(X500Principal.RFC2253);
+                        String cn = null;
+                        try {
+                            LdapName ldapDN = new LdapName(dn);
+                            for (Rdn rdn : ldapDN.getRdns()) {
+                                if (rdn.getType().equalsIgnoreCase("CN")) {
+                                    cn = rdn.getValue().toString();
+                                    break;
+                                }
+                            }
+                        } catch (InvalidNameException e) {
+                            throw new CertificateException(
+                                "Failed to retrieve the Common Name (CN) from the server certificate.");
+                        }
+                        expectHostNames.add(cn);
+                    }
+
+                    if (!expectHostNames.contains(this.hostName)) {
+                        throw new CertificateException(
+                            "Server certificate identity check failed. The certificate Common Name "
+                                + expectHostNames.stream().map(h -> "'" + h + "'").collect(Collectors.joining(", "))
+                                + " does not match with '" + this.hostName + "'.");
+                    }
+
+                }
+            }
+        }
+
+        public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+            this.origTm.checkClientTrusted(chain, authType);
+        }
     }
 
 }

+ 34 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java

@@ -1,14 +1,21 @@
 package com.alibaba.otter.canal.parse.driver.mysql.socket;
 
+import java.io.IOException;
 import java.net.SocketAddress;
 
+import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo;
+import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslMode;
+
 import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author agapple 2018年3月12日 下午10:46:22
  * @since 1.0.26
  */
 public abstract class SocketChannelPool {
+    private static final Logger logger = LoggerFactory.getLogger(SocketChannelPool.class);
 
     public static SocketChannel open(SocketAddress address) throws Exception {
         String type = chooseSocketChannel();
@@ -17,7 +24,34 @@ public abstract class SocketChannelPool {
         } else {
             return BioSocketChannelPool.open(address);
         }
+    }
 
+    public static SocketChannel connectSsl(SocketChannel channel, SslInfo sslInfo) throws IOException {
+        SslMode sslMode = sslInfo.getSslMode();
+
+        String type = chooseSocketChannel();
+        if ("netty".equalsIgnoreCase(type)) {
+            throw new UnsupportedOperationException("canal.socketChannel netty not support ssl mode: " + sslMode);
+        } else {
+            SocketAddress remoteSocketAddress = channel.getRemoteSocketAddress();
+            try {
+                return BioSocketChannelPool.openSsl(
+                    ((BioSocketChannel)channel).getSocket(), sslInfo);
+            } catch (Exception e) {
+                if (sslMode == SslMode.PREFERRED) {
+                    // still use non ssl channel
+                    logger.info("{} still use non SSL channel due to SSL connect failed.", remoteSocketAddress, e);
+                    return channel;
+                }
+                IOException ioe;
+                if (e instanceof IOException) {
+                    ioe = (IOException)e;
+                } else {
+                    ioe = new IOException(e);
+                }
+                throw ioe;
+            }
+        }
     }
 
     private static String chooseSocketChannel() {

+ 98 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/ssl/SslInfo.java

@@ -0,0 +1,98 @@
+package com.alibaba.otter.canal.parse.driver.mysql.ssl;
+
+/**
+ * @author 枭博
+ * @date 2024/05/14
+ */
+public class SslInfo {
+
+    private SslMode sslMode = SslMode.DISABLED;
+    private String tlsVersions; // 和 enabledTLSProtocols 同含义,TLSv1.2,TLSv1.3
+    private String trustCertificateKeyStoreType; // trustStore 证书类型,支持 JKS (默认) 和 PKCS12
+    private String trustCertificateKeyStoreUrl; // trustStore 证书
+    private String trustCertificateKeyStorePassword; // trustStore 证书密码
+    private String clientCertificateKeyStoreType; // client 证书类型,支持 JKS (默认) 和 PKCS12
+    private String clientCertificateKeyStoreUrl; // client 证书
+    private String clientCertificateKeyStorePassword; // client 证书密码
+
+    public SslInfo(SslMode sslMode, String tlsVersions, String trustCertificateKeyStoreType,
+        String trustCertificateKeyStoreUrl, String trustCertificateKeyStorePassword,
+        String clientCertificateKeyStoreType,
+        String clientCertificateKeyStoreUrl, String clientCertificateKeyStorePassword) {
+        this.sslMode = sslMode;
+        this.tlsVersions = tlsVersions;
+        this.trustCertificateKeyStoreType = trustCertificateKeyStoreType;
+        this.trustCertificateKeyStoreUrl = trustCertificateKeyStoreUrl;
+        this.trustCertificateKeyStorePassword = trustCertificateKeyStorePassword;
+        this.clientCertificateKeyStoreType = clientCertificateKeyStoreType;
+        this.clientCertificateKeyStoreUrl = clientCertificateKeyStoreUrl;
+        this.clientCertificateKeyStorePassword = clientCertificateKeyStorePassword;
+    }
+
+    public SslInfo() {
+    }
+
+    public SslMode getSslMode() {
+        return sslMode;
+    }
+
+    public void setSslMode(SslMode sslMode) {
+        this.sslMode = sslMode;
+    }
+
+    public String getTlsVersions() {
+        return tlsVersions;
+    }
+
+    public void setTlsVersions(String tlsVersions) {
+        this.tlsVersions = tlsVersions;
+    }
+
+    public String getTrustCertificateKeyStoreType() {
+        return trustCertificateKeyStoreType;
+    }
+
+    public void setTrustCertificateKeyStoreType(String trustCertificateKeyStoreType) {
+        this.trustCertificateKeyStoreType = trustCertificateKeyStoreType;
+    }
+
+    public String getTrustCertificateKeyStoreUrl() {
+        return trustCertificateKeyStoreUrl;
+    }
+
+    public void setTrustCertificateKeyStoreUrl(String trustCertificateKeyStoreUrl) {
+        this.trustCertificateKeyStoreUrl = trustCertificateKeyStoreUrl;
+    }
+
+    public String getTrustCertificateKeyStorePassword() {
+        return trustCertificateKeyStorePassword;
+    }
+
+    public void setTrustCertificateKeyStorePassword(String trustCertificateKeyStorePassword) {
+        this.trustCertificateKeyStorePassword = trustCertificateKeyStorePassword;
+    }
+
+    public String getClientCertificateKeyStoreType() {
+        return clientCertificateKeyStoreType;
+    }
+
+    public void setClientCertificateKeyStoreType(String clientCertificateKeyStoreType) {
+        this.clientCertificateKeyStoreType = clientCertificateKeyStoreType;
+    }
+
+    public String getClientCertificateKeyStoreUrl() {
+        return clientCertificateKeyStoreUrl;
+    }
+
+    public void setClientCertificateKeyStoreUrl(String clientCertificateKeyStoreUrl) {
+        this.clientCertificateKeyStoreUrl = clientCertificateKeyStoreUrl;
+    }
+
+    public String getClientCertificateKeyStorePassword() {
+        return clientCertificateKeyStorePassword;
+    }
+
+    public void setClientCertificateKeyStorePassword(String clientCertificateKeyStorePassword) {
+        this.clientCertificateKeyStorePassword = clientCertificateKeyStorePassword;
+    }
+}

+ 26 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/ssl/SslMode.java

@@ -0,0 +1,26 @@
+package com.alibaba.otter.canal.parse.driver.mysql.ssl;
+
+public enum SslMode {
+
+    /**
+     * 关闭SSL
+     */
+    DISABLED,
+    /**
+     * 尝试SSL传输
+     */
+    PREFERRED,
+    /**
+     * 要求SSL传输,不校验证书
+     */
+    REQUIRED,
+    /**
+     * 要求SSL传输,校验证书,不校验证书里的域名
+     */
+    VERIFY_CA,
+    /**
+     * 要求SSL传输,校验证书,校验证书里的域名
+     */
+    VERIFY_IDENTITY
+
+}

+ 7 - 6
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -276,8 +276,7 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             mysqlEventParser.setDestination(destination);
             // 编码参数
             mysqlEventParser.setConnectionCharset(parameters.getConnectionCharset());
-            mysqlEventParser.setConnectionCharsetNumber(parameters.getConnectionCharsetNumber());
-            // 网络相关参数
+            // 网络相关参数1
             mysqlEventParser.setDefaultConnectionTimeoutInSeconds(parameters.getDefaultConnectionTimeoutInSeconds());
             mysqlEventParser.setSendBufferSize(parameters.getSendBufferSize());
             mysqlEventParser.setReceiveBufferSize(parameters.getReceiveBufferSize());
@@ -291,13 +290,15 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
                 mysqlEventParser.setMasterInfo(new AuthenticationInfo(dbAddresses.get(0),
                     parameters.getDbUsername(),
                     parameters.getDbPassword(),
-                    parameters.getDefaultDatabaseName()));
+                    parameters.getDefaultDatabaseName(),
+                    parameters.getSslInfo()));
 
                 if (dbAddresses.size() > 1) {
                     mysqlEventParser.setStandbyInfo(new AuthenticationInfo(dbAddresses.get(1),
                         parameters.getDbUsername(),
                         parameters.getDbPassword(),
-                        parameters.getDefaultDatabaseName()));
+                        parameters.getDefaultDatabaseName(),
+                        parameters.getSslInfo()));
                 }
             }
 
@@ -369,7 +370,6 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             localBinlogEventParser.setDestination(destination);
             localBinlogEventParser.setBufferSize(parameters.getReceiveBufferSize());
             localBinlogEventParser.setConnectionCharset(parameters.getConnectionCharset());
-            localBinlogEventParser.setConnectionCharsetNumber(parameters.getConnectionCharsetNumber());
             localBinlogEventParser.setDirectory(parameters.getLocalBinlogDirectory());
             localBinlogEventParser.setProfilingEnabled(false);
             localBinlogEventParser.setDetectingEnable(parameters.getDetectingEnable());
@@ -381,7 +381,8 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
                 localBinlogEventParser.setMasterInfo(new AuthenticationInfo(dbAddresses.get(0),
                     parameters.getDbUsername(),
                     parameters.getDbPassword(),
-                    parameters.getDefaultDatabaseName()));
+                    parameters.getDefaultDatabaseName(),
+                    parameters.getSslInfo()));
             }
 
             eventParser = localBinlogEventParser;

+ 163 - 17
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java

@@ -9,6 +9,8 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.ToStringBuilder;
 
 import com.alibaba.otter.canal.common.utils.CanalToStringStyle;
+import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo;
+import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslMode;
 
 /**
  * canal运行相关参数
@@ -56,8 +58,6 @@ public class CanalParameter implements Serializable {
     private Integer                  defaultConnectionTimeoutInSeconds  = 30;                        // sotimeout
     private Integer                  receiveBufferSize                  = 64 * 1024;
     private Integer                  sendBufferSize                     = 64 * 1024;
-    // 编码信息
-    private Byte                     connectionCharsetNumber            = (byte) 33;
     private String                   connectionCharset                  = "UTF-8";
 
     // 数据库信息
@@ -66,6 +66,15 @@ public class CanalParameter implements Serializable {
     private String                   dbUsername;                                                     // 数据库用户
     private String                   dbPassword;                                                     // 数据库密码
 
+    private String sslMode = SslMode.DISABLED.name();
+    private String tlsVersions; // 和 enabledTLSProtocols 同含义,TLSv1.2,TLSv1.3
+    private String trustCertificateKeyStoreType; // trustStore 证书类型,支持 JKS (默认) 和 PKCS12
+    private String trustCertificateKeyStoreUrl; // trustStore 证书路径
+    private String trustCertificateKeyStorePassword; // trustStore 证书密码
+    private String clientCertificateKeyStoreType; // client 证书类型,支持 JKS (默认) 和 PKCS12
+    private String clientCertificateKeyStoreUrl; // client 证书路径
+    private String clientCertificateKeyStorePassword; // client 证书密码
+
     // binlog链接信息
     private IndexMode                indexMode;
     private List<String>             positions;                                                      // 数据库positions信息
@@ -108,10 +117,27 @@ public class CanalParameter implements Serializable {
     private InetSocketAddress        masterAddress;                                                  // 主库信息
     private String                   masterUsername;                                                 // 帐号
     private String                   masterPassword;                                                 // 密码
+    private String masterSslMode = SslMode.DISABLED.name();
+    private String masterTlsVersions; // 和 enabledTLSProtocols 同含义,TLSv1.2,TLSv1.3
+    private String masterTrustCertificateKeyStoreType; // trustStore 证书类型,支持 JKS (默认) 和 PKCS12
+    private String masterTrustCertificateKeyStoreUrl; // trustStore 证书路径
+    private String masterTrustCertificateKeyStorePassword; // trustStore 证书密码
+    private String masterClientCertificateKeyStoreType; // client 证书类型,支持 JKS (默认) 和 PKCS12
+    private String masterClientCertificateKeyStoreUrl; // client 证书路径
+    private String masterClientCertificateKeyStorePassword; // client 证书密码
 
     private InetSocketAddress        standbyAddress;                                                 // 备库信息
     private String                   standbyUsername;                                                // 帐号
     private String                   standbyPassword;
+    private String standbySslMode = SslMode.DISABLED.name();
+    private String standbyTlsVersions; // 和 enabledTLSProtocols 同含义,TLSv1.2,TLSv1.3
+    private String standbyTrustCertificateKeyStoreType; // trustStore 证书类型,支持 JKS (默认) 和 PKCS12
+    private String standbyTrustCertificateKeyStoreUrl; // trustStore 证书路径
+    private String standbyTrustCertificateKeyStorePassword; // trustStore 证书密码
+    private String standbyClientCertificateKeyStoreType; // client 证书类型,支持 JKS (默认) 和 PKCS12
+    private String standbyClientCertificateKeyStoreUrl; // client 证书路径
+    private String standbyClientCertificateKeyStorePassword; // client 证书密码
+
     private String                   masterLogfileName                  = null;                      // master起始位置
     private Long                     masterLogfileOffest                = null;
     private Long                     masterTimestamp                    = null;
@@ -533,14 +559,6 @@ public class CanalParameter implements Serializable {
         this.sendBufferSize = sendBufferSize;
     }
 
-    public Byte getConnectionCharsetNumber() {
-        return connectionCharsetNumber;
-    }
-
-    public void setConnectionCharsetNumber(Byte connectionCharsetNumber) {
-        this.connectionCharsetNumber = connectionCharsetNumber;
-    }
-
     public String getConnectionCharset() {
         return connectionCharset;
     }
@@ -685,16 +703,16 @@ public class CanalParameter implements Serializable {
                     groupDbAddresses.add(groupAddresses);
                 }
             } else {
-                if (masterAddress != null) {
-                    List<DataSourcing> groupAddresses = new ArrayList<>();
+            if (masterAddress != null) {
+                List<DataSourcing> groupAddresses = new ArrayList<>();
                     groupAddresses.add(new DataSourcing(sourcingType, masterAddress));
-                    groupDbAddresses.add(groupAddresses);
-                }
+                groupDbAddresses.add(groupAddresses);
+            }
 
-                if (standbyAddress != null) {
-                    List<DataSourcing> groupAddresses = new ArrayList<>();
+            if (standbyAddress != null) {
+                List<DataSourcing> groupAddresses = new ArrayList<>();
                     groupAddresses.add(new DataSourcing(sourcingType, standbyAddress));
-                    groupDbAddresses.add(groupAddresses);
+                groupDbAddresses.add(groupAddresses);
                 }
             }
         }
@@ -731,6 +749,134 @@ public class CanalParameter implements Serializable {
         this.dbPassword = dbPassword;
     }
 
+    public SslInfo getSslInfo() {
+        if (dbUsername == null) {
+            if (masterUsername != null) {
+                return new SslInfo(SslMode.valueOf(masterSslMode),
+                    masterTlsVersions,
+                    masterTrustCertificateKeyStoreType,
+                    masterTrustCertificateKeyStoreUrl,
+                    masterTrustCertificateKeyStorePassword,
+                    masterClientCertificateKeyStoreType,
+                    masterClientCertificateKeyStoreUrl,
+                    masterClientCertificateKeyStorePassword);
+            } else {
+                return new SslInfo(SslMode.valueOf(standbySslMode),
+                    standbyTlsVersions,
+                    standbyTrustCertificateKeyStoreType,
+                    standbyTrustCertificateKeyStoreUrl,
+                    standbyTrustCertificateKeyStorePassword,
+                    standbyClientCertificateKeyStoreType,
+                    standbyClientCertificateKeyStoreUrl,
+                    standbyClientCertificateKeyStorePassword);
+            }
+        }
+        return new SslInfo(SslMode.valueOf(sslMode),
+            tlsVersions,
+            trustCertificateKeyStoreType,
+            trustCertificateKeyStoreUrl,
+            trustCertificateKeyStorePassword,
+            clientCertificateKeyStoreType,
+            clientCertificateKeyStoreUrl,
+            clientCertificateKeyStorePassword);
+    }
+
+    public void setSslMode(String sslMode) {
+        this.sslMode = sslMode;
+    }
+
+    public void setTlsVersions(String tlsVersions) {
+        this.tlsVersions = tlsVersions;
+    }
+
+    public void setTrustCertificateKeyStoreType(String trustCertificateKeyStoreType) {
+        this.trustCertificateKeyStoreType = trustCertificateKeyStoreType;
+    }
+
+    public void setTrustCertificateKeyStoreUrl(String trustCertificateKeyStoreUrl) {
+        this.trustCertificateKeyStoreUrl = trustCertificateKeyStoreUrl;
+    }
+
+    public void setTrustCertificateKeyStorePassword(String trustCertificateKeyStorePassword) {
+        this.trustCertificateKeyStorePassword = trustCertificateKeyStorePassword;
+    }
+
+    public void setClientCertificateKeyStoreType(String clientCertificateKeyStoreType) {
+        this.clientCertificateKeyStoreType = clientCertificateKeyStoreType;
+    }
+
+    public void setClientCertificateKeyStoreUrl(String clientCertificateKeyStoreUrl) {
+        this.clientCertificateKeyStoreUrl = clientCertificateKeyStoreUrl;
+    }
+
+    public void setClientCertificateKeyStorePassword(String clientCertificateKeyStorePassword) {
+        this.clientCertificateKeyStorePassword = clientCertificateKeyStorePassword;
+    }
+
+    public void setMasterSslMode(String masterSslMode) {
+        this.masterSslMode = masterSslMode;
+    }
+
+    public void setMasterTlsVersions(String masterTlsVersions) {
+        this.masterTlsVersions = masterTlsVersions;
+    }
+
+    public void setMasterTrustCertificateKeyStoreType(String masterTrustCertificateKeyStoreType) {
+        this.masterTrustCertificateKeyStoreType = masterTrustCertificateKeyStoreType;
+    }
+
+    public void setMasterTrustCertificateKeyStoreUrl(String masterTrustCertificateKeyStoreUrl) {
+        this.masterTrustCertificateKeyStoreUrl = masterTrustCertificateKeyStoreUrl;
+    }
+
+    public void setMasterTrustCertificateKeyStorePassword(String masterTrustCertificateKeyStorePassword) {
+        this.masterTrustCertificateKeyStorePassword = masterTrustCertificateKeyStorePassword;
+    }
+
+    public void setMasterClientCertificateKeyStoreType(String masterClientCertificateKeyStoreType) {
+        this.masterClientCertificateKeyStoreType = masterClientCertificateKeyStoreType;
+    }
+
+    public void setMasterClientCertificateKeyStoreUrl(String masterClientCertificateKeyStoreUrl) {
+        this.masterClientCertificateKeyStoreUrl = masterClientCertificateKeyStoreUrl;
+    }
+
+    public void setMasterClientCertificateKeyStorePassword(String masterClientCertificateKeyStorePassword) {
+        this.masterClientCertificateKeyStorePassword = masterClientCertificateKeyStorePassword;
+    }
+
+    public void setStandbySslMode(String standbySslMode) {
+        this.standbySslMode = standbySslMode;
+    }
+
+    public void setStandbyTlsVersions(String standbyTlsVersions) {
+        this.standbyTlsVersions = standbyTlsVersions;
+    }
+
+    public void setStandbyTrustCertificateKeyStoreType(String standbyTrustCertificateKeyStoreType) {
+        this.standbyTrustCertificateKeyStoreType = standbyTrustCertificateKeyStoreType;
+    }
+
+    public void setStandbyTrustCertificateKeyStoreUrl(String standbyTrustCertificateKeyStoreUrl) {
+        this.standbyTrustCertificateKeyStoreUrl = standbyTrustCertificateKeyStoreUrl;
+    }
+
+    public void setStandbyTrustCertificateKeyStorePassword(String standbyTrustCertificateKeyStorePassword) {
+        this.standbyTrustCertificateKeyStorePassword = standbyTrustCertificateKeyStorePassword;
+    }
+
+    public void setStandbyClientCertificateKeyStoreType(String standbyClientCertificateKeyStoreType) {
+        this.standbyClientCertificateKeyStoreType = standbyClientCertificateKeyStoreType;
+    }
+
+    public void setStandbyClientCertificateKeyStoreUrl(String standbyClientCertificateKeyStoreUrl) {
+        this.standbyClientCertificateKeyStoreUrl = standbyClientCertificateKeyStoreUrl;
+    }
+
+    public void setStandbyClientCertificateKeyStorePassword(String standbyClientCertificateKeyStorePassword) {
+        this.standbyClientCertificateKeyStorePassword = standbyClientCertificateKeyStorePassword;
+    }
+
     public List<String> getPositions() {
         if (positions == null) {
             positions = new ArrayList<>();

+ 0 - 1
instance/spring/src/test/resources/spring/default-instance.xml

@@ -114,7 +114,6 @@
 		<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
 		
 		<!-- 解析编码 -->
-		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
 	
 		<!-- 解析位点记录 -->

+ 0 - 1
instance/spring/src/test/resources/spring/file-instance.xml

@@ -100,7 +100,6 @@
 		<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
 		
 		<!-- 解析编码 -->
-		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
 
 		<!-- 解析位点记录 -->

+ 0 - 2
instance/spring/src/test/resources/spring/group-instance.xml

@@ -106,7 +106,6 @@
 		<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
 		
 		<!-- 解析编码 -->
-		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
 	
 		<!-- 解析位点记录 -->
@@ -186,7 +185,6 @@
 		<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
 		
 		<!-- 解析编码 -->
-		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
 	
 		<!-- 解析位点记录 -->

+ 0 - 1
instance/spring/src/test/resources/spring/memory-instance.xml

@@ -97,7 +97,6 @@
 		<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
 		
 		<!-- 解析编码 -->
-		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
 	
 		<!-- 解析位点记录 -->

+ 0 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -33,7 +33,6 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
     protected TableMetaTSDB        tableMetaTSDB;
 
     // 编码信息
-    protected byte                 connectionCharsetNumber   = (byte) 33;
     protected Charset              connectionCharset         = Charset.forName("UTF-8");
     protected boolean              filterQueryDcl            = false;
     protected boolean              filterQueryDml            = false;
@@ -205,10 +204,6 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 
     // ============================ setter / getter =========================
 
-    public void setConnectionCharsetNumber(byte connectionCharsetNumber) {
-        this.connectionCharsetNumber = connectionCharsetNumber;
-    }
-
     public void setConnectionCharsetStd(Charset connectionCharset) {
         this.connectionCharset = connectionCharset;
     }

+ 0 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java

@@ -116,7 +116,6 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
         MysqlConnection connection = new MysqlConnection(runningInfo.getAddress(),
             runningInfo.getUsername(),
             runningInfo.getPassword(),
-            connectionCharsetNumber,
             runningInfo.getDefaultDatabaseName());
         connection.getConnector().setReceiveBufferSize(64 * 1024);
         connection.getConnector().setSendBufferSize(64 * 1024);

+ 18 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -27,6 +27,7 @@ import com.alibaba.otter.canal.parse.driver.mysql.packets.client.RegisterSlaveCo
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.SemiAckCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
@@ -73,14 +74,28 @@ public class MysqlConnection implements ErosaConnection {
         connector.setConnTimeout(connTimeout);
     }
 
-    public MysqlConnection(InetSocketAddress address, String username, String password, byte charsetNumber,
-                           String defaultSchema){
+    public MysqlConnection(InetSocketAddress address, String username, String password,
+        String defaultSchema){
         authInfo = new AuthenticationInfo();
         authInfo.setAddress(address);
         authInfo.setUsername(username);
         authInfo.setPassword(password);
         authInfo.setDefaultDatabaseName(defaultSchema);
-        connector = new MysqlConnector(address, username, password, charsetNumber, defaultSchema);
+        connector = new MysqlConnector(address, username, password, defaultSchema);
+        // 将connection里面的参数透传下
+        connector.setSoTimeout(soTimeout);
+        connector.setConnTimeout(connTimeout);
+    }
+
+    public MysqlConnection(InetSocketAddress address, String username, String password,
+                           String defaultSchema, SslInfo sslInfo){
+        authInfo = new AuthenticationInfo();
+        authInfo.setAddress(address);
+        authInfo.setUsername(username);
+        authInfo.setPassword(password);
+        authInfo.setDefaultDatabaseName(defaultSchema);
+        authInfo.setSslInfo(sslInfo);
+        connector = new MysqlConnector(address, username, password, defaultSchema, sslInfo);
         // 将connection里面的参数透传下
         connector.setSoTimeout(soTimeout);
         connector.setConnTimeout(connTimeout);

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -305,8 +305,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         MysqlConnection connection = new MysqlConnection(runningInfo.getAddress(),
             runningInfo.getUsername(),
             runningInfo.getPassword(),
-            connectionCharsetNumber,
-            runningInfo.getDefaultDatabaseName());
+            runningInfo.getDefaultDatabaseName(),
+            runningInfo.getSslInfo());
         connection.getConnector().setReceiveBufferSize(receiveBufferSize);
         connection.getConnector().setSendBufferSize(sendBufferSize);
         connection.getConnector().setSoTimeout(defaultConnectionTimeoutInSeconds * 1000);

+ 0 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogEventParserProxy.java

@@ -51,7 +51,6 @@ public class RdsBinlogEventParserProxy extends MysqlEventParser {
             rdsLocalBinlogEventParser.setDestination(destination);
             rdsLocalBinlogEventParser.setAlarmHandler(this.getAlarmHandler());
             rdsLocalBinlogEventParser.setConnectionCharsetStd(this.connectionCharset);
-            rdsLocalBinlogEventParser.setConnectionCharsetNumber(this.connectionCharsetNumber);
             rdsLocalBinlogEventParser.setEnableTsdb(this.enableTsdb);
             rdsLocalBinlogEventParser.setEventBlackFilter(this.eventBlackFilter);
             rdsLocalBinlogEventParser.setFilterQueryDcl(this.filterQueryDcl);

+ 18 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/support/AuthenticationInfo.java

@@ -3,6 +3,8 @@ package com.alibaba.otter.canal.parse.support;
 import java.net.InetSocketAddress;
 
 import com.alibaba.otter.canal.common.utils.CommonUtils;
+import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo;
+
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 
@@ -22,6 +24,7 @@ public class AuthenticationInfo {
     private String            defaultDatabaseName;// 默认链接的数据库
     private String            pwdPublicKey;       //公钥
     private boolean           enableDruid;        //是否使用druid加密解密数据库密码
+    private SslInfo sslInfo;
 
     public void initPwd() throws Exception{
         if (enableDruid) {
@@ -37,13 +40,19 @@ public class AuthenticationInfo {
         this(address, username, password, "");
     }
 
-    public AuthenticationInfo(InetSocketAddress address, String username, String password, String defaultDatabaseName){
+    private AuthenticationInfo(InetSocketAddress address, String username, String password, String defaultDatabaseName){
         this.address = address;
         this.username = username;
         this.password = password;
         this.defaultDatabaseName = defaultDatabaseName;
     }
 
+    public AuthenticationInfo(InetSocketAddress address, String username, String password,
+        String defaultDatabaseName, SslInfo sslInfo) {
+        this(address, username, password, defaultDatabaseName);
+        this.sslInfo = sslInfo;
+    }
+
     public InetSocketAddress getAddress() {
         return address;
     }
@@ -92,6 +101,14 @@ public class AuthenticationInfo {
         this.enableDruid = enableDruid;
     }
 
+    public SslInfo getSslInfo() {
+        return sslInfo;
+    }
+
+    public void setSslInfo(SslInfo sslInfo) {
+        this.sslInfo = sslInfo;
+    }
+
     @Override
     public String toString() {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.DEFAULT_STYLE);

+ 0 - 1
server/src/test/java/com/alibaba/otter/canal/server/CanalServerTest.java

@@ -253,7 +253,6 @@ public class CanalServerTest {
 
         parameter.setDefaultConnectionTimeoutInSeconds(30);
         parameter.setConnectionCharset("UTF-8");
-        parameter.setConnectionCharsetNumber((byte) 33);
         parameter.setReceiveBufferSize(8 * 1024);
         parameter.setSendBufferSize(8 * 1024);
 

+ 0 - 1
server/src/test/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded_FileModeTest.java

@@ -40,7 +40,6 @@ public class CanalServerWithEmbedded_FileModeTest extends BaseCanalServerWithEmb
 
         parameter.setDefaultConnectionTimeoutInSeconds(30);
         parameter.setConnectionCharset("UTF-8");
-        parameter.setConnectionCharsetNumber((byte) 33);
         parameter.setReceiveBufferSize(8 * 1024);
         parameter.setSendBufferSize(8 * 1024);
 

+ 0 - 1
server/src/test/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded_StandaloneTest.java

@@ -43,7 +43,6 @@ public class CanalServerWithEmbedded_StandaloneTest extends BaseCanalServerWithE
 
         parameter.setDefaultConnectionTimeoutInSeconds(30);
         parameter.setConnectionCharset("UTF-8");
-        parameter.setConnectionCharsetNumber((byte) 33);
         parameter.setReceiveBufferSize(8 * 1024);
         parameter.setSendBufferSize(8 * 1024);
 

+ 0 - 1
server/src/test/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded_StandbyTest.java

@@ -55,7 +55,6 @@ public class CanalServerWithEmbedded_StandbyTest extends BaseCanalServerWithEmbe
 
         parameter.setDefaultConnectionTimeoutInSeconds(30);
         parameter.setConnectionCharset("UTF-8");
-        parameter.setConnectionCharsetNumber((byte) 33);
         parameter.setReceiveBufferSize(8 * 1024);
         parameter.setSendBufferSize(8 * 1024);