浏览代码

support mysql 8.4 (#5231)

TK337 8 月之前
父节点
当前提交
7e31b40531

+ 1 - 1
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -293,7 +293,7 @@ public class MysqlConnector {
                     encryptedPassword = getPassword().getBytes();
                     encryptedPassword = getPassword().getBytes();
                     header = authSwitchAfterAuth(encryptedPassword, header);
                     header = authSwitchAfterAuth(encryptedPassword, header);
                     body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
                     body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
-                } else if ("mysql_native_password".equals(pluginName)) {
+                } else if (pluginName == null || "mysql_native_password".equals(pluginName)) {
                     try {
                     try {
                         encryptedPassword = MySQLPasswordEncrypter.scramble411(getPassword().getBytes(), authData);
                         encryptedPassword = MySQLPasswordEncrypter.scramble411(getPassword().getBytes(), authData);
                     } catch (NoSuchAlgorithmException e) {
                     } catch (NoSuchAlgorithmException e) {

+ 24 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -432,7 +432,7 @@ public class MysqlConnection implements ErosaConnection {
      * <li>net_write_timeout</li>
      * <li>net_write_timeout</li>
      * <li>net_read_timeout</li>
      * <li>net_read_timeout</li>
      * </ol>
      * </ol>
-     * 
+     *
      * @throws IOException
      * @throws IOException
      */
      */
     private void updateSettings() throws IOException {
     private void updateSettings() throws IOException {
@@ -560,7 +560,7 @@ public class MysqlConnection implements ErosaConnection {
 
 
     /**
     /**
      * 获取主库checksum信息
      * 获取主库checksum信息
-     * 
+     *
      * <pre>
      * <pre>
      * mariadb区别于mysql会在binlog的第一个事件Rotate_Event里也会采用checksum逻辑,而mysql是在第二个binlog事件之后才感知是否需要处理checksum
      * mariadb区别于mysql会在binlog的第一个事件Rotate_Event里也会采用checksum逻辑,而mysql是在第二个binlog事件之后才感知是否需要处理checksum
      * 导致maraidb只要是开启checksum就会出现binlog文件名解析乱码
      * 导致maraidb只要是开启checksum就会出现binlog文件名解析乱码
@@ -627,7 +627,7 @@ public class MysqlConnection implements ErosaConnection {
 
 
         private String value;
         private String value;
 
 
-        private BinlogFormat(String value){
+        private BinlogFormat(String value) {
             this.value = value;
             this.value = value;
         }
         }
 
 
@@ -645,7 +645,7 @@ public class MysqlConnection implements ErosaConnection {
     /**
     /**
      * http://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.
      * http://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.
      * html#sysvar_binlog_row_image
      * html#sysvar_binlog_row_image
-     * 
+     *
      * @author agapple 2015年6月29日 下午10:39:03
      * @author agapple 2015年6月29日 下午10:39:03
      * @since 1.0.20
      * @since 1.0.20
      */
      */
@@ -667,7 +667,7 @@ public class MysqlConnection implements ErosaConnection {
 
 
         private String value;
         private String value;
 
 
-        private BinlogImage(String value){
+        private BinlogImage(String value) {
             this.value = value;
             this.value = value;
         }
         }
 
 
@@ -756,4 +756,23 @@ public class MysqlConnection implements ErosaConnection {
         return connector.getServerVersion() != null && connector.getServerVersion().toLowerCase().contains("mariadb");
         return connector.getServerVersion() != null && connector.getServerVersion().toLowerCase().contains("mariadb");
     }
     }
 
 
+    // MySQL 8.4版本开始部分命令出现变化
+    // https://dev.mysql.com/doc/relnotes/mysql/8.4/en/news-8-4-0.html#mysqld-8-4-0-deprecation-removal
+    public boolean atLeast(int major, int minor) {
+        if (isMariaDB()) {
+            return false;
+        }
+        String version = connector.getServerVersion();
+        if (StringUtils.isNotEmpty(version)) {
+            String[] parts = version.split("\\.");
+            int majorVer = Integer.parseInt(parts[0]);
+            int minorVer = Integer.parseInt(parts[1]);
+            return (majorVer > major) || (majorVer == major && minorVer >= minor);
+        }
+        return false;
+    }
+
+    public boolean atLeastMySQL84() {
+        return atLeast(8, 4);
+    }
 }
 }

+ 10 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -668,13 +668,20 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
      * 查询当前的binlog位置
      * 查询当前的binlog位置
      */
      */
     private EntryPosition findEndPosition(MysqlConnection mysqlConnection) {
     private EntryPosition findEndPosition(MysqlConnection mysqlConnection) {
+        String showSql = "SHOW MASTER STATUS";
         try {
         try {
-            String showSql = multiStreamEnable ? "show master status with " + destination : "show master status";
+            if (mysqlConnection.atLeastMySQL84()) {
+                showSql = "SHOW BINARY LOG STATUS";
+            }
+            if (multiStreamEnable) {
+                showSql = "show master status with " + destination;
+            }
+
             ResultSetPacket packet = mysqlConnection.query(showSql);
             ResultSetPacket packet = mysqlConnection.query(showSql);
             List<String> fields = packet.getFieldValues();
             List<String> fields = packet.getFieldValues();
             if (CollectionUtils.isEmpty(fields)) {
             if (CollectionUtils.isEmpty(fields)) {
                 throw new CanalParseException(
                 throw new CanalParseException(
-                        "command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
+                        "command : '" + showSql +"' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
             }
             }
             EntryPosition endPosition = new EntryPosition(fields.get(0), Long.valueOf(fields.get(1)));
             EntryPosition endPosition = new EntryPosition(fields.get(0), Long.valueOf(fields.get(1)));
             if (isGTIDMode() && fields.size() > 4) {
             if (isGTIDMode() && fields.size() > 4) {
@@ -690,7 +697,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
             }
             }
             return endPosition;
             return endPosition;
         } catch (IOException e) {
         } catch (IOException e) {
-            throw new CanalParseException("command : 'show master status' has an error!", e);
+            throw new CanalParseException("command : '" + showSql +"' has an error! ", e);
         }
         }
     }
     }