Bladeren bron

fixed issue #4940 , support percona 8.0.33

jianghang.loujh 1 jaar geleden
bovenliggende
commit
b4074206ca

+ 10 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogContext.java

@@ -5,7 +5,6 @@ import java.util.Map;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.GtidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
 
 /**
@@ -28,6 +27,8 @@ public final class LogContext {
 
     private boolean                           iterateDecode = false;
 
+    private boolean                           compatiablePercona = false;
+
     public LogContext(){
         this.formatDescription = FormatDescriptionLogEvent.FORMAT_DESCRIPTION_EVENT_5_x;
     }
@@ -92,4 +93,12 @@ public final class LogContext {
     public void setIterateDecode(boolean iterateDecode) {
         this.iterateDecode = iterateDecode;
     }
+
+    public boolean isCompatiablePercona() {
+        return compatiablePercona;
+    }
+
+    public void setCompatiablePercona(boolean compatiablePercona) {
+        this.compatiablePercona = compatiablePercona;
+    }
 }

+ 8 - 4
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -158,8 +158,6 @@ public final class LogDecoder {
             } finally {
                 context.setIterateDecode(false);
             }
-        } else {
-            // TODO support mariadb compress binlog
         }
         return events;
     }
@@ -193,7 +191,10 @@ public final class LogDecoder {
         LogEvent gtidLogEvent = context.getGtidLogEvent();
         switch (header.getType()) {
             case LogEvent.QUERY_EVENT: {
-                QueryLogEvent event = new QueryLogEvent(header, buffer, descriptionEvent);
+                QueryLogEvent event = new QueryLogEvent(header,
+                    buffer,
+                    descriptionEvent,
+                    context.isCompatiablePercona());
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 header.putGtid(context.getGtidSet(), gtidLogEvent);
@@ -354,7 +355,10 @@ public final class LogDecoder {
                 return event;
             }
             case LogEvent.EXECUTE_LOAD_QUERY_EVENT: {
-                ExecuteLoadQueryLogEvent event = new ExecuteLoadQueryLogEvent(header, buffer, descriptionEvent);
+                ExecuteLoadQueryLogEvent event = new ExecuteLoadQueryLogEvent(header,
+                    buffer,
+                    descriptionEvent,
+                    context.isCompatiablePercona());
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 return event;

+ 3 - 3
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/ExecuteLoadQueryLogEvent.java

@@ -52,9 +52,9 @@ public final class ExecuteLoadQueryLogEvent extends QueryLogEvent {
     public static final int ELQ_FN_POS_END_OFFSET   = ELQ_FILE_ID_OFFSET + 8;
     public static final int ELQ_DUP_HANDLING_OFFSET = ELQ_FILE_ID_OFFSET + 12;
 
-    public ExecuteLoadQueryLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent)
-                                                                                                                   throws IOException{
-        super(header, buffer, descriptionEvent);
+    public ExecuteLoadQueryLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
+                                    boolean compatiablePercona) throws IOException{
+        super(header, buffer, descriptionEvent, compatiablePercona);
 
         buffer.position(descriptionEvent.commonHeaderLen + ELQ_FILE_ID_OFFSET);
 

+ 48 - 4
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/QueryLogEvent.java

@@ -434,8 +434,8 @@ public class QueryLogEvent extends LogEvent {
     private int             tvSec                     = -1;
     private BigInteger      ddlXid                    = BigInteger.valueOf(-1L);
     private Charset         charset;
-
     private String          timezone;
+    private boolean         compatiablePercona        = false;
 
     public QueryLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent)
                                                                                                          throws IOException{
@@ -443,8 +443,15 @@ public class QueryLogEvent extends LogEvent {
     }
 
     public QueryLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
+                         boolean compatiablePercona) throws IOException{
+        this(header, buffer, descriptionEvent, compatiablePercona, false);
+    }
+
+    public QueryLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
+                         boolean compatiablePercona,
                          boolean compress) throws IOException{
         super(header);
+        this.compatiablePercona = compatiablePercona;
         final int commonHeaderLen = descriptionEvent.commonHeaderLen;
         final int postHeaderLen = descriptionEvent.postHeaderLen[header.type - 1];
         /*
@@ -607,6 +614,21 @@ public class QueryLogEvent extends LogEvent {
      */
     public static final int Q_SQL_REQUIRE_PRIMARY_KEY         = 19;
 
+    /**
+     * Replicate default_table_encryption.
+     */
+    public static final int Q_DEFAULT_TABLE_ENCRYPTION        = 20;
+
+    /**
+     * @since percona 8.0.31 Replicate ddl_skip_rewrite.
+     */
+    public static final int Q_DDL_SKIP_REWRITE                = 21;
+
+    /**
+     * @since percona 8.0.31 Replicate Q_WSREP_SKIP_READONLY_CHECKS.
+     */
+    public static final int Q_WSREP_SKIP_READONLY_CHECKS      = 128;
+
     /**
      * FROM MariaDB 5.5.34
      */
@@ -713,9 +735,24 @@ public class QueryLogEvent extends LogEvent {
                         // *start++ = thd->variables.sql_require_primary_key;
                         buffer.forward(1);
                         break;
+                    case Q_DEFAULT_TABLE_ENCRYPTION:
+                        // *start++ = thd->variables.default_table_encryption;
+                        buffer.forward(1);
+                    case Q_DDL_SKIP_REWRITE:
+                        // *start++ = thd->variables.binlog_ddl_skip_rewrite;
+                        buffer.forward(1);
                     case Q_HRNOW:
-                        // int when_sec_part = buffer.getUint24();
-                        buffer.forward(3);
+                        // https://github.com/alibaba/canal/issues/4940
+                        // percona 和 mariadb各自扩展mysql binlog的格式后有冲突
+                        // 需要精确识别一下数据库类型做兼容处理
+                        if (compatiablePercona) {
+                            // percona 8.0.31
+                            // Q_WSREP_SKIP_READONLY_CHECKS *start++ = 1;
+                            buffer.forward(1);
+                        } else {
+                            // int when_sec_part = buffer.getUint24();
+                            buffer.forward(3);
+                        }
                         break;
                     case Q_XID:
                         // xid= uint8korr(pos);
@@ -728,7 +765,6 @@ public class QueryLogEvent extends LogEvent {
                         // sa_seq_no = uint8korr(pos);
                         // pos+= 8;
                         // }
-
                         int gtid_flags_extra = buffer.getUint8();
                         final int FL_COMMIT_ALTER_E1= 4;
                         final int FL_ROLLBACK_ALTER_E1= 8;
@@ -789,8 +825,16 @@ public class QueryLogEvent extends LogEvent {
                 return "Q_DEFAULT_COLLATION_FOR_UTF8MB4";
             case Q_SQL_REQUIRE_PRIMARY_KEY:
                 return "Q_SQL_REQUIRE_PRIMARY_KEY";
+            case Q_DEFAULT_TABLE_ENCRYPTION:
+                return "Q_DEFAULT_TABLE_ENCRYPTION";
+            case Q_DDL_SKIP_REWRITE:
+                return "Q_DDL_SKIP_REWRITE";
             case Q_HRNOW:
                 return "Q_HRNOW";
+            case Q_XID:
+                return "Q_XID";
+            case Q_GTID_FLAGS3:
+                return "Q_GTID_FLAGS3";
         }
         return "CODE#" + code;
     }

+ 1 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/QueryCompressedLogEvent.java

@@ -1 +1 @@
-package com.taobao.tddl.dbsync.binlog.event.mariadb;

import java.io.IOException;

import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
import com.taobao.tddl.dbsync.binlog.event.LogHeader;
import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;

/**
 * mariadb compress query event
 * 
 * @author jianghang
 * @since 1.1.7
 */
public class QueryCompressedLogEvent extends QueryLogEvent {

    public QueryCompressedLogEvent(LogHeader header, LogBuffer buffer,
                                   FormatDescriptionLogEvent descriptionEvent) throws IOException{
        super(header, buffer, descriptionEvent, true);
    }
}
+package com.taobao.tddl.dbsync.binlog.event.mariadb;

import java.io.IOException;

import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
import com.taobao.tddl.dbsync.binlog.event.LogHeader;
import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;

/**
 * mariadb compress query event
 * 
 * @author jianghang
 * @since 1.1.7
 */
public class QueryCompressedLogEvent extends QueryLogEvent {

    public QueryCompressedLogEvent(LogHeader header, LogBuffer buffer,
                                   FormatDescriptionLogEvent descriptionEvent) throws IOException{
        super(header, buffer, descriptionEvent,false, true);
    }
}

+ 29 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -57,6 +57,7 @@ public class MysqlConnection implements ErosaConnection {
     private int                 binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
     // dump binlog bytes, 暂不包括meta与TSDB
     private AtomicLong          receivedBinlogBytes;
+    private boolean             compatiablePercona = false;
 
     public MysqlConnection(){
     }
@@ -122,6 +123,7 @@ public class MysqlConnection implements ErosaConnection {
     public void seek(String binlogfilename, Long binlogPosition, String gtid, SinkFunction func) throws IOException {
         updateSettings();
         loadBinlogChecksum();
+        loadVersionComment();
         sendBinlogDump(binlogfilename, binlogPosition);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         fetcher.start(connector.getChannel());
@@ -146,6 +148,7 @@ public class MysqlConnection implements ErosaConnection {
             }
             context.setGtidSet(gtidSet);
         }
+        context.setCompatiablePercona(compatiablePercona);
         context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
         while (fetcher.fetch()) {
             accumulateReceivedBytes(fetcher.limit());
@@ -165,12 +168,14 @@ public class MysqlConnection implements ErosaConnection {
     public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
         updateSettings();
         loadBinlogChecksum();
+        loadVersionComment();
         sendRegisterSlave();
         sendBinlogDump(binlogfilename, binlogPosition);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         fetcher.start(connector.getChannel());
         LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
         LogContext context = new LogContext();
+        context.setCompatiablePercona(compatiablePercona);
         context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
         while (fetcher.fetch()) {
             accumulateReceivedBytes(fetcher.limit());
@@ -195,12 +200,14 @@ public class MysqlConnection implements ErosaConnection {
     public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException {
         updateSettings();
         loadBinlogChecksum();
+        loadVersionComment();
         sendBinlogDumpGTID(gtidSet);
 
         try (DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize())) {
             fetcher.start(connector.getChannel());
             LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             LogContext context = new LogContext();
+            context.setCompatiablePercona(compatiablePercona);
             context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
             // fix bug: #890 将gtid传输至context中,供decode使用
             context.setGtidSet(gtidSet);
@@ -238,10 +245,12 @@ public class MysqlConnection implements ErosaConnection {
     public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException {
         updateSettings();
         loadBinlogChecksum();
+        loadVersionComment();
         sendRegisterSlave();
         sendBinlogDump(binlogfilename, binlogPosition);
         ((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
         ((MysqlMultiStageCoprocessor) coprocessor).setBinlogChecksum(binlogChecksum);
+        ((MysqlMultiStageCoprocessor) coprocessor).setCompatiablePercona(compatiablePercona);
         try (DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize())) {
             fetcher.start(connector.getChannel());
             while (fetcher.fetch()) {
@@ -264,9 +273,11 @@ public class MysqlConnection implements ErosaConnection {
     public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOException {
         updateSettings();
         loadBinlogChecksum();
+        loadVersionComment();
         sendBinlogDumpGTID(gtidSet);
         ((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
         ((MysqlMultiStageCoprocessor) coprocessor).setBinlogChecksum(binlogChecksum);
+        ((MysqlMultiStageCoprocessor) coprocessor).setCompatiablePercona(compatiablePercona);
         try (DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize())) {
             fetcher.start(connector.getChannel());
             while (fetcher.fetch()) {
@@ -558,6 +569,24 @@ public class MysqlConnection implements ErosaConnection {
         }
     }
 
+    /**
+     * 识别下mysql的几种生态版本 (percona / mariadb / mysql)
+     */
+    private void loadVersionComment() {
+        ResultSetPacket rs = null;
+        try {
+            rs = query("select @@version_comment");
+            List<String> columnValues = rs.getFieldValues();
+            if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0) != null){
+                if(StringUtils.containsIgnoreCase(columnValues.get(0),"Percona")){
+                    compatiablePercona = true;
+                }
+            }
+        } catch (Throwable e) {
+            compatiablePercona = false;
+        }
+    }
+
     private void accumulateReceivedBytes(long x) {
         if (receivedBinlogBytes != null) {
             receivedBinlogBytes.addAndGet(x);

+ 4 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -130,6 +130,10 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         }
     }
 
+    public void setCompatiablePercona(boolean compatiablePercona) {
+        logContext.setCompatiablePercona(compatiablePercona);
+    }
+
     @Override
     public void stop() {
         // fix bug #968,对于pool与