1
0
Эх сурвалжийг харах

Merge branch 'master' into master

rewerma 6 жил өмнө
parent
commit
d13c1eb17c
38 өөрчлөгдсөн 519 нэмэгдсэн , 94 устгасан
  1. 1 1
      client-adapter/elasticsearch/pom.xml
  2. 7 1
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java
  3. 10 2
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SqlParser.java
  4. 5 1
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java
  5. 1 1
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java
  6. 7 1
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java
  7. 5 1
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSub2Test.java
  8. 5 1
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSubTest.java
  9. 5 1
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOne2Test.java
  10. 5 1
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOneTest.java
  11. 5 1
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncJoinOneTest.java
  12. 5 1
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncSingleTest.java
  13. 5 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  14. 12 2
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java
  15. 5 1
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java
  16. 8 1
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java
  17. 6 1
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java
  18. 0 1
      deployer/src/main/resources/example/instance.properties
  19. 2 2
      deployer/src/main/resources/spring/default-instance.xml
  20. 2 2
      deployer/src/main/resources/spring/file-instance.xml
  21. 4 4
      deployer/src/main/resources/spring/group-instance.xml
  22. 2 2
      deployer/src/main/resources/spring/memory-instance.xml
  23. 54 7
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java
  24. 170 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/Capability.java
  25. 21 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/AuthSwitchResponsePacket.java
  26. 26 7
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/ClientAuthenticationPacket.java
  27. 25 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/server/AuthSwitchRequestMoreData.java
  28. 29 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/server/AuthSwitchRequestPacket.java
  29. 36 14
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/server/HandshakeInitializationPacket.java
  30. 6 1
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/PacketManager.java
  31. 2 2
      instance/spring/src/test/resources/spring/default-instance.xml
  32. 2 2
      instance/spring/src/test/resources/spring/file-instance.xml
  33. 4 4
      instance/spring/src/test/resources/spring/group-instance.xml
  34. 2 2
      instance/spring/src/test/resources/spring/memory-instance.xml
  35. 1 1
      pom.xml
  36. 12 7
      server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java
  37. 6 7
      server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java
  38. 16 10
      server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

+ 1 - 1
client-adapter/elasticsearch/pom.xml

@@ -21,7 +21,7 @@
         <dependency>
             <groupId>com.alibaba.fastsql</groupId>
             <artifactId>fastsql</artifactId>
-            <version>2.0.0_preview_644</version>
+            <version>2.0.0_preview_855</version>
         </dependency>
         <dependency>
             <groupId>org.elasticsearch</groupId>

+ 7 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -17,6 +17,7 @@ import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.MDC;
 
 import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
@@ -29,7 +30,12 @@ import com.alibaba.otter.canal.client.adapter.es.monitor.ESConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.es.service.ESEtlService;
 import com.alibaba.otter.canal.client.adapter.es.service.ESSyncService;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
-import com.alibaba.otter.canal.client.adapter.support.*;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+
 
 /**
  * ES外部适配器

+ 10 - 2
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SqlParser.java

@@ -9,8 +9,16 @@ import java.util.stream.Collectors;
 
 import com.alibaba.fastsql.sql.SQLUtils;
 import com.alibaba.fastsql.sql.ast.SQLExpr;
-import com.alibaba.fastsql.sql.ast.expr.*;
-import com.alibaba.fastsql.sql.ast.statement.*;
+import com.alibaba.fastsql.sql.ast.expr.SQLBinaryOpExpr;
+import com.alibaba.fastsql.sql.ast.expr.SQLCaseExpr;
+import com.alibaba.fastsql.sql.ast.expr.SQLIdentifierExpr;
+import com.alibaba.fastsql.sql.ast.expr.SQLMethodInvokeExpr;
+import com.alibaba.fastsql.sql.ast.expr.SQLPropertyExpr;
+import com.alibaba.fastsql.sql.ast.statement.SQLExprTableSource;
+import com.alibaba.fastsql.sql.ast.statement.SQLJoinTableSource;
+import com.alibaba.fastsql.sql.ast.statement.SQLSelectStatement;
+import com.alibaba.fastsql.sql.ast.statement.SQLSubqueryTableSource;
+import com.alibaba.fastsql.sql.ast.statement.SQLTableSource;
 import com.alibaba.fastsql.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
 import com.alibaba.fastsql.sql.dialect.mysql.parser.MySqlStatementParser;
 import com.alibaba.fastsql.sql.parser.ParserException;

+ 5 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java

@@ -1,7 +1,11 @@
 package com.alibaba.otter.canal.client.adapter.es.service;
 
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;

+ 1 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -7,7 +7,6 @@ import java.util.Map;
 
 import javax.sql.DataSource;
 
-import com.alibaba.otter.canal.client.adapter.support.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,6 +22,7 @@ import com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 
 /**
  * ES 同步 Service

+ 7 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java

@@ -5,7 +5,13 @@ import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.sql.Blob;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.codec.binary.Base64;
 import org.joda.time.DateTime;

+ 5 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSub2Test.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import javax.sql.DataSource;
 

+ 5 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSubTest.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import javax.sql.DataSource;
 

+ 5 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOne2Test.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import javax.sql.DataSource;
 

+ 5 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOneTest.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import javax.sql.DataSource;
 

+ 5 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncJoinOneTest.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import javax.sql.DataSource;
 

+ 5 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncSingleTest.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.elasticsearch.action.get.GetResponse;
 import org.junit.Assert;

+ 5 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -2,7 +2,11 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;

+ 12 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java

@@ -1,13 +1,23 @@
 package com.alibaba.otter.canal.adapter.launcher.rest;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.web.bind.annotation.*;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
 
 import com.alibaba.otter.canal.adapter.launcher.common.EtlLock;
 import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;

+ 5 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.rdb.service;
 
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;

+ 8 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

@@ -4,7 +4,14 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
-import java.sql.*;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Map;

+ 6 - 1
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java

@@ -1,6 +1,11 @@
 package com.alibaba.otter.canal.client.adapter.rdb.test.sync;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.junit.Before;
 import org.junit.Test;

+ 0 - 1
deployer/src/main/resources/example/instance.properties

@@ -33,7 +33,6 @@ canal.instance.tsdb.enable=true
 canal.instance.dbUsername=canal
 canal.instance.dbPassword=canal
 canal.instance.connectionCharset = UTF-8
-canal.instance.defaultDatabaseName =test
 # enable druid Decrypt database password
 canal.instance.enableDruid=false
 #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

+ 2 - 2
deployer/src/main/resources/spring/default-instance.xml

@@ -136,7 +136,7 @@
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 		<property name="standbyInfo">
@@ -146,7 +146,7 @@
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 

+ 2 - 2
deployer/src/main/resources/spring/file-instance.xml

@@ -121,7 +121,7 @@
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 		<property name="standbyInfo">
@@ -131,7 +131,7 @@
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 

+ 4 - 4
deployer/src/main/resources/spring/group-instance.xml

@@ -118,7 +118,7 @@
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 		<property name="standbyInfo">
@@ -128,7 +128,7 @@
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 
@@ -220,7 +220,7 @@
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 		<property name="standbyInfo">
@@ -230,7 +230,7 @@
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 

+ 2 - 2
deployer/src/main/resources/spring/memory-instance.xml

@@ -109,7 +109,7 @@
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 		<property name="standbyInfo">
@@ -119,7 +119,7 @@
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
 				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
 				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 

+ 54 - 7
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -2,19 +2,24 @@ package com.alibaba.otter.canal.parse.driver.mysql;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.security.NoSuchAlgorithmException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.client.AuthSwitchResponsePacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.ClientAuthenticationPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.QuitCommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.AuthSwitchRequestMoreData;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.AuthSwitchRequestPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.HandshakeInitializationPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.Reply323Packet;
 import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannelPool;
+import com.alibaba.otter.canal.parse.driver.mysql.utils.MSC;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.MySQLPasswordEncrypter;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
 
@@ -32,7 +37,7 @@ public class MysqlConnector {
     private String              password;
 
     private byte                charsetNumber     = 33;
-    private String              defaultSchema     = "test";
+    private String              defaultSchema;
     private int                 soTimeout         = 30 * 1000;
     private int                 connTimeout       = 5 * 1000;
     private int                 receiveBufferSize = 16 * 1024;
@@ -150,6 +155,7 @@ public class MysqlConnector {
     }
 
     private void negotiate(SocketChannel channel) throws IOException {
+        // https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol
         HeaderPacket header = PacketManager.readHeader(channel, 4, timeout);
         byte[] body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
         if (body[0] < 0) {// check field_count
@@ -165,10 +171,14 @@ public class MysqlConnector {
         }
         HandshakeInitializationPacket handshakePacket = new HandshakeInitializationPacket();
         handshakePacket.fromBytes(body);
-        connectionId = handshakePacket.threadId; // 记录一下connection
+        if (handshakePacket.protocolVersion != MSC.DEFAULT_PROTOCOL_VERSION) {
+            // HandshakeV9
+            auth323(channel, (byte) (header.getPacketSequenceNumber() + 1), handshakePacket.seed);
+            return;
+        }
 
+        connectionId = handshakePacket.threadId; // 记录一下connection
         logger.info("handshake initialization packet received, prepare the client authentication packet to send");
-
         ClientAuthenticationPacket clientAuth = new ClientAuthenticationPacket();
         clientAuth.setCharsetNumber(charsetNumber);
 
@@ -177,6 +187,7 @@ public class MysqlConnector {
         clientAuth.setServerCapabilities(handshakePacket.serverCapabilities);
         clientAuth.setDatabaseName(defaultSchema);
         clientAuth.setScrumbleBuff(joinAndCreateScrumbleBuff(handshakePacket));
+        clientAuth.setAuthPluginName("mysql_native_password".getBytes());
 
         byte[] clientAuthPkgBody = clientAuth.toBytes();
         HeaderPacket h = new HeaderPacket();
@@ -192,15 +203,51 @@ public class MysqlConnector {
         body = null;
         body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
         assert body != null;
+        byte marker = body[0];
+        if (marker == -2 || marker == 1) {
+            byte[] authData = null;
+            String pluginName = null;
+            if (marker == 1) {
+                AuthSwitchRequestMoreData packet = new AuthSwitchRequestMoreData();
+                packet.fromBytes(body);
+                authData = packet.authData;
+            } else {
+                AuthSwitchRequestPacket packet = new AuthSwitchRequestPacket();
+                packet.fromBytes(body);
+                authData = packet.authData;
+                pluginName = packet.authName;
+            }
+
+            if (pluginName != null && "mysql_native_password".equals(pluginName)) {
+                byte[] encryptedPassword = null;
+                try {
+                    encryptedPassword = MySQLPasswordEncrypter.scramble411(getPassword().getBytes(), authData);
+                } catch (NoSuchAlgorithmException e) {
+                    throw new RuntimeException("can't encrypt password that will be sent to MySQL server.", e);
+                }
+                AuthSwitchResponsePacket responsePacket = new AuthSwitchResponsePacket();
+                responsePacket.authData = encryptedPassword;
+                byte[] auth = responsePacket.toBytes();
+
+                h = new HeaderPacket();
+                h.setPacketBodyLength(auth.length);
+                h.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));
+                PacketManager.writePkg(channel, h.toBytes(), auth);
+                logger.info("auth switch response packet is sent out.");
+
+                header = null;
+                header = PacketManager.readHeader(channel, 4);
+                body = null;
+                body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
+                assert body != null;
+            }
+        }
+
         if (body[0] < 0) {
             if (body[0] == -1) {
                 ErrorPacket err = new ErrorPacket();
                 err.fromBytes(body);
                 throw new IOException("Error When doing Client Authentication:" + err.toString());
-            } else if (body[0] == -2) {
-                auth323(channel, header.getPacketSequenceNumber(), handshakePacket.seed);
-                // throw new
-                // IOException("Unexpected EOF packet at Client Authentication.");
             } else {
                 throw new IOException("unpexpected packet with field_count=" + body[0]);
             }

+ 170 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/Capability.java

@@ -0,0 +1,170 @@
+package com.alibaba.otter.canal.parse.driver.mysql.packets;
+
+/**
+ * https://dev.mysql.com/doc/internals/en/capability-flags.html#packet-Protocol
+ * ::CapabilityFlags
+ */
+public interface Capability {
+
+    // Use the improved version of Old Password Authentication.
+    // Assumed to be set since 4.1.1.
+    int CLIENT_LONG_PASSWORD                  = 0x00000001;
+
+    // Send found rows instead of affected rows in EOF_Packet.
+    int CLIENT_FOUND_ROWS                     = 0x00000002;
+
+    // https://dev.mysql.com/doc/internals/en/com-query-response.html#packet-Protocol::ColumnDefinition320
+    // Longer flags in Protocol::ColumnDefinition320.
+    // Server:Supports longer flags.
+    // Client:Expects longer flags.
+    // 执行查询sql时,除了返回结果集,还返回元数据
+    int CLIENT_LONG_FLAG                      = 0x00000004;
+
+    // 可以在handshake时,指定一个数据库名
+    // Database (schema) name can be specified on connect in Handshake Response
+    // Packet.
+    // Server: Supports schema-name in Handshake Response Packet.
+    // Client: Handshake Response Packet contains a schema-name.
+    int CLIENT_CONNECT_WITH_DB                = 0x00000008;
+
+    // Server: Do not permit database.table.column.
+    int CLIENT_NO_SCHEMA                      = 0x00000010;
+
+    // Compression protocol supported.
+    // Server:Supports compression.
+    // Client:Switches to Compression compressed protocol after successful
+    // authentication.
+    int CLIENT_COMPRESS                       = 0x00000020;
+
+    // Special handling of ODBC behavior.
+    // No special behavior since 3.22.
+    int CLIENT_ODBC                           = 0x00000040;
+
+    // Can use LOAD DATA LOCAL.
+    // Server:Enables the LOCAL INFILE request of LOAD DATA|XML.
+    // Client:Will handle LOCAL INFILE request.
+    int CLIENT_LOCAL_FILES                    = 0x00000080;
+
+    // Server: Parser can ignore spaces before '('.
+    // Client: Let the parser ignore spaces before '('.
+    int CLIENT_IGNORE_SPACE                   = 0x00000100;
+
+    // Server:Supports the 4.1 protocol,
+    // 4.1协议中,
+    // OKPacket将会包含warning count
+    // ERR_Packet包含SQL state
+    // EOF_Packet包含warning count和status flags
+    // Client:Uses the 4.1 protocol.
+    // Note: this value was CLIENT_CHANGE_USER in 3.22, unused in 4.0
+    // If CLIENT_PROTOCOL_41 is set:
+    // 1、the ok packet contains a warning count.
+    // https://dev.mysql.com/doc/internals/en/packet-OK_Packet.html
+    // 2、ERR_Packet It contains a SQL state value if CLIENT_PROTOCOL_41 is
+    // enabled. //https://dev.mysql.com/doc/internals/en/packet-ERR_Packet.html
+    // 3、EOF_Packet If CLIENT_PROTOCOL_41 is enabled, the EOF packet contains a
+    // warning count and status flags.
+    // https://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html
+    int CLIENT_PROTOCOL_41                    = 0x00000200;
+
+    // wait_timeout versus wait_interactive_timeout.
+    // Server:Supports interactive and noninteractive clients.
+    // Client:Client is interactive.
+    int CLIENT_INTERACTIVE                    = 0x00000400;
+
+    // Server: Supports SSL.
+    // Client: Switch to SSL after sending the capability-flags.
+    int CLIENT_SSL                            = 0x00000800;
+
+    // Client: Do not issue SIGPIPE if network failures occur (libmysqlclient
+    // only).
+    int CLIENT_IGNORE_SIGPIPE                 = 0x00001000;
+
+    // Server: Can send status flags in EOF_Packet.
+    // Client:Expects status flags in EOF_Packet.
+    // Note:This flag is optional in 3.23, but always set by the server since
+    // 4.0.
+    int CLIENT_TRANSACTIONS                   = 0x00002000;
+
+    // Unused
+    // Note: Was named CLIENT_PROTOCOL_41 in 4.1.0.
+    int CLIENT_RESERVED                       = 0x00004000;
+
+    /**
+     * <pre>
+     *      服务端返回20 byte随机字节,客户端利用其对密码进行加密,加密算法如下:
+     *      https://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
+     *      Authentication::Native41:
+     *      client-side expects a 20-byte random challenge
+     *      client-side returns a 20-byte response based on the algorithm described later
+     *      Name
+     *      mysql_native_password
+     *      Requires
+     *      CLIENT_SECURE_CONNECTION
+     *      Image description follows.
+     *      Image description
+     *      This method fixes a 2 short-comings of the Old Password Authentication:
+     *      (https://dev.mysql.com/doc/internals/en/old-password-authentication.html#packet-Authentication::Old)
+     *      using a tested, crypto-graphic hashing function which isn't broken
+     *      knowning the content of the hash in the mysql.user table isn't enough to authenticate against the MySQL Server.
+     *      The password is calculated by:
+     *      SHA1( password ) XOR SHA1( "20-bytes random data from server" <concat> SHA1( SHA1( password ) ) )
+     * </pre>
+     */
+    int CLIENT_SECURE_CONNECTION              = 0x00008000;
+
+    // Server:Can handle multiple statements per COM_QUERY and COM_STMT_PREPARE.
+    // Client:May send multiple statements per COM_QUERY and COM_STMT_PREPARE.
+    // Note:Was named CLIENT_MULTI_QUERIES in 4.1.0, renamed later.
+    // Requires:CLIENT_PROTOCOL_41
+    int CLIENT_MULTI_STATEMENTS               = 0x00010000;
+
+    // Server: Can send multiple resultsets for COM_QUERY.
+    // Client: Can handle multiple resultsets for COM_QUERY.
+    // Requires:CLIENT_PROTOCOL_41
+    int CLIENT_MULTI_RESULTS                  = 0x00020000;
+
+    // Server: Can send multiple resultsets for ComStmtExecutePacket.
+    // Client: Can handle multiple resultsets for ComStmtExecutePacket.
+    // Requires:CLIENT_PROTOCOL_41
+    int CLIENT_PS_MULTI_RESULTS               = 0x00040000;
+
+    // Server:Sends extra data in Initial Handshake Packet and supports the
+    // pluggable authentication protocol.
+    // Client: Supports authentication plugins.
+    // Requires: CLIENT_PROTOCOL_41
+    int CLIENT_PLUGIN_AUTH                    = 0x00080000;
+
+    // Server: Permits connection attributes in Protocol::HandshakeResponse41.
+    // Client: Sends connection attributes in Protocol::HandshakeResponse41.
+    int CLIENT_CONNECT_ATTRS                  = 0x00100000;
+
+    // Server:Understands length-encoded integer for auth response data in
+    // Protocol::HandshakeResponse41.
+    // Client:Length of auth response data in Protocol::HandshakeResponse41 is a
+    // length-encoded integer.
+    // Note: The flag was introduced in 5.6.6, but had the wrong value.
+    int CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA = 0x00200000;
+
+    // Server: Announces support for expired password extension.
+    // Client: Can handle expired passwords.
+    int CLIENT_CAN_HANDLE_EXPIRED_PASSWORDS   = 0x00400000;
+
+    // Server: Can set SERVER_SESSION_STATE_CHANGED in the Status Flags and send
+    // session-state change data after a OK packet.
+    // Client: Expects the server to send sesson-state changes after a OK
+    // packet.
+    int CLIENT_SESSION_TRACK                  = 0x00800000;
+
+    /**
+     * Server: Can send OK after a Text Resultset. Client: Expects an OK
+     * (instead of EOF) after the resultset rows of a Text Resultset.
+     * Background:To support CLIENT_SESSION_TRACK, additional information must
+     * be sent after all successful commands. Although the OK packet is
+     * extensible, the EOF packet is not due to the overlap of its bytes with
+     * the content of the Text Resultset Row. Therefore, the EOF packet in the
+     * Text Resultset is replaced with an OK packet. EOF packets are deprecated
+     * as of MySQL 5.7.5.
+     */
+    int CLIENT_DEPRECATE_EOF                  = 0x01000000;
+
+}

+ 21 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/AuthSwitchResponsePacket.java

@@ -0,0 +1,21 @@
+package com.alibaba.otter.canal.parse.driver.mysql.packets.client;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.CommandPacket;
+
+public class AuthSwitchResponsePacket extends CommandPacket {
+
+    public byte[] authData;
+
+    public void fromBytes(byte[] data) {
+    }
+
+    public byte[] toBytes() throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        out.write(authData);
+        return out.toByteArray();
+    }
+
+}

+ 26 - 7
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/ClientAuthenticationPacket.java

@@ -6,6 +6,7 @@ import java.security.NoSuchAlgorithmException;
 
 import org.apache.commons.lang.StringUtils;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.Capability;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.PacketWithHeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.MSC;
@@ -13,12 +14,17 @@ import com.alibaba.otter.canal.parse.driver.mysql.utils.MySQLPasswordEncrypter;
 
 public class ClientAuthenticationPacket extends PacketWithHeaderPacket {
 
+    private int    clientCapability = Capability.CLIENT_LONG_PASSWORD | Capability.CLIENT_LONG_FLAG
+                                      | Capability.CLIENT_PROTOCOL_41 | Capability.CLIENT_INTERACTIVE
+                                      | Capability.CLIENT_TRANSACTIONS | Capability.CLIENT_SECURE_CONNECTION
+                                      | Capability.CLIENT_MULTI_STATEMENTS | Capability.CLIENT_PLUGIN_AUTH;
     private String username;
     private String password;
     private byte   charsetNumber;
     private String databaseName;
     private int    serverCapabilities;
     private byte[] scrumbleBuff;
+    private byte[] authPluginName;
 
     public void fromBytes(byte[] data) {
         // bypass since nowhere to use.
@@ -36,6 +42,7 @@ public class ClientAuthenticationPacket extends PacketWithHeaderPacket {
      *  n (Null-Terminated String)   user
      *  n (Length Coded Binary)      scramble_buff (1 + x bytes)
      *  n (Null-Terminated String)   databasename (optional)
+     *  n (Null-Terminated String)   auth plugin name (optional)
      * </pre>
      * 
      * @throws IOException
@@ -43,13 +50,7 @@ public class ClientAuthenticationPacket extends PacketWithHeaderPacket {
     public byte[] toBytes() throws IOException {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         // 1. write client_flags
-        // 1|4|512|1024|8192|32768
-        /**
-         * CLIENT_LONG_PASSWORD CLIENT_LONG_FLAG CLIENT_PROTOCOL_41
-         * CLIENT_INTERACTIVE CLIENT_TRANSACTIONS CLIENT_SECURE_CONNECTION
-         * CLIENT_MULTI_STATEMENTS;
-         */
-        ByteHelper.writeUnsignedIntLittleEndian(1 | 4 | 512 | 8192 | 32768 | 0x00010000, out); // remove
+        ByteHelper.writeUnsignedIntLittleEndian(clientCapability, out); // remove
         // client_interactive
         // feature
 
@@ -76,6 +77,10 @@ public class ClientAuthenticationPacket extends PacketWithHeaderPacket {
         if (getDatabaseName() != null) {
             ByteHelper.writeNullTerminatedString(getDatabaseName(), out);
         }
+        // 8 . (Null-Terminated String) auth plugin name (optional)
+        if (getAuthPluginName() != null) {
+            ByteHelper.writeNullTerminated(getAuthPluginName(), out);
+        }
         // end write
         return out.toByteArray();
     }
@@ -106,6 +111,9 @@ public class ClientAuthenticationPacket extends PacketWithHeaderPacket {
 
     public void setDatabaseName(String databaseName) {
         this.databaseName = databaseName;
+        if (databaseName != null) {
+            this.clientCapability |= Capability.CLIENT_CONNECT_WITH_DB;
+        }
     }
 
     public String getDatabaseName() {
@@ -128,4 +136,15 @@ public class ClientAuthenticationPacket extends PacketWithHeaderPacket {
         return scrumbleBuff;
     }
 
+    public byte[] getAuthPluginName() {
+        return authPluginName;
+    }
+
+    public void setAuthPluginName(byte[] authPluginName) {
+        this.authPluginName = authPluginName;
+        if (authPluginName != null) {
+            this.clientCapability |= Capability.CLIENT_PLUGIN_AUTH;
+        }
+    }
+
 }

+ 25 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/server/AuthSwitchRequestMoreData.java

@@ -0,0 +1,25 @@
+package com.alibaba.otter.canal.parse.driver.mysql.packets.server;
+
+import java.io.IOException;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.CommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
+
+public class AuthSwitchRequestMoreData extends CommandPacket {
+
+    public int    status;
+    public byte[] authData;
+
+    public void fromBytes(byte[] data) {
+        int index = 0;
+        // 1. read status
+        status = data[index];
+        index += 1;
+        authData = ByteHelper.readNullTerminatedBytes(data, index);
+    }
+
+    public byte[] toBytes() throws IOException {
+        return null;
+    }
+
+}

+ 29 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/server/AuthSwitchRequestPacket.java

@@ -0,0 +1,29 @@
+package com.alibaba.otter.canal.parse.driver.mysql.packets.server;
+
+import java.io.IOException;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.CommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
+
+public class AuthSwitchRequestPacket extends CommandPacket {
+
+    public int    status;
+    public String authName;
+    public byte[] authData;
+
+    public void fromBytes(byte[] data) {
+        int index = 0;
+        // 1. read status
+        status = data[index];
+        index += 1;
+        byte[] authName = ByteHelper.readNullTerminatedBytes(data, index);
+        this.authName = new String(authName);
+        index += authName.length + 1;
+        authData = ByteHelper.readNullTerminatedBytes(data, index);
+    }
+
+    public byte[] toBytes() throws IOException {
+        return null;
+    }
+
+}

+ 36 - 14
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/server/HandshakeInitializationPacket.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.parse.driver.mysql.packets.server;
 
 import java.io.IOException;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.Capability;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.PacketWithHeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
@@ -23,6 +24,7 @@ public class HandshakeInitializationPacket extends PacketWithHeaderPacket {
     public byte   serverCharsetNumber;
     public int    serverStatus;
     public byte[] restOfScrambleBuff;
+    public byte[] authPluginName;
 
     public HandshakeInitializationPacket(){
     }
@@ -66,20 +68,40 @@ public class HandshakeInitializationPacket extends PacketWithHeaderPacket {
         // 5. read server_capabilities
         this.serverCapabilities = ByteHelper.readUnsignedShortLittleEndian(data, index);
         index += 2;
-        // 6. read server_language
-        this.serverCharsetNumber = data[index];
-        index++;
-        // 7. read server_status
-        this.serverStatus = ByteHelper.readUnsignedShortLittleEndian(data, index);
-        index += 2;
-        // 8. bypass filtered bytes
-        index += 13;
-        // 9. read rest of scramble_buff
-        this.restOfScrambleBuff = ByteHelper.readFixedLengthBytes(data, index, 12); // 虽然Handshake
-                                                                                    // Initialization
-        // Packet规定最后13个byte是剩下的scrumble,
-        // 但实际上最后一个字节是0, 不应该包含在scrumble中.
-        // end read
+        if (data.length > index) {
+            // 6. read server_language
+            this.serverCharsetNumber = data[index];
+            index++;
+            // 7. read server_status
+            this.serverStatus = ByteHelper.readUnsignedShortLittleEndian(data, index);
+            index += 2;
+            // 8. bypass filtered bytes
+            int capabilityFlags2 = ByteHelper.readUnsignedShortLittleEndian(data, index);
+            index += 2;
+            int capabilities = (capabilityFlags2 << 16) | this.serverCapabilities;
+            // int authPluginDataLen = -1;
+            // if ((capabilities & Capability.CLIENT_PLUGIN_AUTH) != 0) {
+            // authPluginDataLen = data[index];
+            // }
+            index += 1;
+            index += 10;
+            // 9. read rest of scramble_buff
+            if ((capabilities & Capability.CLIENT_SECURE_CONNECTION) != 0) {
+                // int len = Math.max(13, authPluginDataLen - 8);
+                // this.authPluginDataPart2 =
+                // buffer.readFixedLengthString(len);// scramble2
+
+                // Packet规定最后13个byte是剩下的scrumble,
+                // 但实际上最后一个字节是0, 不应该包含在scrumble中.
+                this.restOfScrambleBuff = ByteHelper.readFixedLengthBytes(data, index, 12);
+            }
+
+            index += 12 + 1;
+            if ((capabilities & Capability.CLIENT_PLUGIN_AUTH) != 0) {
+                this.authPluginName = ByteHelper.readNullTerminatedBytes(data, index);
+            }
+            // end read
+        }
     }
 
     /**

+ 6 - 1
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/PacketManager.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.parse.driver.mysql.utils;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
@@ -28,7 +29,11 @@ public abstract class PacketManager {
     }
 
     public static void writePkg(SocketChannel ch, byte[]... srcs) throws IOException {
-        ch.write(srcs);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        for (byte[] src : srcs) {
+            out.write(src);
+        }
+        ch.write(out.toByteArray());
     }
 
     public static void writeBody(SocketChannel ch, byte[] body) throws IOException {

+ 2 - 2
instance/spring/src/test/resources/spring/default-instance.xml

@@ -140,7 +140,7 @@
 				<property name="address" value="${canal.instance.master.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 		<property name="standbyInfo">
@@ -148,7 +148,7 @@
 				<property name="address" value="${canal.instance.standby.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 		

+ 2 - 2
instance/spring/src/test/resources/spring/file-instance.xml

@@ -128,7 +128,7 @@
 				<property name="address" value="${canal.instance.master.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 		<property name="standbyInfo">
@@ -136,7 +136,7 @@
 				<property name="address" value="${canal.instance.standby.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 		

+ 4 - 4
instance/spring/src/test/resources/spring/group-instance.xml

@@ -123,7 +123,7 @@
 				<property name="address" value="${canal.instance.master1.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 		<property name="standbyInfo">
@@ -131,7 +131,7 @@
 				<property name="address" value="${canal.instance.standby1.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 		
@@ -203,7 +203,7 @@
 				<property name="address" value="${canal.instance.master2.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 		<property name="standbyInfo">
@@ -211,7 +211,7 @@
 				<property name="address" value="${canal.instance.standby2.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 		

+ 2 - 2
instance/spring/src/test/resources/spring/memory-instance.xml

@@ -114,7 +114,7 @@
 				<property name="address" value="${canal.instance.master.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 		<property name="standbyInfo">
@@ -122,7 +122,7 @@
 				<property name="address" value="${canal.instance.standby.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
 			</bean>
 		</property>
 		

+ 1 - 1
pom.xml

@@ -246,7 +246,7 @@
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_836</version>
+                <version>2.0.0_preview_855</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>

+ 12 - 7
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -36,8 +36,11 @@ public class MQMessageUtils {
 
                                                                                  public List<PartitionData> apply(String pkHashConfigs) {
                                                                                      List<PartitionData> datas = Lists.newArrayList();
-                                                                                     String[] pkHashConfigArray = StringUtils.split(pkHashConfigs,
-                                                                                         ",");
+
+                                                                                     String[] pkHashConfigArray = StringUtils.split(StringUtils.replace(pkHashConfigs,
+                                                                                         ",",
+                                                                                         ";"),
+                                                                                         ";");
                                                                                      // schema.table:id^name
                                                                                      for (String pkHashConfig : pkHashConfigArray) {
                                                                                          PartitionData data = new PartitionData();
@@ -75,8 +78,10 @@ public class MQMessageUtils {
 
                                                                                  public List<DynamicTopicData> apply(String pkHashConfigs) {
                                                                                      List<DynamicTopicData> datas = Lists.newArrayList();
-                                                                                     String[] dynamicTopicArray = StringUtils.split(pkHashConfigs,
-                                                                                         ",");
+                                                                                     String[] dynamicTopicArray = StringUtils.split(StringUtils.replace(pkHashConfigs,
+                                                                                         ",",
+                                                                                         ";"),
+                                                                                         ";");
                                                                                      // schema.table
                                                                                      for (String dynamicTopic : dynamicTopicArray) {
                                                                                          DynamicTopicData data = new DynamicTopicData();
@@ -216,7 +221,7 @@ public class MQMessageUtils {
                         partitionEntries[pkHash].add(entry);
                     } else {
                         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
-                            int hashCode = table.hashCode();
+                            int hashCode = database.hashCode();
                             if (hashMode.autoPkHash) {
                                 // isEmpty use default pkNames
                                 for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
@@ -432,7 +437,7 @@ public class MQMessageUtils {
 
                     int idx = 0;
                     for (Map<String, String> row : flatMessage.getData()) {
-                        int hashCode = table.hashCode();
+                        int hashCode = database.hashCode();
                         for (String pkName : pkNames) {
                             String value = row.get(pkName);
                             if (value == null) {
@@ -509,7 +514,7 @@ public class MQMessageUtils {
     }
 
     private static Set<String> matchTopics(String name, String dynamicTopicConfigs) {
-        String[] router = StringUtils.split(dynamicTopicConfigs, ',');
+        String[] router = StringUtils.split(StringUtils.replace(dynamicTopicConfigs, ",", ";"), ";");
         Set<String> topics = new HashSet<>();
         for (String item : router) {
             int i = item.indexOf(":");

+ 6 - 7
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.kafka;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -138,7 +139,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
     private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message)
                                                                                                         throws Exception {
         if (!kafkaProperties.getFlatMessage()) {
-            ProducerRecord<String, Message> record = null;
+            List<ProducerRecord<String, Message>> records = new ArrayList<ProducerRecord<String, Message>>();
             if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                 Message[] messages = MQMessageUtils.messagePartition(message,
                     canalDestination.getPartitionsNum(),
@@ -147,18 +148,16 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 for (int i = 0; i < length; i++) {
                     Message messagePartition = messages[i];
                     if (messagePartition != null) {
-                        record = new ProducerRecord<>(topicName, i, null, messagePartition);
+                        records.add(new ProducerRecord<String, Message>(topicName, i, null, messagePartition));
                     }
                 }
             } else {
                 final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
-                record = new ProducerRecord<>(topicName, partition, null, message);
+                records.add(new ProducerRecord<String, Message>(topicName, partition, null, message));
             }
 
-            if (record != null) {
-                if (kafkaProperties.getTransaction()) {
-                    producer.send(record).get();
-                } else {
+            if (!records.isEmpty()) {
+                for (ProducerRecord<String, Message> record : records) {
                     producer.send(record).get();
                 }
 

+ 16 - 10
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -107,11 +107,10 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                     @Override
                                     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                         if (index > mqs.size()) {
-                                            throw new CanalServerException("partition number is error,config num:"
-                                                                           + destination.getPartitionsNum()
-                                                                           + ", mq num: " + mqs.size());
+                                            return mqs.get(index % mqs.size());
+                                        } else {
+                                            return mqs.get(index);
                                         }
-                                        return mqs.get(index);
                                     }
                                 }, null);
                             } catch (Exception e) {
@@ -134,7 +133,11 @@ public class CanalRocketMQProducer implements CanalMQProducer {
 
                         @Override
                         public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                            return mqs.get(partition);
+                            if (partition > mqs.size()) {
+                                return mqs.get(partition % mqs.size());
+                            } else {
+                                return mqs.get(partition);
+                            }
                         }
                     }, null);
                 }
@@ -168,11 +171,10 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                         @Override
                                         public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                             if (index > mqs.size()) {
-                                                throw new CanalServerException("partition number is error,config num:"
-                                                                               + destination.getPartitionsNum()
-                                                                               + ", mq num: " + mqs.size());
+                                                return mqs.get(index % mqs.size());
+                                            } else {
+                                                return mqs.get(index);
                                             }
-                                            return mqs.get(index);
                                         }
                                     }, null);
                                 } catch (Exception e) {
@@ -196,7 +198,11 @@ public class CanalRocketMQProducer implements CanalMQProducer {
 
                                 @Override
                                 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                                    return mqs.get(partition);
+                                    if (partition > mqs.size()) {
+                                        return mqs.get(partition % mqs.size());
+                                    } else {
+                                        return mqs.get(partition);
+                                    }
                                 }
                             }, null);
                         } catch (Exception e) {