Browse Source

fixed schema blackFilter & mysql 8.4

jianghang.loujh 8 months ago
parent
commit
1e5b8a2021

+ 11 - 18
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -30,18 +30,17 @@ import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
  */
 public class MysqlConnector {
 
+    public static final int     timeout           = 5 * 1000;                                     // 5s
     private static final Logger logger            = LoggerFactory.getLogger(MysqlConnector.class);
     private InetSocketAddress   address;
     private String              username;
     private String              password;
     private SslInfo             sslInfo;
-
     private String              defaultSchema;
     private int                 soTimeout         = 30 * 1000;
     private int                 connTimeout       = 5 * 1000;
     private int                 receiveBufferSize = 16 * 1024;
     private int                 sendBufferSize    = 16 * 1024;
-
     private SocketChannel       channel;
     private volatile boolean    dumping           = false;
     // mysql connectionId
@@ -50,8 +49,6 @@ public class MysqlConnector {
     // serverVersion
     private String              serverVersion;
 
-    public static final int     timeout           = 5 * 1000;                                     // 5s
-
     public MysqlConnector(){
     }
 
@@ -200,16 +197,13 @@ public class MysqlConnector {
         }
         HandshakeInitializationPacket handshakePacket = new HandshakeInitializationPacket();
         handshakePacket.fromBytes(body);
-        byte serverCharsetNumber
-            = (handshakePacket.serverCharsetNumber != 0)
-            ? handshakePacket.serverCharsetNumber
-            : 33;
+        byte serverCharsetNumber = (handshakePacket.serverCharsetNumber != 0) ? handshakePacket.serverCharsetNumber : 33;
         SslMode sslMode = sslInfo != null ? sslInfo.getSslMode() : SslMode.DISABLED;
         if (sslMode != SslMode.DISABLED) {
             boolean serverSupportSsl = (handshakePacket.serverCapabilities & CLIENT_SSL) > 0;
             if (!serverSupportSsl) {
-                throw new IOException("MySQL Server does not support SSL: " + address
-                    + " serverCapabilities: " + handshakePacket.serverCapabilities);
+                throw new IOException("MySQL Server does not support SSL: " + address + " serverCapabilities: "
+                                      + handshakePacket.serverCapabilities);
             }
             byte[] sslPacket = new SslRequestCommandPacket(serverCharsetNumber).toBytes();
             HeaderPacket sslHeader = new HeaderPacket();
@@ -230,10 +224,9 @@ public class MysqlConnector {
         serverVersion = handshakePacket.serverVersion; // 记录serverVersion
         logger.info("handshake initialization packet received, prepare the client authentication packet to send");
         // 某些老协议的 server 默认不返回 auth plugin,需要使用默认的 mysql_native_password
-        String authPluginName
-            = (handshakePacket.authPluginName != null && handshakePacket.authPluginName.length > 0)
-            ? new String(handshakePacket.authPluginName)
-            : "mysql_native_password";
+        String authPluginName = (handshakePacket.authPluginName != null
+                                 && handshakePacket.authPluginName.length > 0) ? new String(
+                                     handshakePacket.authPluginName) : "mysql_native_password";
         logger.info("auth plugin: {}", authPluginName);
         boolean isSha2Password = false;
         ClientAuthenticationPacket clientAuth;
@@ -490,10 +483,6 @@ public class MysqlConnector {
         this.channel = channel;
     }
 
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
     public long getConnectionId() {
         return connectionId;
     }
@@ -522,6 +511,10 @@ public class MysqlConnector {
         return password;
     }
 
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
     public String getServerVersion() {
         return serverVersion;
     }

+ 29 - 22
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -432,7 +432,7 @@ public class MysqlConnection implements ErosaConnection {
      * <li>net_write_timeout</li>
      * <li>net_read_timeout</li>
      * </ol>
-     *
+     * 
      * @throws IOException
      */
     private void updateSettings() throws IOException {
@@ -560,7 +560,7 @@ public class MysqlConnection implements ErosaConnection {
 
     /**
      * 获取主库checksum信息
-     *
+     * 
      * <pre>
      * mariadb区别于mysql会在binlog的第一个事件Rotate_Event里也会采用checksum逻辑,而mysql是在第二个binlog事件之后才感知是否需要处理checksum
      * 导致maraidb只要是开启checksum就会出现binlog文件名解析乱码
@@ -603,6 +603,32 @@ public class MysqlConnection implements ErosaConnection {
         }
     }
 
+    /**
+     * MySQL 8.4版本开始部分命令出现变化
+     * https://dev.mysql.com/doc/relnotes/mysql/8.4/en/news-8-4-0.html#mysqld-8-4-0-deprecation-removal
+     * 
+     * @param major
+     * @param minor
+     * @return
+     */
+    public boolean atLeast(int major, int minor) {
+        if (isMariaDB()) {
+            return false;
+        }
+        String version = connector.getServerVersion();
+        if (StringUtils.isNotEmpty(version)) {
+            String[] parts = version.split("\\.");
+            int majorVer = Integer.parseInt(parts[0]);
+            int minorVer = Integer.parseInt(parts[1]);
+            return (majorVer > major) || (majorVer == major && minorVer >= minor);
+        }
+        return false;
+    }
+
+    public boolean atLeastMySQL84() {
+        return atLeast(8, 4);
+    }
+
     private void accumulateReceivedBytes(long x) {
         if (receivedBinlogBytes != null) {
             receivedBinlogBytes.addAndGet(x);
@@ -645,7 +671,7 @@ public class MysqlConnection implements ErosaConnection {
     /**
      * http://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.
      * html#sysvar_binlog_row_image
-     *
+     * 
      * @author agapple 2015年6月29日 下午10:39:03
      * @since 1.0.20
      */
@@ -756,23 +782,4 @@ public class MysqlConnection implements ErosaConnection {
         return connector.getServerVersion() != null && connector.getServerVersion().toLowerCase().contains("mariadb");
     }
 
-    // MySQL 8.4版本开始部分命令出现变化
-    // https://dev.mysql.com/doc/relnotes/mysql/8.4/en/news-8-4-0.html#mysqld-8-4-0-deprecation-removal
-    public boolean atLeast(int major, int minor) {
-        if (isMariaDB()) {
-            return false;
-        }
-        String version = connector.getServerVersion();
-        if (StringUtils.isNotEmpty(version)) {
-            String[] parts = version.split("\\.");
-            int majorVer = Integer.parseInt(parts[0]);
-            int minorVer = Integer.parseInt(parts[1]);
-            return (majorVer > major) || (majorVer == major && minorVer >= minor);
-        }
-        return false;
-    }
-
-    public boolean atLeastMySQL84() {
-        return atLeast(8, 4);
-    }
 }

+ 13 - 8
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -668,20 +668,20 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
      * 查询当前的binlog位置
      */
     private EntryPosition findEndPosition(MysqlConnection mysqlConnection) {
-        String showSql = "SHOW MASTER STATUS";
+        String showSql = "show master status";
         try {
             if (mysqlConnection.atLeastMySQL84()) {
-                showSql = "SHOW BINARY LOG STATUS";
-            }
-            if (multiStreamEnable) {
+                // 8.4新语法
+                showSql = "show binary log status";
+            } else if (multiStreamEnable) {
+                // 兼容polardb-x的多流binlog
                 showSql = "show master status with " + destination;
             }
-
             ResultSetPacket packet = mysqlConnection.query(showSql);
             List<String> fields = packet.getFieldValues();
             if (CollectionUtils.isEmpty(fields)) {
                 throw new CanalParseException(
-                        "command : '" + showSql +"' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
+                    "command : '" + showSql + "' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
             }
             EntryPosition endPosition = new EntryPosition(fields.get(0), Long.valueOf(fields.get(1)));
             if (isGTIDMode() && fields.size() > 4) {
@@ -697,7 +697,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
             }
             return endPosition;
         } catch (IOException e) {
-            throw new CanalParseException("command : '" + showSql +"' has an error! ", e);
+            throw new CanalParseException("command : '" + showSql + "' has an error!", e);
         }
     }
 
@@ -728,7 +728,12 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     @SuppressWarnings("unused")
     private SlaveEntryPosition findSlavePosition(MysqlConnection mysqlConnection) {
         try {
-            ResultSetPacket packet = mysqlConnection.query("show slave status");
+            String showSql = "show slave stauts";
+            if (mysqlConnection.atLeastMySQL84()) {
+                // 兼容mysql 8.4
+                showSql = "show replica status";
+            }
+            ResultSetPacket packet = mysqlConnection.query(showSql);
             List<FieldPacket> names = packet.getFieldDescriptors();
             List<String> fields = packet.getFieldValues();
             if (CollectionUtils.isEmpty(fields)) {

+ 5 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -207,6 +207,11 @@ public class DatabaseTableMeta implements TableMetaTSDB {
             }
 
             for (String schema : schemas) {
+                // 如果schema命中黑名单,直接跳过
+                // 解决部分数据库可以看到database,但实际表级别无权限的情况
+                if (blackFilter != null && blackFilter.filter(schema + ".%")) {
+                    continue;
+                }
                 // filter views
                 packet = connection.query("show full tables from `" + schema + "` where Table_type = 'BASE TABLE'");
                 columnSize = packet.getFieldDescriptors().size();