Browse Source

fixed issue #136 , support rds no primary key && mysql.ha_health_check

agapple 8 years ago
parent
commit
96dca0b9e6

+ 1 - 1
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -104,7 +104,7 @@ public class MysqlConnector {
                     executor.update("KILL CONNECTION " + connectionId);
                     executor.update("KILL CONNECTION " + connectionId);
                 } catch (Exception e) {
                 } catch (Exception e) {
                     // 忽略具体异常
                     // 忽略具体异常
-                    logger.warn("KILL DUMP " + connectionId + " failure:" + ExceptionUtils.getStackTrace(e));
+                    logger.info("KILL DUMP " + connectionId + " failure:" + ExceptionUtils.getStackTrace(e));
                 } finally {
                 } finally {
                     if (connector != null) {
                     if (connector != null) {
                         connector.disconnect();
                         connector.disconnect();

+ 12 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/TableMeta.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.parse.inbound;
 package com.alibaba.otter.canal.parse.inbound;
 
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
@@ -44,6 +45,17 @@ public class TableMeta {
         this.fileds = fileds;
         this.fileds = fileds;
     }
     }
 
 
+    public List<FieldMeta> getPrimaryFields() {
+        List<FieldMeta> primarys = new ArrayList<TableMeta.FieldMeta>();
+        for (FieldMeta meta : fileds) {
+            if (meta.isKey()) {
+                primarys.add(meta);
+            }
+        }
+
+        return primarys;
+    }
+
     public static class FieldMeta {
     public static class FieldMeta {
 
 
         private String columnName;
         private String columnName;

+ 54 - 22
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -85,7 +85,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     // 是否跳过table相关的解析异常,比如表不存在或者列数量不匹配,issue 92
     // 是否跳过table相关的解析异常,比如表不存在或者列数量不匹配,issue 92
     private boolean                     filterTableError    = false;
     private boolean                     filterTableError    = false;
     // 新增rows过滤,用于仅订阅除rows以外的数据
     // 新增rows过滤,用于仅订阅除rows以外的数据
-    private boolean                     filterRows      = false;
+    private boolean                     filterRows          = false;
 
 
     public Entry parse(LogEvent logEvent) throws CanalParseException {
     public Entry parse(LogEvent logEvent) throws CanalParseException {
         if (logEvent == null || logEvent instanceof UnknownLogEvent) {
         if (logEvent == null || logEvent instanceof UnknownLogEvent) {
@@ -326,6 +326,11 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 return null;
                 return null;
             }
             }
 
 
+            if (tableMetaCache.isOnRDS() && "mysql.ha_health_check".equals(fullname)) {
+                // 忽略rds模式的mysql.ha_health_check心跳数据
+                return null;
+            }
+
             EventType eventType = null;
             EventType eventType = null;
             int type = event.getHeader().getType();
             int type = event.getHeader().getType();
             if (LogEvent.WRITE_ROWS_EVENT_V1 == type || LogEvent.WRITE_ROWS_EVENT == type) {
             if (LogEvent.WRITE_ROWS_EVENT_V1 == type || LogEvent.WRITE_ROWS_EVENT == type) {
@@ -402,34 +407,51 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
 
     private boolean parseOneRow(RowData.Builder rowDataBuilder, RowsLogEvent event, RowsLogBuffer buffer, BitSet cols,
     private boolean parseOneRow(RowData.Builder rowDataBuilder, RowsLogEvent event, RowsLogBuffer buffer, BitSet cols,
                                 boolean isAfter, TableMeta tableMeta) throws UnsupportedEncodingException {
                                 boolean isAfter, TableMeta tableMeta) throws UnsupportedEncodingException {
-        final int columnCnt = event.getTable().getColumnCnt();
-        final ColumnInfo[] columnInfo = event.getTable().getColumnInfo();
+        int columnCnt = event.getTable().getColumnCnt();
+        ColumnInfo[] columnInfo = event.getTable().getColumnInfo();
 
 
         boolean tableError = false;
         boolean tableError = false;
         // check table fileds count,只能处理加字段
         // check table fileds count,只能处理加字段
+        boolean existRDSNoPrimaryKey = false;
         if (tableMeta != null && columnInfo.length > tableMeta.getFileds().size()) {
         if (tableMeta != null && columnInfo.length > tableMeta.getFileds().size()) {
-            // online ddl增加字段操作步骤:
-            // 1. 新增一张临时表,将需要做ddl表的数据全量导入
-            // 2. 在老表上建立I/U/D的trigger,增量的将数据插入到临时表
-            // 3. 锁住应用请求,将临时表rename为老表的名字,完成增加字段的操作
-            // 尝试做一次reload,可能因为ddl没有正确解析,或者使用了类似online ddl的操作
-            // 因为online ddl没有对应表名的alter语法,所以不会有clear cache的操作
-            tableMeta = getTableMeta(event.getTable().getDbName(), event.getTable().getTableName(), false);// 强制重新获取一次
-            if (tableMeta == null) {
-                tableError = true;
-                if (!filterTableError) {
-                    throw new CanalParseException("not found [" + event.getTable().getDbName() + "."
-                                                  + event.getTable().getTableName() + "] in db , pls check!");
+            if (tableMetaCache.isOnRDS()) {
+                // 特殊处理下RDS的场景
+                List<FieldMeta> primaryKeys = tableMeta.getPrimaryFields();
+                if (primaryKeys == null || primaryKeys.isEmpty()) {
+                    if (columnInfo.length == tableMeta.getFileds().size() + 1
+                        && columnInfo[columnInfo.length - 1].type == LogEvent.MYSQL_TYPE_LONGLONG) {
+                        existRDSNoPrimaryKey = true;
+                    }
                 }
                 }
             }
             }
 
 
-            // 在做一次判断
-            if (tableMeta != null && columnInfo.length > tableMeta.getFileds().size()) {
-                tableError = true;
-                if (!filterTableError) {
-                    throw new CanalParseException("column size is not match for table:" + tableMeta.getFullName() + ","
-                                                  + columnInfo.length + " vs " + tableMeta.getFileds().size());
+            if (!existRDSNoPrimaryKey) {
+                // online ddl增加字段操作步骤:
+                // 1. 新增一张临时表,将需要做ddl表的数据全量导入
+                // 2. 在老表上建立I/U/D的trigger,增量的将数据插入到临时表
+                // 3. 锁住应用请求,将临时表rename为老表的名字,完成增加字段的操作
+                // 尝试做一次reload,可能因为ddl没有正确解析,或者使用了类似online ddl的操作
+                // 因为online ddl没有对应表名的alter语法,所以不会有clear cache的操作
+                tableMeta = getTableMeta(event.getTable().getDbName(), event.getTable().getTableName(), false);// 强制重新获取一次
+                if (tableMeta == null) {
+                    tableError = true;
+                    if (!filterTableError) {
+                        throw new CanalParseException("not found [" + event.getTable().getDbName() + "."
+                                                      + event.getTable().getTableName() + "] in db , pls check!");
+                    }
+                }
+
+                // 在做一次判断
+                if (tableMeta != null && columnInfo.length > tableMeta.getFileds().size()) {
+                    tableError = true;
+                    if (!filterTableError) {
+                        throw new CanalParseException("column size is not match for table:" + tableMeta.getFullName()
+                                                      + "," + columnInfo.length + " vs " + tableMeta.getFileds().size());
+                    }
                 }
                 }
+            } else {
+                logger.warn("[" + event.getTable().getDbName() + "." + event.getTable().getTableName()
+                            + "] is no primary key , skip alibaba_rds_row_id column");
             }
             }
         }
         }
 
 
@@ -440,6 +462,12 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 continue;
                 continue;
             }
             }
 
 
+            if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) {
+                // 不解析最后一列
+                buffer.nextValue(info.type, info.meta, false);
+                continue;
+            }
+
             Column.Builder columnBuilder = Column.newBuilder();
             Column.Builder columnBuilder = Column.newBuilder();
 
 
             FieldMeta fieldMeta = null;
             FieldMeta fieldMeta = null;
@@ -465,6 +493,10 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 }
                 }
             }
             }
             buffer.nextValue(info.type, info.meta, isBinary);
             buffer.nextValue(info.type, info.meta, isBinary);
+            if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) {
+                // 不解析最后一列
+                continue;
+            }
 
 
             int javaType = buffer.getJavaType();
             int javaType = buffer.getJavaType();
             if (buffer.isNull()) {
             if (buffer.isNull()) {
@@ -718,7 +750,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     public void setFilterTableError(boolean filterTableError) {
     public void setFilterTableError(boolean filterTableError) {
         this.filterTableError = filterTableError;
         this.filterTableError = filterTableError;
     }
     }
-    
+
     public void setFilterRows(boolean filterRows) {
     public void setFilterRows(boolean filterRows) {
         this.filterRows = filterRows;
         this.filterRows = filterRows;
     }
     }

+ 17 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java

@@ -32,6 +32,7 @@ public class TableMetaCache {
     public static final String     COLUMN_DEFAULT = "COLUMN_DEFAULT";
     public static final String     COLUMN_DEFAULT = "COLUMN_DEFAULT";
     public static final String     EXTRA          = "EXTRA";
     public static final String     EXTRA          = "EXTRA";
     private MysqlConnection        connection;
     private MysqlConnection        connection;
+    private boolean                isOnRDS        = false;
 
 
     // 第一层tableId,第二层schema.table,解决tableId重复,对应多张表
     // 第一层tableId,第二层schema.table,解决tableId重复,对应多张表
     private Map<String, TableMeta> tableMetaCache;
     private Map<String, TableMeta> tableMetaCache;
@@ -56,6 +57,13 @@ public class TableMetaCache {
 
 
         });
         });
 
 
+        try {
+            ResultSetPacket packet = connection.query("show global variables  like 'rds\\_%'");
+            if (packet.getFieldValues().size() > 0) {
+                isOnRDS = true;
+            }
+        } catch (IOException e) {
+        }
     }
     }
 
 
     public TableMeta getTableMeta(String schema, String table) {
     public TableMeta getTableMeta(String schema, String table) {
@@ -136,4 +144,13 @@ public class TableMetaCache {
             .append('`')
             .append('`')
             .toString();
             .toString();
     }
     }
+
+    public boolean isOnRDS() {
+        return isOnRDS;
+    }
+
+    public void setOnRDS(boolean isOnRDS) {
+        this.isOnRDS = isOnRDS;
+    }
+
 }
 }