瀏覽代碼

fixed pull #5147, reformat & add more configs

jianghang.loujh 10 月之前
父節點
當前提交
b0f3ff8dcc

+ 8 - 0
deployer/src/main/resources/example/instance.properties

@@ -12,6 +12,14 @@ canal.instance.master.position=
 canal.instance.master.timestamp=
 canal.instance.master.gtid=
 
+# ssl
+#canal.instance.master.trustCertificateKeyStoreType=
+#canal.instance.master.trustCertificateKeyStoreUrl=
+#canal.instance.master.trustCertificateKeyStorePassword=
+#canal.instance.master.clientCertificateKeyStoreType=
+#canal.instance.master.clientCertificateKeyStoreUrl=
+#canal.instance.master.clientCertificateKeyStorePassword=
+
 # rds oss binlog
 canal.instance.rds.accesskey=
 canal.instance.rds.secretkey=

+ 1 - 1
deployer/src/main/resources/spring/base-instance.xml

@@ -36,4 +36,4 @@
 		<property name="secretkey" value="${canal.aliyun.secretkey:}" />
 		<property name="instanceId" value="${canal.instance.rds.instanceId:}" />
 	</bean>
-</beans>
+</beans>

+ 24 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -125,6 +125,18 @@
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
+				<property name="sslInfo">
+					<bean class="com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo">
+						<property name="sslMode" value="${canal.instance.master.sslMode:DISABLED}"/>
+						<property name="tlsVersions" value="${canal.instance.master.tlsVersions:}"/>
+						<property name="trustCertificateKeyStoreType" value="${canal.instance.master.trustCertificateKeyStoreType:}"/>
+						<property name="trustCertificateKeyStoreUrl" value="${canal.instance.master.trustCertificateKeyStoreUrl:}"/>
+						<property name="trustCertificateKeyStorePassword" value="${canal.instance.master.trustCertificateKeyStorePassword:}"/>
+						<property name="clientCertificateKeyStoreType" value="${canal.instance.master.clientCertificateKeyStoreType:}"/>
+						<property name="clientCertificateKeyStoreUrl" value="${canal.instance.master.clientCertificateKeyStoreUrl:}"/>
+						<property name="clientCertificateKeyStorePassword" value="${canal.instance.master.clientCertificateKeyStorePassword:}"/>
+					</bean>
+				</property>
 			</bean>
 		</property>
 		<property name="standbyInfo">
@@ -135,6 +147,18 @@
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
+				<property name="sslInfo">
+					<bean class="com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo">
+						<property name="sslMode" value="${canal.instance.standby.sslMode:DISABLED}"/>
+						<property name="tlsVersions" value="${canal.instance.standby.tlsVersions:}"/>
+						<property name="trustCertificateKeyStoreType" value="${canal.instance.standby.trustCertificateKeyStoreType:}"/>
+						<property name="trustCertificateKeyStoreUrl" value="${canal.instance.standby.trustCertificateKeyStoreUrl:}"/>
+						<property name="trustCertificateKeyStorePassword" value="${canal.instance.standby.trustCertificateKeyStorePassword:}"/>
+						<property name="clientCertificateKeyStoreType" value="${canal.instance.standby.clientCertificateKeyStoreType:}"/>
+						<property name="clientCertificateKeyStoreUrl" value="${canal.instance.standby.clientCertificateKeyStoreUrl:}"/>
+						<property name="clientCertificateKeyStorePassword" value="${canal.instance.standby.clientCertificateKeyStorePassword:}"/>
+					</bean>
+				</property>
 			</bean>
 		</property>
 

+ 48 - 0
deployer/src/main/resources/spring/group-instance.xml

@@ -122,6 +122,18 @@
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
+				<property name="sslInfo">
+					<bean class="com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo">
+						<property name="sslMode" value="${canal.instance.master.sslMode:DISABLED}"/>
+						<property name="tlsVersions" value="${canal.instance.master.tlsVersions:}"/>
+						<property name="trustCertificateKeyStoreType" value="${canal.instance.master.trustCertificateKeyStoreType:}"/>
+						<property name="trustCertificateKeyStoreUrl" value="${canal.instance.master.trustCertificateKeyStoreUrl:}"/>
+						<property name="trustCertificateKeyStorePassword" value="${canal.instance.master.trustCertificateKeyStorePassword:}"/>
+						<property name="clientCertificateKeyStoreType" value="${canal.instance.master.clientCertificateKeyStoreType:}"/>
+						<property name="clientCertificateKeyStoreUrl" value="${canal.instance.master.clientCertificateKeyStoreUrl:}"/>
+						<property name="clientCertificateKeyStorePassword" value="${canal.instance.master.clientCertificateKeyStorePassword:}"/>
+					</bean>
+				</property>
 			</bean>
 		</property>
 		<property name="standbyInfo">
@@ -132,6 +144,18 @@
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
+				<property name="sslInfo">
+					<bean class="com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo">
+						<property name="sslMode" value="${canal.instance.standby.sslMode:DISABLED}"/>
+						<property name="tlsVersions" value="${canal.instance.standby.tlsVersions:}"/>
+						<property name="trustCertificateKeyStoreType" value="${canal.instance.standby.trustCertificateKeyStoreType:}"/>
+						<property name="trustCertificateKeyStoreUrl" value="${canal.instance.standby.trustCertificateKeyStoreUrl:}"/>
+						<property name="trustCertificateKeyStorePassword" value="${canal.instance.standby.trustCertificateKeyStorePassword:}"/>
+						<property name="clientCertificateKeyStoreType" value="${canal.instance.standby.clientCertificateKeyStoreType:}"/>
+						<property name="clientCertificateKeyStoreUrl" value="${canal.instance.standby.clientCertificateKeyStoreUrl:}"/>
+						<property name="clientCertificateKeyStorePassword" value="${canal.instance.standby.clientCertificateKeyStorePassword:}"/>
+					</bean>
+				</property>
 			</bean>
 		</property>
 
@@ -229,6 +253,18 @@
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
+				<property name="sslInfo">
+					<bean class="com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo">
+						<property name="sslMode" value="${canal.instance.master.sslMode:DISABLED}"/>
+						<property name="tlsVersions" value="${canal.instance.master.tlsVersions:}"/>
+						<property name="trustCertificateKeyStoreType" value="${canal.instance.master.trustCertificateKeyStoreType:}"/>
+						<property name="trustCertificateKeyStoreUrl" value="${canal.instance.master.trustCertificateKeyStoreUrl:}"/>
+						<property name="trustCertificateKeyStorePassword" value="${canal.instance.master.trustCertificateKeyStorePassword:}"/>
+						<property name="clientCertificateKeyStoreType" value="${canal.instance.master.clientCertificateKeyStoreType:}"/>
+						<property name="clientCertificateKeyStoreUrl" value="${canal.instance.master.clientCertificateKeyStoreUrl:}"/>
+						<property name="clientCertificateKeyStorePassword" value="${canal.instance.master.clientCertificateKeyStorePassword:}"/>
+					</bean>
+				</property>
 			</bean>
 		</property>
 		<property name="standbyInfo">
@@ -239,6 +275,18 @@
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
+				<property name="sslInfo">
+					<bean class="com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo">
+						<property name="sslMode" value="${canal.instance.standby.sslMode:DISABLED}"/>
+						<property name="tlsVersions" value="${canal.instance.standby.tlsVersions:}"/>
+						<property name="trustCertificateKeyStoreType" value="${canal.instance.standby.trustCertificateKeyStoreType:}"/>
+						<property name="trustCertificateKeyStoreUrl" value="${canal.instance.standby.trustCertificateKeyStoreUrl:}"/>
+						<property name="trustCertificateKeyStorePassword" value="${canal.instance.standby.trustCertificateKeyStorePassword:}"/>
+						<property name="clientCertificateKeyStoreType" value="${canal.instance.standby.clientCertificateKeyStoreType:}"/>
+						<property name="clientCertificateKeyStoreUrl" value="${canal.instance.standby.clientCertificateKeyStoreUrl:}"/>
+						<property name="clientCertificateKeyStorePassword" value="${canal.instance.standby.clientCertificateKeyStorePassword:}"/>
+					</bean>
+				</property>
 			</bean>
 		</property>
 

+ 16 - 18
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.parse.driver.mysql;
 
+import static com.alibaba.otter.canal.parse.driver.mysql.packets.Capability.CLIENT_SSL;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.DigestException;
@@ -10,11 +12,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
-import com.alibaba.otter.canal.parse.driver.mysql.packets.client.AuthSwitchResponsePacket;
-import com.alibaba.otter.canal.parse.driver.mysql.packets.client.ClientAuthenticationPacket;
-import com.alibaba.otter.canal.parse.driver.mysql.packets.client.ClientAuthenticationSHA2Packet;
-import com.alibaba.otter.canal.parse.driver.mysql.packets.client.QuitCommandPacket;
-import com.alibaba.otter.canal.parse.driver.mysql.packets.client.SslRequestCommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.client.*;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.*;
 import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannelPool;
@@ -23,7 +21,6 @@ import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslMode;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.MSC;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.MySQLPasswordEncrypter;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
-import static com.alibaba.otter.canal.parse.driver.mysql.packets.Capability.CLIENT_SSL;
 
 /**
  * 基于mysql socket协议的链接实现
@@ -37,7 +34,7 @@ public class MysqlConnector {
     private InetSocketAddress   address;
     private String              username;
     private String              password;
-    private SslInfo  sslInfo;
+    private SslInfo             sslInfo;
 
     private String              defaultSchema;
     private int                 soTimeout         = 30 * 1000;
@@ -67,15 +64,14 @@ public class MysqlConnector {
         this.password = password;
     }
 
-    public MysqlConnector(InetSocketAddress address, String username, String password,
-                          String defaultSchema){
+    public MysqlConnector(InetSocketAddress address, String username, String password, String defaultSchema){
         this(address, username, password);
 
         this.defaultSchema = defaultSchema;
     }
 
-    public MysqlConnector(InetSocketAddress address, String username, String password,
-        String defaultSchema, SslInfo sslInfo) {
+    public MysqlConnector(InetSocketAddress address, String username, String password, String defaultSchema,
+                          SslInfo sslInfo){
         this(address, username, password, defaultSchema);
         this.sslInfo = sslInfo;
     }
@@ -110,7 +106,9 @@ public class MysqlConnector {
                 sslCipher = result.getFieldValues().get(1);
             }
             logger.info("connect MysqlConnection in sslMode {}, Ssl_version:{}, Ssl_cipher:{}",
-                (sslInfo != null ? sslInfo.getSslMode() : SslMode.DISABLED), sslVersion, sslCipher);
+                (sslInfo != null ? sslInfo.getSslMode() : SslMode.DISABLED),
+                sslVersion,
+                sslCipher);
         } catch (Exception e) {
             logger.info("Can't show SSL status, server may not standard MySQL server: {}", e.toString());
             logger.debug("show SSL status exception", e);
@@ -206,21 +204,21 @@ public class MysqlConnector {
         if (sslMode != SslMode.DISABLED) {
             boolean serverSupportSsl = (handshakePacket.serverCapabilities & CLIENT_SSL) > 0;
             if (!serverSupportSsl) {
-                throw new IOException("MySQL Server does not support SSL: " + address
-                    + " serverCapabilities: " + handshakePacket.serverCapabilities);
+                throw new IOException("MySQL Server does not support SSL: " + address + " serverCapabilities: "
+                                      + handshakePacket.serverCapabilities);
             }
             byte[] sslPacket = new SslRequestCommandPacket(handshakePacket.serverCharsetNumber).toBytes();
             HeaderPacket sslHeader = new HeaderPacket();
             sslHeader.setPacketBodyLength(sslPacket.length);
-            sslHeader.setPacketSequenceNumber((byte)(header.getPacketSequenceNumber() + 1));
-            header.setPacketSequenceNumber((byte)(header.getPacketSequenceNumber() + 1));
+            sslHeader.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));
+            header.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));
             PacketManager.writePkg(channel, sslHeader.toBytes(), sslPacket);
             channel = SocketChannelPool.connectSsl(channel, sslInfo);
             this.channel = channel;
         }
         if (handshakePacket.protocolVersion != MSC.DEFAULT_PROTOCOL_VERSION) {
             // HandshakeV9
-            auth323(channel, (byte)(header.getPacketSequenceNumber() + 1), handshakePacket.seed);
+            auth323(channel, (byte) (header.getPacketSequenceNumber() + 1), handshakePacket.seed);
             return;
         }
 
@@ -248,7 +246,7 @@ public class MysqlConnector {
         byte[] clientAuthPkgBody = clientAuth.toBytes();
         HeaderPacket h = new HeaderPacket();
         h.setPacketBodyLength(clientAuthPkgBody.length);
-        h.setPacketSequenceNumber((byte)(header.getPacketSequenceNumber() + 1));
+        h.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));
 
         PacketManager.writePkg(channel, h.toBytes(), clientAuthPkgBody);
         logger.info("client authentication packet is sent out.");

+ 6 - 6
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/SslRequestCommandPacket.java

@@ -15,7 +15,9 @@ public class SslRequestCommandPacket implements IPacket {
 
     private final int serverCharsetNumber;
 
-    public SslRequestCommandPacket(int serverCharsetNumber) {this.serverCharsetNumber = serverCharsetNumber;}
+    public SslRequestCommandPacket(int serverCharsetNumber){
+        this.serverCharsetNumber = serverCharsetNumber;
+    }
 
     @Override
     public void fromBytes(byte[] data) throws IOException {
@@ -25,11 +27,9 @@ public class SslRequestCommandPacket implements IPacket {
     @Override
     public byte[] toBytes() throws IOException {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
-        int clientCapabilities = Capability.CLIENT_LONG_FLAG
-            | Capability.CLIENT_PROTOCOL_41
-            | Capability.CLIENT_SECURE_CONNECTION
-            | Capability.CLIENT_PLUGIN_AUTH
-            | Capability.CLIENT_SSL;
+        int clientCapabilities = Capability.CLIENT_LONG_FLAG | Capability.CLIENT_PROTOCOL_41
+                                 | Capability.CLIENT_SECURE_CONNECTION | Capability.CLIENT_PLUGIN_AUTH
+                                 | Capability.CLIENT_SSL;
         ByteHelper.writeUnsignedIntLittleEndian(clientCapabilities, out);
         ByteHelper.writeUnsignedIntLittleEndian(0, out);
         out.write(serverCharsetNumber);

+ 10 - 8
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java

@@ -19,11 +19,11 @@ import javax.net.ssl.SSLSocket;
  */
 public class BioSocketChannel implements SocketChannel {
 
-    static final int     DEFAULT_CONNECT_TIMEOUT = 10 * 1000;
-    static final int     SO_TIMEOUT              = 1000;
-    private Socket       socket;
-    private InputStream  input;
-    private OutputStream output;
+    static final int      DEFAULT_CONNECT_TIMEOUT = 10 * 1000;
+    static final int      SO_TIMEOUT              = 1000;
+    private Socket        socket;
+    private InputStream   input;
+    private OutputStream  output;
     private final boolean ssl;
 
     BioSocketChannel(Socket socket) throws IOException{
@@ -92,9 +92,9 @@ public class BioSocketChannel implements SocketChannel {
             }
         }
         if (remain > 0 && accTimeout >= timeout) {
-            throw new SocketTimeoutException("Timeout occurred, failed to read total " + readSize + " bytes in "
-                                             + timeout + " milliseconds, actual read only " + (readSize - remain)
-                                             + " bytes");
+            throw new SocketTimeoutException(
+                "Timeout occurred, failed to read total " + readSize + " bytes in " + timeout
+                                             + " milliseconds, actual read only " + (readSize - remain) + " bytes");
         }
         return data;
     }
@@ -137,9 +137,11 @@ public class BioSocketChannel implements SocketChannel {
         }
         return false;
     }
+
     public boolean isSsl() {
         return ssl;
     }
+
     public Socket getSocket() {
         return socket;
     }

+ 59 - 57
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannelPool.java

@@ -8,11 +8,7 @@ import java.net.URL;
 import java.security.InvalidAlgorithmParameterException;
 import java.security.KeyStore;
 import java.security.cert.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 
 import javax.naming.InvalidNameException;
@@ -21,18 +17,19 @@ import javax.naming.ldap.Rdn;
 import javax.net.ssl.*;
 import javax.security.auth.x500.X500Principal;
 
-import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo;
-import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslMode;
-
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo;
+import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslMode;
+
 /**
  * @author luoyaogui 实现channel的管理(监听连接、读数据、回收) 2016-12-28
  * @author chuanyi 2018-3-3 保留<code>open</code>减少文件变更数量
  */
 public abstract class BioSocketChannelPool {
+
     private static final Logger logger = LoggerFactory.getLogger(BioSocketChannelPool.class);
 
     public static BioSocketChannel open(SocketAddress address) throws Exception {
@@ -42,7 +39,6 @@ public abstract class BioSocketChannelPool {
 
     public static BioSocketChannel openSsl(Socket socket, SslInfo sslInfo) throws Exception {
         SslMode sslMode = sslInfo.getSslMode();
-
         switch (sslMode) {
             case REQUIRED:
             case PREFERRED:
@@ -67,9 +63,7 @@ public abstract class BioSocketChannelPool {
     }
 
     /**
-     * from JDBC driver
-     *
-     * com.mysql.cj.protocol.ExportControlled#performTlsHandshake
+     * from JDBC driver com.mysql.cj.protocol.ExportControlled#performTlsHandshake
      * com.mysql.cj.protocol.ExportControlled#getSSLContext
      *
      * @param socket
@@ -82,14 +76,12 @@ public abstract class BioSocketChannelPool {
         boolean verifyServerCert = sslMode == SslMode.VERIFY_CA || sslMode == SslMode.VERIFY_IDENTITY;
 
         String clientCertificateKeyStoreUrl = sslInfo.getClientCertificateKeyStoreUrl();
-        String clientCertificateKeyStoreType = sslInfo.getClientCertificateKeyStoreType() != null
-            ? sslInfo.getClientCertificateKeyStoreType()
-            : "JKS";
+        String clientCertificateKeyStoreType = sslInfo.getClientCertificateKeyStoreType() != null ? sslInfo
+            .getClientCertificateKeyStoreType() : "JKS";
         String clientCertificateKeyStorePassword = sslInfo.getClientCertificateKeyStorePassword();
         String trustCertificateKeyStoreUrl = sslInfo.getTrustCertificateKeyStoreUrl();
-        String trustCertificateKeyStoreType = sslInfo.getTrustCertificateKeyStoreType() != null
-            ? sslInfo.getTrustCertificateKeyStoreType()
-            : "JKS";
+        String trustCertificateKeyStoreType = sslInfo.getTrustCertificateKeyStoreType() != null ? sslInfo
+            .getTrustCertificateKeyStoreType() : "JKS";
         String trustCertificateKeyStorePassword = sslInfo.getTrustCertificateKeyStorePassword();
         boolean fallbackToDefaultTrustStore = true;
         String hostName = sslMode == SslMode.VERIFY_IDENTITY ? socket.getInetAddress().getHostName() : null;
@@ -97,36 +89,38 @@ public abstract class BioSocketChannelPool {
         SSLContext sslContext = getSSLContext(clientCertificateKeyStoreUrl,
             clientCertificateKeyStoreType,
             clientCertificateKeyStorePassword,
-            trustCertificateKeyStoreUrl, trustCertificateKeyStoreType,
+            trustCertificateKeyStoreUrl,
+            trustCertificateKeyStoreType,
             trustCertificateKeyStorePassword,
             fallbackToDefaultTrustStore,
-            verifyServerCert, hostName);
+            verifyServerCert,
+            hostName);
         SSLSocketFactory socketFactory = sslContext.getSocketFactory();
 
-        SSLSocket sslSocket = (SSLSocket)socketFactory.createSocket(
-            socket, socket.getInetAddress().getHostName(), socket.getPort(), true);
+        SSLSocket sslSocket = (SSLSocket) socketFactory
+            .createSocket(socket, socket.getInetAddress().getHostName(), socket.getPort(), true);
 
         String[] protocolArr = null;
         if (StringUtils.isNotEmpty(sslInfo.getTlsVersions())) {
             protocolArr = StringUtils.split(sslInfo.getTlsVersions(), ",");
         }
         if (protocolArr == null || protocolArr.length == 0) {
-            protocolArr = new String[] {"TLSv1.2", "TLSv1.3"};
+            protocolArr = new String[] { "TLSv1.2", "TLSv1.3" };
         }
         logger.info("SSL protocol: {}", StringUtils.join(protocolArr, ","));
         sslSocket.setEnabledProtocols(protocolArr);
-        //sslSocket.setEnabledCipherSuites("TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDH_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDH_RSA_WITH_AES_256_GCM_SHA384,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_DHE_DSS_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDH_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_DSS_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384,TLS_RSA_WITH_AES_256_CBC_SHA256,TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA384,TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384,TLS_DHE_RSA_WITH_AES_256_CBC_SHA256,TLS_DHE_DSS_WITH_AES_256_CBC_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA,TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA,TLS_ECDH_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,TLS_RSA_WITH_AES_128_CBC_SHA256,TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256,TLS_ECDH_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_DSS_WITH_AES_128_CBC_SHA256,TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA,TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA,TLS_ECDH_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_DSS_WITH_AES_128_CBC_SHA".split(","));
+        // sslSocket.setEnabledCipherSuites("TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDH_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDH_RSA_WITH_AES_256_GCM_SHA384,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_DHE_DSS_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDH_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_DSS_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384,TLS_RSA_WITH_AES_256_CBC_SHA256,TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA384,TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384,TLS_DHE_RSA_WITH_AES_256_CBC_SHA256,TLS_DHE_DSS_WITH_AES_256_CBC_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA,TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA,TLS_ECDH_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,TLS_RSA_WITH_AES_128_CBC_SHA256,TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256,TLS_ECDH_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_DSS_WITH_AES_128_CBC_SHA256,TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA,TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA,TLS_ECDH_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_DSS_WITH_AES_128_CBC_SHA".split(","));
         sslSocket.startHandshake();
         logger.info("SSL socket handshake success.");
         return sslSocket;
     }
 
     private static SSLContext getSSLContext(String clientCertificateKeyStoreUrl, String clientCertificateKeyStoreType,
-        String clientCertificateKeyStorePassword,
-        String trustCertificateKeyStoreUrl, String trustCertificateKeyStoreType,
-        String trustCertificateKeyStorePassword,
-        boolean fallbackToDefaultTrustStore, boolean verifyServerCert, String hostName)
-        throws Exception {
+                                            String clientCertificateKeyStorePassword,
+                                            String trustCertificateKeyStoreUrl, String trustCertificateKeyStoreType,
+                                            String trustCertificateKeyStorePassword,
+                                            boolean fallbackToDefaultTrustStore, boolean verifyServerCert,
+                                            String hostName) throws Exception {
 
         KeyManager[] kms = null;
         List<TrustManager> tms = new ArrayList<>();
@@ -140,8 +134,8 @@ public abstract class BioSocketChannelPool {
                 if (StringUtils.isNotEmpty(clientCertificateKeyStoreType)) {
                     KeyStore clientKeyStore = KeyStore.getInstance(clientCertificateKeyStoreType);
                     URL ksURL = new URL(clientCertificateKeyStoreUrl);
-                    char[] password = (clientCertificateKeyStorePassword == null) ? new char[0]
-                        : clientCertificateKeyStorePassword.toCharArray();
+                    char[] password = (clientCertificateKeyStorePassword == null) ? new char[0] : clientCertificateKeyStorePassword
+                        .toCharArray();
                     ksIS = ksURL.openStream();
                     clientKeyStore.load(ksIS, password);
                     kmf.init(clientKeyStore, password);
@@ -152,7 +146,8 @@ public abstract class BioSocketChannelPool {
                     try {
                         ksIS.close();
                     } catch (IOException e) {
-                        // can't close input stream, but keystore can be properly initialized so we shouldn't throw
+                        // can't close input stream, but keystore can be properly initialized so we
+                        // shouldn't throw
                         // this exception
                     }
                 }
@@ -165,11 +160,11 @@ public abstract class BioSocketChannelPool {
             char[] trustStorePassword = null;
             KeyStore trustKeyStore = null;
 
-            if (StringUtils.isNotEmpty(trustCertificateKeyStoreUrl) && StringUtils.isNotEmpty(
-                trustCertificateKeyStoreType)) {
+            if (StringUtils.isNotEmpty(trustCertificateKeyStoreUrl)
+                && StringUtils.isNotEmpty(trustCertificateKeyStoreType)) {
                 trustStoreType = trustCertificateKeyStoreType;
-                trustStorePassword = (trustCertificateKeyStorePassword == null) ? new char[0]
-                    : trustCertificateKeyStorePassword.toCharArray();
+                trustStorePassword = (trustCertificateKeyStorePassword == null) ? new char[0] : trustCertificateKeyStorePassword
+                    .toCharArray();
                 trustStoreIS = new URL(trustCertificateKeyStoreUrl).openStream();
 
                 trustKeyStore = KeyStore.getInstance(trustStoreType);
@@ -178,15 +173,18 @@ public abstract class BioSocketChannelPool {
 
             if (trustKeyStore != null || fallbackToDefaultTrustStore) {
                 tmf.init(trustKeyStore);
-                // (trustKeyStore == null) initializes the TrustManagerFactory with the default truststore.
+                // (trustKeyStore == null) initializes the TrustManagerFactory with the default
+                // truststore.
 
-                // building the customized list of TrustManagers from original one if it's available
+                // building the customized list of TrustManagers from original one if it's
+                // available
                 TrustManager[] origTms = tmf.getTrustManagers();
 
                 for (TrustManager tm : origTms) {
                     // wrap X509TrustManager or put original if non-X509 TrustManager
-                    tms.add(tm instanceof X509TrustManager ? new X509TrustManagerWrapper((X509TrustManager)tm,
-                        verifyServerCert, hostName) : tm);
+                    tms.add(tm instanceof X509TrustManager ? new X509TrustManagerWrapper((X509TrustManager) tm,
+                        verifyServerCert,
+                        hostName) : tm);
                 }
             }
 
@@ -195,13 +193,15 @@ public abstract class BioSocketChannelPool {
                 try {
                     trustStoreIS.close();
                 } catch (IOException e) {
-                    // can't close input stream, but keystore can be properly initialized so we shouldn't throw this
+                    // can't close input stream, but keystore can be properly initialized so we
+                    // shouldn't throw this
                     // exception
                 }
             }
         }
 
-        // if original TrustManagers are not available then putting one X509TrustManagerWrapper which take care only
+        // if original TrustManagers are not available then putting one
+        // X509TrustManagerWrapper which take care only
         // about expiration check
         if (tms.size() == 0) {
             tms.add(new X509TrustManagerWrapper(verifyServerCert, hostName));
@@ -214,24 +214,24 @@ public abstract class BioSocketChannelPool {
 
     public static class X509TrustManagerWrapper implements X509TrustManager {
 
-        private X509TrustManager origTm = null;
-        private boolean verifyServerCert = false;
-        private String hostName = null;
-        private CertificateFactory certFactory = null;
-        private PKIXParameters validatorParams = null;
-        private CertPathValidator validator = null;
+        private X509TrustManager   origTm           = null;
+        private boolean            verifyServerCert = false;
+        private String             hostName         = null;
+        private CertificateFactory certFactory      = null;
+        private PKIXParameters     validatorParams  = null;
+        private CertPathValidator  validator        = null;
 
-        public X509TrustManagerWrapper(X509TrustManager tm, boolean verifyServerCertificate, String hostName)
-            throws CertificateException {
+        public X509TrustManagerWrapper(X509TrustManager tm, boolean verifyServerCertificate,
+                                       String hostName) throws CertificateException{
             this.origTm = tm;
             this.verifyServerCert = verifyServerCertificate;
             this.hostName = hostName;
 
             if (verifyServerCertificate) {
                 try {
-                    Set<TrustAnchor> anch = Arrays.stream(tm.getAcceptedIssuers()).map(c -> new TrustAnchor(c, null))
-                        .collect(
-                            Collectors.toSet());
+                    Set<TrustAnchor> anch = Arrays.stream(tm.getAcceptedIssuers())
+                        .map(c -> new TrustAnchor(c, null))
+                        .collect(Collectors.toSet());
                     this.validatorParams = new PKIXParameters(anch);
                     this.validatorParams.setRevocationEnabled(false);
                     this.validator = CertPathValidator.getInstance("PKIX");
@@ -243,7 +243,7 @@ public abstract class BioSocketChannelPool {
 
         }
 
-        public X509TrustManagerWrapper(boolean verifyServerCertificate, String hostName) {
+        public X509TrustManagerWrapper(boolean verifyServerCertificate, String hostName){
             this.verifyServerCert = verifyServerCertificate;
             this.hostName = hostName;
         }
@@ -266,7 +266,7 @@ public abstract class BioSocketChannelPool {
                     // Validate against truststore
                     CertPathValidatorResult result = this.validator.validate(certPath, this.validatorParams);
                     // Check expiration for the CA used to validate this path
-                    ((PKIXCertPathValidatorResult)result).getTrustAnchor().getTrustedCert().checkValidity();
+                    ((PKIXCertPathValidatorResult) result).getTrustAnchor().getTrustedCert().checkValidity();
 
                 } catch (InvalidAlgorithmParameterException e) {
                     throw new CertificateException(e);
@@ -308,8 +308,10 @@ public abstract class BioSocketChannelPool {
                     if (!expectHostNames.contains(this.hostName)) {
                         throw new CertificateException(
                             "Server certificate identity check failed. The certificate Common Name "
-                                + expectHostNames.stream().map(h -> "'" + h + "'").collect(Collectors.joining(", "))
-                                + " does not match with '" + this.hostName + "'.");
+                                                       + expectHostNames.stream()
+                                                           .map(h -> "'" + h + "'")
+                                                           .collect(Collectors.joining(", "))
+                                                       + " does not match with '" + this.hostName + "'.");
                     }
 
                 }

+ 6 - 6
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java

@@ -3,18 +3,19 @@ package com.alibaba.otter.canal.parse.driver.mysql.socket;
 import java.io.IOException;
 import java.net.SocketAddress;
 
-import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo;
-import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslMode;
-
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo;
+import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslMode;
+
 /**
  * @author agapple 2018年3月12日 下午10:46:22
  * @since 1.0.26
  */
 public abstract class SocketChannelPool {
+
     private static final Logger logger = LoggerFactory.getLogger(SocketChannelPool.class);
 
     public static SocketChannel open(SocketAddress address) throws Exception {
@@ -35,8 +36,7 @@ public abstract class SocketChannelPool {
         } else {
             SocketAddress remoteSocketAddress = channel.getRemoteSocketAddress();
             try {
-                return BioSocketChannelPool.openSsl(
-                    ((BioSocketChannel)channel).getSocket(), sslInfo);
+                return BioSocketChannelPool.openSsl(((BioSocketChannel) channel).getSocket(), sslInfo);
             } catch (Exception e) {
                 if (sslMode == SslMode.PREFERRED) {
                     // still use non ssl channel
@@ -45,7 +45,7 @@ public abstract class SocketChannelPool {
                 }
                 IOException ioe;
                 if (e instanceof IOException) {
-                    ioe = (IOException)e;
+                    ioe = (IOException) e;
                 } else {
                     ioe = new IOException(e);
                 }

+ 10 - 10
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/ssl/SslInfo.java

@@ -7,18 +7,18 @@ package com.alibaba.otter.canal.parse.driver.mysql.ssl;
 public class SslInfo {
 
     private SslMode sslMode = SslMode.DISABLED;
-    private String tlsVersions; // 和 enabledTLSProtocols 同含义,TLSv1.2,TLSv1.3
-    private String trustCertificateKeyStoreType; // trustStore 证书类型,支持 JKS (默认) 和 PKCS12
-    private String trustCertificateKeyStoreUrl; // trustStore 证书
-    private String trustCertificateKeyStorePassword; // trustStore 证书密码
-    private String clientCertificateKeyStoreType; // client 证书类型,支持 JKS (默认) 和 PKCS12
-    private String clientCertificateKeyStoreUrl; // client 证书
-    private String clientCertificateKeyStorePassword; // client 证书密码
+    private String  tlsVersions;                       // 和 enabledTLSProtocols 同含义,TLSv1.2,TLSv1.3
+    private String  trustCertificateKeyStoreType;      // trustStore 证书类型,支持 JKS (默认) 和 PKCS12
+    private String  trustCertificateKeyStoreUrl;       // trustStore 证书
+    private String  trustCertificateKeyStorePassword;  // trustStore 证书密码
+    private String  clientCertificateKeyStoreType;     // client 证书类型,支持 JKS (默认) 和 PKCS12
+    private String  clientCertificateKeyStoreUrl;      // client 证书
+    private String  clientCertificateKeyStorePassword; // client 证书密码
 
     public SslInfo(SslMode sslMode, String tlsVersions, String trustCertificateKeyStoreType,
-        String trustCertificateKeyStoreUrl, String trustCertificateKeyStorePassword,
-        String clientCertificateKeyStoreType,
-        String clientCertificateKeyStoreUrl, String clientCertificateKeyStorePassword) {
+                   String trustCertificateKeyStoreUrl, String trustCertificateKeyStorePassword,
+                   String clientCertificateKeyStoreType, String clientCertificateKeyStoreUrl,
+                   String clientCertificateKeyStorePassword) {
         this.sslMode = sslMode;
         this.tlsVersions = tlsVersions;
         this.trustCertificateKeyStoreType = trustCertificateKeyStoreType;

+ 58 - 34
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java

@@ -66,14 +66,20 @@ public class CanalParameter implements Serializable {
     private String                   dbUsername;                                                     // 数据库用户
     private String                   dbPassword;                                                     // 数据库密码
 
-    private String sslMode = SslMode.DISABLED.name();
-    private String tlsVersions; // 和 enabledTLSProtocols 同含义,TLSv1.2,TLSv1.3
-    private String trustCertificateKeyStoreType; // trustStore 证书类型,支持 JKS (默认) 和 PKCS12
-    private String trustCertificateKeyStoreUrl; // trustStore 证书路径
-    private String trustCertificateKeyStorePassword; // trustStore 证书密码
-    private String clientCertificateKeyStoreType; // client 证书类型,支持 JKS (默认) 和 PKCS12
-    private String clientCertificateKeyStoreUrl; // client 证书路径
-    private String clientCertificateKeyStorePassword; // client 证书密码
+    private String                   sslMode                            = SslMode.DISABLED.name();
+    private String                   tlsVersions;                                                    // 和
+                                                                                                     // enabledTLSProtocols
+                                                                                                     // 同含义,TLSv1.2,TLSv1.3
+    private String                   trustCertificateKeyStoreType;                                   // trustStore
+                                                                                                     // 证书类型,支持 JKS (默认)
+                                                                                                     // 和 PKCS12
+    private String                   trustCertificateKeyStoreUrl;                                    // trustStore 证书路径
+    private String                   trustCertificateKeyStorePassword;                               // trustStore 证书密码
+    private String                   clientCertificateKeyStoreType;                                  // client 证书类型,支持
+                                                                                                     // JKS (默认) 和
+                                                                                                     // PKCS12
+    private String                   clientCertificateKeyStoreUrl;                                   // client 证书路径
+    private String                   clientCertificateKeyStorePassword;                              // client 证书密码
 
     // binlog链接信息
     private IndexMode                indexMode;
@@ -117,26 +123,38 @@ public class CanalParameter implements Serializable {
     private InetSocketAddress        masterAddress;                                                  // 主库信息
     private String                   masterUsername;                                                 // 帐号
     private String                   masterPassword;                                                 // 密码
-    private String masterSslMode = SslMode.DISABLED.name();
-    private String masterTlsVersions; // 和 enabledTLSProtocols 同含义,TLSv1.2,TLSv1.3
-    private String masterTrustCertificateKeyStoreType; // trustStore 证书类型,支持 JKS (默认) 和 PKCS12
-    private String masterTrustCertificateKeyStoreUrl; // trustStore 证书路径
-    private String masterTrustCertificateKeyStorePassword; // trustStore 证书密码
-    private String masterClientCertificateKeyStoreType; // client 证书类型,支持 JKS (默认) 和 PKCS12
-    private String masterClientCertificateKeyStoreUrl; // client 证书路径
-    private String masterClientCertificateKeyStorePassword; // client 证书密码
+    private String                   masterSslMode                      = SslMode.DISABLED.name();
+    private String                   masterTlsVersions;                                              // 和
+                                                                                                     // enabledTLSProtocols
+                                                                                                     // 同含义,TLSv1.2,TLSv1.3
+    private String                   masterTrustCertificateKeyStoreType;                             // trustStore
+                                                                                                     // 证书类型,支持 JKS (默认)
+                                                                                                     // 和 PKCS12
+    private String                   masterTrustCertificateKeyStoreUrl;                              // trustStore 证书路径
+    private String                   masterTrustCertificateKeyStorePassword;                         // trustStore 证书密码
+    private String                   masterClientCertificateKeyStoreType;                            // client 证书类型,支持
+                                                                                                     // JKS (默认) 和
+                                                                                                     // PKCS12
+    private String                   masterClientCertificateKeyStoreUrl;                             // client 证书路径
+    private String                   masterClientCertificateKeyStorePassword;                        // client 证书密码
 
     private InetSocketAddress        standbyAddress;                                                 // 备库信息
     private String                   standbyUsername;                                                // 帐号
     private String                   standbyPassword;
-    private String standbySslMode = SslMode.DISABLED.name();
-    private String standbyTlsVersions; // 和 enabledTLSProtocols 同含义,TLSv1.2,TLSv1.3
-    private String standbyTrustCertificateKeyStoreType; // trustStore 证书类型,支持 JKS (默认) 和 PKCS12
-    private String standbyTrustCertificateKeyStoreUrl; // trustStore 证书路径
-    private String standbyTrustCertificateKeyStorePassword; // trustStore 证书密码
-    private String standbyClientCertificateKeyStoreType; // client 证书类型,支持 JKS (默认) 和 PKCS12
-    private String standbyClientCertificateKeyStoreUrl; // client 证书路径
-    private String standbyClientCertificateKeyStorePassword; // client 证书密码
+    private String                   standbySslMode                     = SslMode.DISABLED.name();
+    private String                   standbyTlsVersions;                                             // 和
+                                                                                                     // enabledTLSProtocols
+                                                                                                     // 同含义,TLSv1.2,TLSv1.3
+    private String                   standbyTrustCertificateKeyStoreType;                            // trustStore
+                                                                                                     // 证书类型,支持 JKS (默认)
+                                                                                                     // 和 PKCS12
+    private String                   standbyTrustCertificateKeyStoreUrl;                             // trustStore 证书路径
+    private String                   standbyTrustCertificateKeyStorePassword;                        // trustStore 证书密码
+    private String                   standbyClientCertificateKeyStoreType;                           // client 证书类型,支持
+                                                                                                     // JKS (默认) 和
+                                                                                                     // PKCS12
+    private String                   standbyClientCertificateKeyStoreUrl;                            // client 证书路径
+    private String                   standbyClientCertificateKeyStorePassword;                       // client 证书密码
 
     private String                   masterLogfileName                  = null;                      // master起始位置
     private Long                     masterLogfileOffest                = null;
@@ -146,11 +164,11 @@ public class CanalParameter implements Serializable {
     private Long                     standbyTimestamp                   = null;
     private Boolean                  parallel                           = Boolean.FALSE;
 
-    //自定义alarmHandler类全路径
+    // 自定义alarmHandler类全路径
     private String                   alarmHandlerClass                  = null;
-    //自定义alarmHandler插件文件夹路径
+    // 自定义alarmHandler插件文件夹路径
     private String                   alarmHandlerPluginDir              = null;
-    //是否支持多流消费
+    // 是否支持多流消费
     private Boolean                  multiStreamEnable                  = Boolean.FALSE;
 
     public static enum RunMode {
@@ -209,6 +227,7 @@ public class CanalParameter implements Serializable {
     }
 
     public static enum StorageMode {
+
         /** 内存存储模式 */
         MEMORY,
         /** 文件存储模式 */
@@ -231,6 +250,7 @@ public class CanalParameter implements Serializable {
     }
 
     public static enum StorageScavengeMode {
+
         /** 在存储满的时候触发 */
         ON_FULL,
         /** 在每次有ack请求时触发 */
@@ -258,6 +278,7 @@ public class CanalParameter implements Serializable {
     }
 
     public static enum SourcingType {
+
         /** mysql DB */
         MYSQL,
         /** localBinLog */
@@ -285,6 +306,7 @@ public class CanalParameter implements Serializable {
     }
 
     public static enum MetaMode {
+
         /** 内存存储模式 */
         MEMORY,
         /** 文件存储模式 */
@@ -312,6 +334,7 @@ public class CanalParameter implements Serializable {
     }
 
     public static enum IndexMode {
+
         /** 内存存储模式 */
         MEMORY,
         /** 文件存储模式 */
@@ -345,6 +368,7 @@ public class CanalParameter implements Serializable {
     }
 
     public static enum BatchMode {
+
         /** 对象数量 */
         ITEMSIZE,
 
@@ -703,16 +727,16 @@ public class CanalParameter implements Serializable {
                     groupDbAddresses.add(groupAddresses);
                 }
             } else {
-            if (masterAddress != null) {
-                List<DataSourcing> groupAddresses = new ArrayList<>();
+                if (masterAddress != null) {
+                    List<DataSourcing> groupAddresses = new ArrayList<>();
                     groupAddresses.add(new DataSourcing(sourcingType, masterAddress));
-                groupDbAddresses.add(groupAddresses);
-            }
+                    groupDbAddresses.add(groupAddresses);
+                }
 
-            if (standbyAddress != null) {
-                List<DataSourcing> groupAddresses = new ArrayList<>();
+                if (standbyAddress != null) {
+                    List<DataSourcing> groupAddresses = new ArrayList<>();
                     groupAddresses.add(new DataSourcing(sourcingType, standbyAddress));
-                groupDbAddresses.add(groupAddresses);
+                    groupDbAddresses.add(groupAddresses);
                 }
             }
         }

+ 3 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -74,8 +74,7 @@ public class MysqlConnection implements ErosaConnection {
         connector.setConnTimeout(connTimeout);
     }
 
-    public MysqlConnection(InetSocketAddress address, String username, String password,
-        String defaultSchema){
+    public MysqlConnection(InetSocketAddress address, String username, String password, String defaultSchema){
         authInfo = new AuthenticationInfo();
         authInfo.setAddress(address);
         authInfo.setUsername(username);
@@ -87,8 +86,8 @@ public class MysqlConnection implements ErosaConnection {
         connector.setConnTimeout(connTimeout);
     }
 
-    public MysqlConnection(InetSocketAddress address, String username, String password,
-                           String defaultSchema, SslInfo sslInfo){
+    public MysqlConnection(InetSocketAddress address, String username, String password, String defaultSchema,
+                           SslInfo sslInfo){
         authInfo = new AuthenticationInfo();
         authInfo.setAddress(address);
         authInfo.setUsername(username);

+ 9 - 11
parse/src/main/java/com/alibaba/otter/canal/parse/support/AuthenticationInfo.java

@@ -2,12 +2,12 @@ package com.alibaba.otter.canal.parse.support;
 
 import java.net.InetSocketAddress;
 
-import com.alibaba.otter.canal.common.utils.CommonUtils;
-import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo;
-
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 
+import com.alibaba.otter.canal.common.utils.CommonUtils;
+import com.alibaba.otter.canal.parse.driver.mysql.ssl.SslInfo;
+
 /**
  * 数据库认证信息
  *
@@ -16,17 +16,15 @@ import org.apache.commons.lang.builder.ToStringStyle;
  */
 public class AuthenticationInfo {
 
-
-
     private InetSocketAddress address;            // 主库信息
     private String            username;           // 帐号
     private String            password;           // 密码
     private String            defaultDatabaseName;// 默认链接的数据库
-    private String            pwdPublicKey;       //公钥
-    private boolean           enableDruid;        //是否使用druid加密解密数据库密码
-    private SslInfo sslInfo;
+    private String            pwdPublicKey;       // 公钥
+    private boolean           enableDruid;        // 是否使用druid加密解密数据库密码
+    private SslInfo           sslInfo;
 
-    public void initPwd() throws Exception{
+    public void initPwd() throws Exception {
         if (enableDruid) {
             this.password = CommonUtils.decryptDruidPassword(pwdPublicKey, password);
         }
@@ -47,8 +45,8 @@ public class AuthenticationInfo {
         this.defaultDatabaseName = defaultDatabaseName;
     }
 
-    public AuthenticationInfo(InetSocketAddress address, String username, String password,
-        String defaultDatabaseName, SslInfo sslInfo) {
+    public AuthenticationInfo(InetSocketAddress address, String username, String password, String defaultDatabaseName,
+                              SslInfo sslInfo){
         this(address, username, password, defaultDatabaseName);
         this.sslInfo = sslInfo;
     }