Просмотр исходного кода

fixed format & support Mysql Auth Switch Protocol

agapple 6 лет назад
Родитель
Сommit
0ec4699149
25 измененных файлов с 469 добавлено и 80 удалено
  1. 6 2
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java
  2. 10 2
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SqlParser.java
  3. 5 1
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java
  4. 1 1
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java
  5. 7 1
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java
  6. 5 1
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSub2Test.java
  7. 5 1
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSubTest.java
  8. 5 1
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOne2Test.java
  9. 5 1
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOneTest.java
  10. 5 1
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncJoinOneTest.java
  11. 5 1
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncSingleTest.java
  12. 5 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  13. 12 2
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java
  14. 5 1
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java
  15. 8 1
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java
  16. 6 1
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java
  17. 53 6
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java
  18. 170 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/Capability.java
  19. 21 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/AuthSwitchResponsePacket.java
  20. 26 7
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/ClientAuthenticationPacket.java
  21. 25 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/server/AuthSwitchRequestMoreData.java
  22. 29 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/server/AuthSwitchRequestPacket.java
  23. 36 14
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/server/HandshakeInitializationPacket.java
  24. 4 29
      driver/src/test/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnectorTest.java
  25. 10 5
      server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

+ 6 - 2
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -17,6 +17,7 @@ import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.MDC;
 
 import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
@@ -29,8 +30,11 @@ import com.alibaba.otter.canal.client.adapter.es.monitor.ESConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.es.service.ESEtlService;
 import com.alibaba.otter.canal.client.adapter.es.service.ESSyncService;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
-import com.alibaba.otter.canal.client.adapter.support.*;
-import org.slf4j.MDC;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
 
 /**
  * ES外部适配器

+ 10 - 2
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SqlParser.java

@@ -9,8 +9,16 @@ import java.util.stream.Collectors;
 
 import com.alibaba.fastsql.sql.SQLUtils;
 import com.alibaba.fastsql.sql.ast.SQLExpr;
-import com.alibaba.fastsql.sql.ast.expr.*;
-import com.alibaba.fastsql.sql.ast.statement.*;
+import com.alibaba.fastsql.sql.ast.expr.SQLBinaryOpExpr;
+import com.alibaba.fastsql.sql.ast.expr.SQLCaseExpr;
+import com.alibaba.fastsql.sql.ast.expr.SQLIdentifierExpr;
+import com.alibaba.fastsql.sql.ast.expr.SQLMethodInvokeExpr;
+import com.alibaba.fastsql.sql.ast.expr.SQLPropertyExpr;
+import com.alibaba.fastsql.sql.ast.statement.SQLExprTableSource;
+import com.alibaba.fastsql.sql.ast.statement.SQLJoinTableSource;
+import com.alibaba.fastsql.sql.ast.statement.SQLSelectStatement;
+import com.alibaba.fastsql.sql.ast.statement.SQLSubqueryTableSource;
+import com.alibaba.fastsql.sql.ast.statement.SQLTableSource;
 import com.alibaba.fastsql.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
 import com.alibaba.fastsql.sql.dialect.mysql.parser.MySqlStatementParser;
 import com.alibaba.fastsql.sql.parser.ParserException;

+ 5 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java

@@ -1,7 +1,11 @@
 package com.alibaba.otter.canal.client.adapter.es.service;
 
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;

+ 1 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -7,7 +7,6 @@ import java.util.Map;
 
 import javax.sql.DataSource;
 
-import com.alibaba.otter.canal.client.adapter.support.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,6 +22,7 @@ import com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 
 /**
  * ES 同步 Service

+ 7 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java

@@ -5,7 +5,13 @@ import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.sql.Blob;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.codec.binary.Base64;
 import org.joda.time.DateTime;

+ 5 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSub2Test.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import javax.sql.DataSource;
 

+ 5 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSubTest.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import javax.sql.DataSource;
 

+ 5 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOne2Test.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import javax.sql.DataSource;
 

+ 5 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOneTest.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import javax.sql.DataSource;
 

+ 5 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncJoinOneTest.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import javax.sql.DataSource;
 

+ 5 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncSingleTest.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.elasticsearch.action.get.GetResponse;
 import org.junit.Assert;

+ 5 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -2,7 +2,11 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;

+ 12 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java

@@ -1,13 +1,23 @@
 package com.alibaba.otter.canal.adapter.launcher.rest;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.web.bind.annotation.*;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
 
 import com.alibaba.otter.canal.adapter.launcher.common.EtlLock;
 import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;

+ 5 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.rdb.service;
 
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;

+ 8 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

@@ -4,7 +4,14 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
-import java.sql.*;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Map;

+ 6 - 1
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java

@@ -1,6 +1,11 @@
 package com.alibaba.otter.canal.client.adapter.rdb.test.sync;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.junit.Before;
 import org.junit.Test;

+ 53 - 6
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -2,19 +2,24 @@ package com.alibaba.otter.canal.parse.driver.mysql;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.security.NoSuchAlgorithmException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.client.AuthSwitchResponsePacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.ClientAuthenticationPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.QuitCommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.AuthSwitchRequestMoreData;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.AuthSwitchRequestPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.HandshakeInitializationPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.Reply323Packet;
 import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannelPool;
+import com.alibaba.otter.canal.parse.driver.mysql.utils.MSC;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.MySQLPasswordEncrypter;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
 
@@ -150,6 +155,7 @@ public class MysqlConnector {
     }
 
     private void negotiate(SocketChannel channel) throws IOException {
+        // https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol
         HeaderPacket header = PacketManager.readHeader(channel, 4, timeout);
         byte[] body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
         if (body[0] < 0) {// check field_count
@@ -165,10 +171,14 @@ public class MysqlConnector {
         }
         HandshakeInitializationPacket handshakePacket = new HandshakeInitializationPacket();
         handshakePacket.fromBytes(body);
-        connectionId = handshakePacket.threadId; // 记录一下connection
+        if (handshakePacket.protocolVersion != MSC.DEFAULT_PROTOCOL_VERSION) {
+            // HandshakeV9
+            auth323(channel, (byte) (header.getPacketSequenceNumber() + 1), handshakePacket.seed);
+            return;
+        }
 
+        connectionId = handshakePacket.threadId; // 记录一下connection
         logger.info("handshake initialization packet received, prepare the client authentication packet to send");
-
         ClientAuthenticationPacket clientAuth = new ClientAuthenticationPacket();
         clientAuth.setCharsetNumber(charsetNumber);
 
@@ -177,6 +187,7 @@ public class MysqlConnector {
         clientAuth.setServerCapabilities(handshakePacket.serverCapabilities);
         clientAuth.setDatabaseName(defaultSchema);
         clientAuth.setScrumbleBuff(joinAndCreateScrumbleBuff(handshakePacket));
+        clientAuth.setAuthPluginName(handshakePacket.authPluginName);
 
         byte[] clientAuthPkgBody = clientAuth.toBytes();
         HeaderPacket h = new HeaderPacket();
@@ -192,15 +203,51 @@ public class MysqlConnector {
         body = null;
         body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
         assert body != null;
+        byte marker = body[0];
+        if (marker == -2 || marker == 1) {
+            byte[] authData = null;
+            String pluginName = null;
+            if (marker == 1) {
+                AuthSwitchRequestMoreData packet = new AuthSwitchRequestMoreData();
+                packet.fromBytes(body);
+                authData = packet.authData;
+            } else {
+                AuthSwitchRequestPacket packet = new AuthSwitchRequestPacket();
+                packet.fromBytes(body);
+                authData = packet.authData;
+                pluginName = packet.authName;
+            }
+
+            if (pluginName != null && "mysql_native_password".equals(pluginName)) {
+                byte[] encryptedPassword = null;
+                try {
+                    encryptedPassword = MySQLPasswordEncrypter.scramble411(getPassword().getBytes(), authData);
+                } catch (NoSuchAlgorithmException e) {
+                    throw new RuntimeException("can't encrypt password that will be sent to MySQL server.", e);
+                }
+                AuthSwitchResponsePacket responsePacket = new AuthSwitchResponsePacket();
+                responsePacket.authData = encryptedPassword;
+                byte[] auth = responsePacket.toBytes();
+
+                h = new HeaderPacket();
+                h.setPacketBodyLength(auth.length);
+                h.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));
+                PacketManager.writePkg(channel, h.toBytes(), auth);
+                logger.info("auth switch response packet is sent out.");
+
+                header = null;
+                header = PacketManager.readHeader(channel, 4);
+                body = null;
+                body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
+                assert body != null;
+            }
+        }
+
         if (body[0] < 0) {
             if (body[0] == -1) {
                 ErrorPacket err = new ErrorPacket();
                 err.fromBytes(body);
                 throw new IOException("Error When doing Client Authentication:" + err.toString());
-            } else if (body[0] == -2) {
-                auth323(channel, header.getPacketSequenceNumber(), handshakePacket.seed);
-                // throw new
-                // IOException("Unexpected EOF packet at Client Authentication.");
             } else {
                 throw new IOException("unpexpected packet with field_count=" + body[0]);
             }

+ 170 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/Capability.java

@@ -0,0 +1,170 @@
+package com.alibaba.otter.canal.parse.driver.mysql.packets;
+
+/**
+ * https://dev.mysql.com/doc/internals/en/capability-flags.html#packet-Protocol
+ * ::CapabilityFlags
+ */
+public interface Capability {
+
+    // Use the improved version of Old Password Authentication.
+    // Assumed to be set since 4.1.1.
+    int CLIENT_LONG_PASSWORD                  = 0x00000001;
+
+    // Send found rows instead of affected rows in EOF_Packet.
+    int CLIENT_FOUND_ROWS                     = 0x00000002;
+
+    // https://dev.mysql.com/doc/internals/en/com-query-response.html#packet-Protocol::ColumnDefinition320
+    // Longer flags in Protocol::ColumnDefinition320.
+    // Server:Supports longer flags.
+    // Client:Expects longer flags.
+    // 执行查询sql时,除了返回结果集,还返回元数据
+    int CLIENT_LONG_FLAG                      = 0x00000004;
+
+    // 可以在handshake时,指定一个数据库名
+    // Database (schema) name can be specified on connect in Handshake Response
+    // Packet.
+    // Server: Supports schema-name in Handshake Response Packet.
+    // Client: Handshake Response Packet contains a schema-name.
+    int CLIENT_CONNECT_WITH_DB                = 0x00000008;
+
+    // Server: Do not permit database.table.column.
+    int CLIENT_NO_SCHEMA                      = 0x00000010;
+
+    // Compression protocol supported.
+    // Server:Supports compression.
+    // Client:Switches to Compression compressed protocol after successful
+    // authentication.
+    int CLIENT_COMPRESS                       = 0x00000020;
+
+    // Special handling of ODBC behavior.
+    // No special behavior since 3.22.
+    int CLIENT_ODBC                           = 0x00000040;
+
+    // Can use LOAD DATA LOCAL.
+    // Server:Enables the LOCAL INFILE request of LOAD DATA|XML.
+    // Client:Will handle LOCAL INFILE request.
+    int CLIENT_LOCAL_FILES                    = 0x00000080;
+
+    // Server: Parser can ignore spaces before '('.
+    // Client: Let the parser ignore spaces before '('.
+    int CLIENT_IGNORE_SPACE                   = 0x00000100;
+
+    // Server:Supports the 4.1 protocol,
+    // 4.1协议中,
+    // OKPacket将会包含warning count
+    // ERR_Packet包含SQL state
+    // EOF_Packet包含warning count和status flags
+    // Client:Uses the 4.1 protocol.
+    // Note: this value was CLIENT_CHANGE_USER in 3.22, unused in 4.0
+    // If CLIENT_PROTOCOL_41 is set:
+    // 1、the ok packet contains a warning count.
+    // https://dev.mysql.com/doc/internals/en/packet-OK_Packet.html
+    // 2、ERR_Packet It contains a SQL state value if CLIENT_PROTOCOL_41 is
+    // enabled. //https://dev.mysql.com/doc/internals/en/packet-ERR_Packet.html
+    // 3、EOF_Packet If CLIENT_PROTOCOL_41 is enabled, the EOF packet contains a
+    // warning count and status flags.
+    // https://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html
+    int CLIENT_PROTOCOL_41                    = 0x00000200;
+
+    // wait_timeout versus wait_interactive_timeout.
+    // Server:Supports interactive and noninteractive clients.
+    // Client:Client is interactive.
+    int CLIENT_INTERACTIVE                    = 0x00000400;
+
+    // Server: Supports SSL.
+    // Client: Switch to SSL after sending the capability-flags.
+    int CLIENT_SSL                            = 0x00000800;
+
+    // Client: Do not issue SIGPIPE if network failures occur (libmysqlclient
+    // only).
+    int CLIENT_IGNORE_SIGPIPE                 = 0x00001000;
+
+    // Server: Can send status flags in EOF_Packet.
+    // Client:Expects status flags in EOF_Packet.
+    // Note:This flag is optional in 3.23, but always set by the server since
+    // 4.0.
+    int CLIENT_TRANSACTIONS                   = 0x00002000;
+
+    // Unused
+    // Note: Was named CLIENT_PROTOCOL_41 in 4.1.0.
+    int CLIENT_RESERVED                       = 0x00004000;
+
+    /**
+     * <pre>
+     *      服务端返回20 byte随机字节,客户端利用其对密码进行加密,加密算法如下:
+     *      https://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
+     *      Authentication::Native41:
+     *      client-side expects a 20-byte random challenge
+     *      client-side returns a 20-byte response based on the algorithm described later
+     *      Name
+     *      mysql_native_password
+     *      Requires
+     *      CLIENT_SECURE_CONNECTION
+     *      Image description follows.
+     *      Image description
+     *      This method fixes a 2 short-comings of the Old Password Authentication:
+     *      (https://dev.mysql.com/doc/internals/en/old-password-authentication.html#packet-Authentication::Old)
+     *      using a tested, crypto-graphic hashing function which isn't broken
+     *      knowning the content of the hash in the mysql.user table isn't enough to authenticate against the MySQL Server.
+     *      The password is calculated by:
+     *      SHA1( password ) XOR SHA1( "20-bytes random data from server" <concat> SHA1( SHA1( password ) ) )
+     * </pre>
+     */
+    int CLIENT_SECURE_CONNECTION              = 0x00008000;
+
+    // Server:Can handle multiple statements per COM_QUERY and COM_STMT_PREPARE.
+    // Client:May send multiple statements per COM_QUERY and COM_STMT_PREPARE.
+    // Note:Was named CLIENT_MULTI_QUERIES in 4.1.0, renamed later.
+    // Requires:CLIENT_PROTOCOL_41
+    int CLIENT_MULTI_STATEMENTS               = 0x00010000;
+
+    // Server: Can send multiple resultsets for COM_QUERY.
+    // Client: Can handle multiple resultsets for COM_QUERY.
+    // Requires:CLIENT_PROTOCOL_41
+    int CLIENT_MULTI_RESULTS                  = 0x00020000;
+
+    // Server: Can send multiple resultsets for ComStmtExecutePacket.
+    // Client: Can handle multiple resultsets for ComStmtExecutePacket.
+    // Requires:CLIENT_PROTOCOL_41
+    int CLIENT_PS_MULTI_RESULTS               = 0x00040000;
+
+    // Server:Sends extra data in Initial Handshake Packet and supports the
+    // pluggable authentication protocol.
+    // Client: Supports authentication plugins.
+    // Requires: CLIENT_PROTOCOL_41
+    int CLIENT_PLUGIN_AUTH                    = 0x00080000;
+
+    // Server: Permits connection attributes in Protocol::HandshakeResponse41.
+    // Client: Sends connection attributes in Protocol::HandshakeResponse41.
+    int CLIENT_CONNECT_ATTRS                  = 0x00100000;
+
+    // Server:Understands length-encoded integer for auth response data in
+    // Protocol::HandshakeResponse41.
+    // Client:Length of auth response data in Protocol::HandshakeResponse41 is a
+    // length-encoded integer.
+    // Note: The flag was introduced in 5.6.6, but had the wrong value.
+    int CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA = 0x00200000;
+
+    // Server: Announces support for expired password extension.
+    // Client: Can handle expired passwords.
+    int CLIENT_CAN_HANDLE_EXPIRED_PASSWORDS   = 0x00400000;
+
+    // Server: Can set SERVER_SESSION_STATE_CHANGED in the Status Flags and send
+    // session-state change data after a OK packet.
+    // Client: Expects the server to send sesson-state changes after a OK
+    // packet.
+    int CLIENT_SESSION_TRACK                  = 0x00800000;
+
+    /**
+     * Server: Can send OK after a Text Resultset. Client: Expects an OK
+     * (instead of EOF) after the resultset rows of a Text Resultset.
+     * Background:To support CLIENT_SESSION_TRACK, additional information must
+     * be sent after all successful commands. Although the OK packet is
+     * extensible, the EOF packet is not due to the overlap of its bytes with
+     * the content of the Text Resultset Row. Therefore, the EOF packet in the
+     * Text Resultset is replaced with an OK packet. EOF packets are deprecated
+     * as of MySQL 5.7.5.
+     */
+    int CLIENT_DEPRECATE_EOF                  = 0x01000000;
+
+}

+ 21 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/AuthSwitchResponsePacket.java

@@ -0,0 +1,21 @@
+package com.alibaba.otter.canal.parse.driver.mysql.packets.client;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.CommandPacket;
+
+public class AuthSwitchResponsePacket extends CommandPacket {
+
+    public byte[] authData;
+
+    public void fromBytes(byte[] data) {
+    }
+
+    public byte[] toBytes() throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        out.write(authData);
+        return out.toByteArray();
+    }
+
+}

+ 26 - 7
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/ClientAuthenticationPacket.java

@@ -6,6 +6,7 @@ import java.security.NoSuchAlgorithmException;
 
 import org.apache.commons.lang.StringUtils;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.Capability;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.PacketWithHeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.MSC;
@@ -13,12 +14,17 @@ import com.alibaba.otter.canal.parse.driver.mysql.utils.MySQLPasswordEncrypter;
 
 public class ClientAuthenticationPacket extends PacketWithHeaderPacket {
 
+    private int    clientCapability = Capability.CLIENT_LONG_PASSWORD | Capability.CLIENT_LONG_FLAG
+                                      | Capability.CLIENT_PROTOCOL_41 | Capability.CLIENT_INTERACTIVE
+                                      | Capability.CLIENT_TRANSACTIONS | Capability.CLIENT_SECURE_CONNECTION
+                                      | Capability.CLIENT_MULTI_STATEMENTS;
     private String username;
     private String password;
     private byte   charsetNumber;
     private String databaseName;
     private int    serverCapabilities;
     private byte[] scrumbleBuff;
+    private byte[] authPluginName;
 
     public void fromBytes(byte[] data) {
         // bypass since nowhere to use.
@@ -36,6 +42,7 @@ public class ClientAuthenticationPacket extends PacketWithHeaderPacket {
      *  n (Null-Terminated String)   user
      *  n (Length Coded Binary)      scramble_buff (1 + x bytes)
      *  n (Null-Terminated String)   databasename (optional)
+     *  n (Null-Terminated String)   auth plugin name (optional)
      * </pre>
      * 
      * @throws IOException
@@ -43,13 +50,7 @@ public class ClientAuthenticationPacket extends PacketWithHeaderPacket {
     public byte[] toBytes() throws IOException {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         // 1. write client_flags
-        // 1|4|512|1024|8192|32768
-        /**
-         * CLIENT_LONG_PASSWORD CLIENT_LONG_FLAG CLIENT_PROTOCOL_41
-         * CLIENT_INTERACTIVE CLIENT_TRANSACTIONS CLIENT_SECURE_CONNECTION
-         * CLIENT_MULTI_STATEMENTS;
-         */
-        ByteHelper.writeUnsignedIntLittleEndian(1 | 4 | 512 | 8192 | 32768 | 0x00010000, out); // remove
+        ByteHelper.writeUnsignedIntLittleEndian(clientCapability, out); // remove
         // client_interactive
         // feature
 
@@ -76,6 +77,10 @@ public class ClientAuthenticationPacket extends PacketWithHeaderPacket {
         if (getDatabaseName() != null) {
             ByteHelper.writeNullTerminatedString(getDatabaseName(), out);
         }
+        // 8 . (Null-Terminated String) auth plugin name (optional)
+        if (getAuthPluginName() != null) {
+            ByteHelper.writeNullTerminated(getAuthPluginName(), out);
+        }
         // end write
         return out.toByteArray();
     }
@@ -106,6 +111,9 @@ public class ClientAuthenticationPacket extends PacketWithHeaderPacket {
 
     public void setDatabaseName(String databaseName) {
         this.databaseName = databaseName;
+        if (databaseName != null) {
+            this.clientCapability |= Capability.CLIENT_CONNECT_WITH_DB;
+        }
     }
 
     public String getDatabaseName() {
@@ -128,4 +136,15 @@ public class ClientAuthenticationPacket extends PacketWithHeaderPacket {
         return scrumbleBuff;
     }
 
+    public byte[] getAuthPluginName() {
+        return authPluginName;
+    }
+
+    public void setAuthPluginName(byte[] authPluginName) {
+        this.authPluginName = authPluginName;
+        if (authPluginName != null) {
+            this.clientCapability |= Capability.CLIENT_PLUGIN_AUTH;
+        }
+    }
+
 }

+ 25 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/server/AuthSwitchRequestMoreData.java

@@ -0,0 +1,25 @@
+package com.alibaba.otter.canal.parse.driver.mysql.packets.server;
+
+import java.io.IOException;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.CommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
+
+public class AuthSwitchRequestMoreData extends CommandPacket {
+
+    public int    status;
+    public byte[] authData;
+
+    public void fromBytes(byte[] data) {
+        int index = 0;
+        // 1. read status
+        status = data[index];
+        index += 1;
+        authData = ByteHelper.readNullTerminatedBytes(data, index);
+    }
+
+    public byte[] toBytes() throws IOException {
+        return null;
+    }
+
+}

+ 29 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/server/AuthSwitchRequestPacket.java

@@ -0,0 +1,29 @@
+package com.alibaba.otter.canal.parse.driver.mysql.packets.server;
+
+import java.io.IOException;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.CommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
+
+public class AuthSwitchRequestPacket extends CommandPacket {
+
+    public int    status;
+    public String authName;
+    public byte[] authData;
+
+    public void fromBytes(byte[] data) {
+        int index = 0;
+        // 1. read status
+        status = data[index];
+        index += 1;
+        byte[] authName = ByteHelper.readNullTerminatedBytes(data, index);
+        this.authName = new String(authName);
+        index += authName.length + 1;
+        authData = ByteHelper.readNullTerminatedBytes(data, index);
+    }
+
+    public byte[] toBytes() throws IOException {
+        return null;
+    }
+
+}

+ 36 - 14
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/server/HandshakeInitializationPacket.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.parse.driver.mysql.packets.server;
 
 import java.io.IOException;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.Capability;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.PacketWithHeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
@@ -23,6 +24,7 @@ public class HandshakeInitializationPacket extends PacketWithHeaderPacket {
     public byte   serverCharsetNumber;
     public int    serverStatus;
     public byte[] restOfScrambleBuff;
+    public byte[] authPluginName;
 
     public HandshakeInitializationPacket(){
     }
@@ -66,20 +68,40 @@ public class HandshakeInitializationPacket extends PacketWithHeaderPacket {
         // 5. read server_capabilities
         this.serverCapabilities = ByteHelper.readUnsignedShortLittleEndian(data, index);
         index += 2;
-        // 6. read server_language
-        this.serverCharsetNumber = data[index];
-        index++;
-        // 7. read server_status
-        this.serverStatus = ByteHelper.readUnsignedShortLittleEndian(data, index);
-        index += 2;
-        // 8. bypass filtered bytes
-        index += 13;
-        // 9. read rest of scramble_buff
-        this.restOfScrambleBuff = ByteHelper.readFixedLengthBytes(data, index, 12); // 虽然Handshake
-                                                                                    // Initialization
-        // Packet规定最后13个byte是剩下的scrumble,
-        // 但实际上最后一个字节是0, 不应该包含在scrumble中.
-        // end read
+        if (data.length > index) {
+            // 6. read server_language
+            this.serverCharsetNumber = data[index];
+            index++;
+            // 7. read server_status
+            this.serverStatus = ByteHelper.readUnsignedShortLittleEndian(data, index);
+            index += 2;
+            // 8. bypass filtered bytes
+            int capabilityFlags2 = ByteHelper.readUnsignedShortLittleEndian(data, index);
+            index += 2;
+            int capabilities = (capabilityFlags2 << 16) | this.serverCapabilities;
+            // int authPluginDataLen = -1;
+            // if ((capabilities & Capability.CLIENT_PLUGIN_AUTH) != 0) {
+            // authPluginDataLen = data[index];
+            // }
+            index += 1;
+            index += 10;
+            // 9. read rest of scramble_buff
+            if ((capabilities & Capability.CLIENT_SECURE_CONNECTION) != 0) {
+                // int len = Math.max(13, authPluginDataLen - 8);
+                // this.authPluginDataPart2 =
+                // buffer.readFixedLengthString(len);// scramble2
+
+                // Packet规定最后13个byte是剩下的scrumble,
+                // 但实际上最后一个字节是0, 不应该包含在scrumble中.
+                this.restOfScrambleBuff = ByteHelper.readFixedLengthBytes(data, index, 12);
+            }
+
+            index += 12 + 1;
+            if ((capabilities & Capability.CLIENT_PLUGIN_AUTH) != 0) {
+                this.authPluginName = ByteHelper.readNullTerminatedBytes(data, index);
+            }
+            // end read
+        }
     }
 
     /**

+ 4 - 29
driver/src/test/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnectorTest.java

@@ -3,51 +3,26 @@ package com.alibaba.otter.canal.parse.driver.mysql;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import org.junit.Assert;
-import org.junit.Test;
-
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 
 public class MysqlConnectorTest {
 
-    @Test
-    public void testQuery() {
-
-        MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "xxxxx", "xxxxx");
+    public static void main(String args[]) {
+        MysqlConnector connector = new MysqlConnector(new InetSocketAddress("yddb01.mysql.database.chinacloudapi.cn",
+            3306), "ps-admin01@yddb01", "1qaz3edc");
         try {
             connector.connect();
             MysqlQueryExecutor executor = new MysqlQueryExecutor(connector);
             ResultSetPacket result = executor.query("show variables like '%char%';");
             System.out.println(result);
-            result = executor.query("select * from test.test1");
-            System.out.println(result);
         } catch (IOException e) {
-            Assert.fail(e.getMessage());
+            e.printStackTrace();
         } finally {
             try {
                 connector.disconnect();
             } catch (IOException e) {
-                Assert.fail(e.getMessage());
             }
         }
     }
 
-    // @Test
-    public void testUpdate() {
-
-        MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "xxxxx", "xxxxx");
-        try {
-            connector.connect();
-            MysqlUpdateExecutor executor = new MysqlUpdateExecutor(connector);
-            executor.update("insert into test.test2(id,name,score,text_value) values(null,'中文1',10,'中文2')");
-        } catch (IOException e) {
-            Assert.fail(e.getMessage());
-        } finally {
-            try {
-                connector.disconnect();
-            } catch (IOException e) {
-                Assert.fail(e.getMessage());
-            }
-        }
-    }
 }

+ 10 - 5
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -36,8 +36,11 @@ public class MQMessageUtils {
 
                                                                                  public List<PartitionData> apply(String pkHashConfigs) {
                                                                                      List<PartitionData> datas = Lists.newArrayList();
-                                                                                     String[] pkHashConfigArray = StringUtils.split(pkHashConfigs,
-                                                                                         ",");
+
+                                                                                     String[] pkHashConfigArray = StringUtils.split(StringUtils.replace(pkHashConfigs,
+                                                                                         ",",
+                                                                                         ";"),
+                                                                                         ";");
                                                                                      // schema.table:id^name
                                                                                      for (String pkHashConfig : pkHashConfigArray) {
                                                                                          PartitionData data = new PartitionData();
@@ -75,8 +78,10 @@ public class MQMessageUtils {
 
                                                                                  public List<DynamicTopicData> apply(String pkHashConfigs) {
                                                                                      List<DynamicTopicData> datas = Lists.newArrayList();
-                                                                                     String[] dynamicTopicArray = StringUtils.split(pkHashConfigs,
-                                                                                         ",");
+                                                                                     String[] dynamicTopicArray = StringUtils.split(StringUtils.replace(pkHashConfigs,
+                                                                                         ",",
+                                                                                         ";"),
+                                                                                         ";");
                                                                                      // schema.table
                                                                                      for (String dynamicTopic : dynamicTopicArray) {
                                                                                          DynamicTopicData data = new DynamicTopicData();
@@ -509,7 +514,7 @@ public class MQMessageUtils {
     }
 
     private static Set<String> matchTopics(String name, String dynamicTopicConfigs) {
-        String[] router = StringUtils.split(dynamicTopicConfigs, ',');
+        String[] router = StringUtils.split(StringUtils.replace(dynamicTopicConfigs, ",", ";"), ";");
         Set<String> topics = new HashSet<>();
         for (String item : router) {
             int i = item.indexOf(":");