Răsfoiți Sursa

ignore compression event

jianghang.loujh 2 ani în urmă
părinte
comite
ce916ee407

+ 49 - 12
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -197,8 +197,9 @@ public final class LogDecoder {
             }
             case LogEvent.SLAVE_EVENT: /* can never happen (unused event) */
             {
-                if (logger.isWarnEnabled()) logger.warn("Skipping unsupported SLAVE_EVENT from: "
-                                                        + context.getLogPosition());
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Skipping unsupported SLAVE_EVENT from: " + context.getLogPosition());
+                }
                 break;
             }
             case LogEvent.CREATE_FILE_EVENT: {
@@ -264,22 +265,25 @@ public final class LogDecoder {
                 return descriptionEvent;
             }
             case LogEvent.PRE_GA_WRITE_ROWS_EVENT: {
-                if (logger.isWarnEnabled()) logger.warn("Skipping unsupported PRE_GA_WRITE_ROWS_EVENT from: "
-                                                        + context.getLogPosition());
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Skipping unsupported PRE_GA_WRITE_ROWS_EVENT from: " + context.getLogPosition());
+                }
                 // ev = new Write_rows_log_event_old(buf, event_len,
                 // description_event);
                 break;
             }
             case LogEvent.PRE_GA_UPDATE_ROWS_EVENT: {
-                if (logger.isWarnEnabled()) logger.warn("Skipping unsupported PRE_GA_UPDATE_ROWS_EVENT from: "
-                                                        + context.getLogPosition());
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Skipping unsupported PRE_GA_UPDATE_ROWS_EVENT from: " + context.getLogPosition());
+                }
                 // ev = new Update_rows_log_event_old(buf, event_len,
                 // description_event);
                 break;
             }
             case LogEvent.PRE_GA_DELETE_ROWS_EVENT: {
-                if (logger.isWarnEnabled()) logger.warn("Skipping unsupported PRE_GA_DELETE_ROWS_EVENT from: "
-                                                        + context.getLogPosition());
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Skipping unsupported PRE_GA_DELETE_ROWS_EVENT from: " + context.getLogPosition());
+                }
                 // ev = new Delete_rows_log_event_old(buf, event_len,
                 // description_event);
                 break;
@@ -356,10 +360,16 @@ public final class LogDecoder {
                 return event;
             }
             case LogEvent.TRANSACTION_PAYLOAD_EVENT: {
-                TransactionPayloadLogEvent event = new TransactionPayloadLogEvent(header, buffer, descriptionEvent);
-                /* updating position in context */
-                logPosition.position = header.getLogPos();
-                return event;
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Skipping unsupported MySQL TRANSACTION_PAYLOAD_EVENT from: " + context.getLogPosition());
+                }
+                break;
+
+                // TransactionPayloadLogEvent event = new TransactionPayloadLogEvent(header,
+                // buffer, descriptionEvent);
+                // /* updating position in context */
+                // logPosition.position = header.getLogPos();
+                // return event;
             }
             case LogEvent.VIEW_CHANGE_EVENT: {
                 ViewChangeEvent event = new ViewChangeEvent(header, buffer, descriptionEvent);
@@ -424,6 +434,33 @@ public final class LogDecoder {
                 logPosition.position = header.getLogPos();
                 return event;
             }
+            case LogEvent.QUERY_COMPRESSED_EVENT: {
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Skipping unsupported MaraiDB QUERY_COMPRESSED_EVENT from: " + context.getLogPosition());
+                }
+                break;
+            }
+            case LogEvent.WRITE_ROWS_COMPRESSED_EVENT_V1:
+            case LogEvent.WRITE_ROWS_COMPRESSED_EVENT: {
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Skipping unsupported MaraiDB WRITE_ROWS_COMPRESSED_EVENT from: " + context.getLogPosition());
+                }
+                break;
+            }
+            case LogEvent.UPDATE_ROWS_COMPRESSED_EVENT_V1:
+            case LogEvent.UPDATE_ROWS_COMPRESSED_EVENT: {
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Skipping unsupported MaraiDB UPDATE_ROWS_COMPRESSED_EVENT from: " + context.getLogPosition());
+                }
+                break;
+            }
+            case LogEvent.DELETE_ROWS_COMPRESSED_EVENT_V1:
+            case LogEvent.DELETE_ROWS_COMPRESSED_EVENT: {
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Skipping unsupported MaraiDB DELETE_ROWS_COMPRESSED_EVENT from: " + context.getLogPosition());
+                }
+                break;
+            }
             default:
                 /*
                  * Create an object of Ignorable_log_event for unrecognized

+ 15 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogEvent.java

@@ -213,6 +213,21 @@ public abstract class LogEvent {
 
     public static final int    START_ENCRYPTION_EVENT                   = 164;
 
+    // mariadb 10.10.1
+    /*
+     * Compressed binlog event. Note that the order between WRITE/UPDATE/DELETE
+     * events is significant; this is so that we can convert from the compressed to
+     * the uncompressed event type with (type-WRITE_ROWS_COMPRESSED_EVENT +
+     * WRITE_ROWS_EVENT) and similar for _V1.
+     */
+    public static final int    QUERY_COMPRESSED_EVENT                   = 165;
+    public static final int    WRITE_ROWS_COMPRESSED_EVENT_V1           = 166;
+    public static final int    UPDATE_ROWS_COMPRESSED_EVENT_V1          = 167;
+    public static final int    DELETE_ROWS_COMPRESSED_EVENT_V1          = 168;
+    public static final int    WRITE_ROWS_COMPRESSED_EVENT              = 169;
+    public static final int    UPDATE_ROWS_COMPRESSED_EVENT             = 170;
+    public static final int    DELETE_ROWS_COMPRESSED_EVENT             = 171;
+
     /** end marker */
     public static final int    ENUM_END_EVENT                           = 165;