Kaynağa Gözat

fixed issue #1256, rds alibaba_rds_row_id

agapple 6 yıl önce
ebeveyn
işleme
7c63e81e01

+ 27 - 9
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -621,9 +621,10 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                                                       + "," + columnInfo.length + " vs " + tableMeta.getFields().size());
                     }
                 }
-            } else {
-                logger.warn("[" + event.getTable().getDbName() + "." + event.getTable().getTableName()
-                            + "] is no primary key , skip alibaba_rds_row_id column");
+                // } else {
+                // logger.warn("[" + event.getTable().getDbName() + "." +
+                // event.getTable().getTableName()
+                // + "] is no primary key , skip alibaba_rds_row_id column");
             }
         }
 
@@ -634,18 +635,35 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 continue;
             }
 
+            if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) {
+                // 不解析最后一列
+                String rdsRowIdColumnName = "#alibaba_rds_row_id#";
+                buffer.nextValue(rdsRowIdColumnName, i, info.type, info.meta, false);
+                Column.Builder columnBuilder = Column.newBuilder();
+                columnBuilder.setName(rdsRowIdColumnName);
+                columnBuilder.setIsKey(true);
+                columnBuilder.setMysqlType("bigint");
+                columnBuilder.setIndex(i);
+                columnBuilder.setIsNull(false);
+                Serializable value = buffer.getValue();
+                columnBuilder.setValue(value.toString());
+                columnBuilder.setSqlType(Types.BIGINT);
+                columnBuilder.setUpdated(false);
+
+                if (isAfter) {
+                    rowDataBuilder.addAfterColumns(columnBuilder.build());
+                } else {
+                    rowDataBuilder.addBeforeColumns(columnBuilder.build());
+                }
+                continue;
+            }
+
             FieldMeta fieldMeta = null;
             if (tableMeta != null && !tableError) {
                 // 处理file meta
                 fieldMeta = tableMeta.getFields().get(i);
             }
 
-            if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) {
-                // 不解析最后一列
-                buffer.nextValue(fieldMeta.getColumnName(), i, info.type, info.meta, false);
-                continue;
-            }
-
             if (fieldMeta != null && existOptionalMetaData && tableMetaCache.isOnTSDB()) {
                 // check column info
                 boolean check = StringUtils.equalsIgnoreCase(fieldMeta.getColumnName(), info.name);