|
@@ -6,6 +6,7 @@ import java.math.BigDecimal;
|
|
|
import java.math.BigInteger;
|
|
|
import java.nio.charset.Charset;
|
|
|
import java.sql.Types;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.BitSet;
|
|
|
import java.util.List;
|
|
|
|
|
@@ -21,7 +22,9 @@ import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
|
|
|
import com.alibaba.otter.canal.parse.inbound.BinlogParser;
|
|
|
import com.alibaba.otter.canal.parse.inbound.TableMeta;
|
|
|
import com.alibaba.otter.canal.parse.inbound.TableMeta.FieldMeta;
|
|
|
-import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.SimpleDdlParser.DdlResult;
|
|
|
+import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DdlResult;
|
|
|
+import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DruidDdlParser;
|
|
|
+import com.alibaba.otter.canal.parse.inbound.mysql.ddl.SimpleDdlParser;
|
|
|
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
|
|
|
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
|
|
|
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
|
|
@@ -33,6 +36,7 @@ import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
|
|
|
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
|
|
|
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
|
|
|
import com.alibaba.otter.canal.protocol.CanalEntry.Type;
|
|
|
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
|
|
|
import com.google.protobuf.ByteString;
|
|
|
import com.taobao.tddl.dbsync.binlog.LogEvent;
|
|
|
import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
|
|
@@ -86,8 +90,10 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
private boolean filterTableError = false;
|
|
|
// 新增rows过滤,用于仅订阅除rows以外的数据
|
|
|
private boolean filterRows = false;
|
|
|
+ private boolean useDruidDdlFilter = true;
|
|
|
|
|
|
- public Entry parse(LogEvent logEvent) throws CanalParseException {
|
|
|
+ @Override
|
|
|
+ public Entry parse(LogEvent logEvent, boolean isSeek) throws CanalParseException {
|
|
|
if (logEvent == null || logEvent instanceof UnknownLogEvent) {
|
|
|
return null;
|
|
|
}
|
|
@@ -98,7 +104,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
binlogFileName = ((RotateLogEvent) logEvent).getFilename();
|
|
|
break;
|
|
|
case LogEvent.QUERY_EVENT:
|
|
|
- return parseQueryEvent((QueryLogEvent) logEvent);
|
|
|
+ return parseQueryEvent((QueryLogEvent) logEvent, isSeek);
|
|
|
case LogEvent.XID_EVENT:
|
|
|
return parseXidEvent((XidLogEvent) logEvent);
|
|
|
case LogEvent.TABLE_MAP_EVENT:
|
|
@@ -137,7 +143,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private Entry parseQueryEvent(QueryLogEvent event) {
|
|
|
+ private Entry parseQueryEvent(QueryLogEvent event, boolean isSeek) {
|
|
|
String queryString = event.getQuery();
|
|
|
if (StringUtils.endsWithIgnoreCase(queryString, BEGIN)) {
|
|
|
TransactionBegin transactionBegin = createTransactionBegin(event.getSessionId());
|
|
@@ -148,107 +154,139 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
|
|
|
return createEntry(header, EntryType.TRANSACTIONEND, transactionEnd.toByteString());
|
|
|
} else {
|
|
|
- // DDL语句处理
|
|
|
- DdlResult result = SimpleDdlParser.parse(queryString, event.getDbName());
|
|
|
-
|
|
|
- String schemaName = event.getDbName();
|
|
|
- if (StringUtils.isNotEmpty(result.getSchemaName())) {
|
|
|
- schemaName = result.getSchemaName();
|
|
|
- }
|
|
|
-
|
|
|
- String tableName = result.getTableName();
|
|
|
+ boolean notFilter = false;
|
|
|
EventType type = EventType.QUERY;
|
|
|
-
|
|
|
- // 更新下table meta cache
|
|
|
- if (tableMetaCache != null
|
|
|
- && (result.getType() == EventType.ALTER || result.getType() == EventType.ERASE || result.getType() == EventType.RENAME)) {
|
|
|
- for (DdlResult renameResult = result; renameResult != null; renameResult = renameResult.getRenameTableResult()) {
|
|
|
- String schemaName0 = event.getDbName(); // 防止rename语句后产生schema变更带来影响
|
|
|
- if (StringUtils.isNotEmpty(renameResult.getSchemaName())) {
|
|
|
- schemaName0 = renameResult.getSchemaName();
|
|
|
- }
|
|
|
-
|
|
|
- tableName = renameResult.getTableName();
|
|
|
- if (StringUtils.isNotEmpty(tableName)) {
|
|
|
- // 如果解析到了正确的表信息,则根据全名进行清除
|
|
|
- tableMetaCache.clearTableMeta(schemaName0, tableName);
|
|
|
- } else {
|
|
|
- // 如果无法解析正确的表信息,则根据schema进行清除
|
|
|
- tableMetaCache.clearTableMetaWithSchemaName(schemaName0);
|
|
|
+ String tableName = null;
|
|
|
+ String schemaName = null;
|
|
|
+ if (useDruidDdlFilter) {
|
|
|
+ List<DdlResult> results = DruidDdlParser.parse(queryString, event.getDbName());
|
|
|
+ for (DdlResult result : results) {
|
|
|
+ if (!processFilter(queryString, result)) {
|
|
|
+ // 只要有一个数据不进行过滤
|
|
|
+ notFilter = true;
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- // fixed issue https://github.com/alibaba/canal/issues/58
|
|
|
- if (result.getType() == EventType.ALTER || result.getType() == EventType.ERASE
|
|
|
- || result.getType() == EventType.CREATE || result.getType() == EventType.TRUNCATE
|
|
|
- || result.getType() == EventType.RENAME || result.getType() == EventType.CINDEX
|
|
|
- || result.getType() == EventType.DINDEX) { // 针对DDL类型
|
|
|
-
|
|
|
- if (filterQueryDdl) {
|
|
|
- return null;
|
|
|
+ if (results.size() > 0) {
|
|
|
+ // 如果针对多行的DDL,只能取第一条
|
|
|
+ type = results.get(0).getType();
|
|
|
+ schemaName = results.get(0).getSchemaName();
|
|
|
+ tableName = results.get(0).getTableName();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ DdlResult result = SimpleDdlParser.parse(queryString, event.getDbName());
|
|
|
+ if (!processFilter(queryString, result)) {
|
|
|
+ notFilter = true;
|
|
|
}
|
|
|
|
|
|
type = result.getType();
|
|
|
- if (StringUtils.isEmpty(tableName)
|
|
|
- || (result.getType() == EventType.RENAME && StringUtils.isEmpty(result.getOriTableName()))) {
|
|
|
- // 如果解析不出tableName,记录一下日志,方便bugfix,目前直接抛出异常,中断解析
|
|
|
- throw new CanalParseException("SimpleDdlParser process query failed. pls submit issue with this queryString: "
|
|
|
- + queryString + " , and DdlResult: " + result.toString());
|
|
|
- // return null;
|
|
|
- } else {
|
|
|
- // check name filter
|
|
|
- String name = schemaName + "." + tableName;
|
|
|
- if (nameFilter != null && !nameFilter.filter(name)) {
|
|
|
- if (result.getType() == EventType.RENAME) {
|
|
|
- // rename校验只要源和目标满足一个就进行操作
|
|
|
- if (nameFilter != null
|
|
|
- && !nameFilter.filter(result.getOriSchemaName() + "." + result.getOriTableName())) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- } else {
|
|
|
- // 其他情况返回null
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
+ schemaName = result.getSchemaName();
|
|
|
+ tableName = result.getTableName();
|
|
|
+ }
|
|
|
|
|
|
- if (nameBlackFilter != null && nameBlackFilter.filter(name)) {
|
|
|
- if (result.getType() == EventType.RENAME) {
|
|
|
- // rename校验只要源和目标满足一个就进行操作
|
|
|
- if (nameBlackFilter != null
|
|
|
- && nameBlackFilter.filter(result.getOriSchemaName() + "." + result.getOriTableName())) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- } else {
|
|
|
- // 其他情况返回null
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- } else if (result.getType() == EventType.INSERT || result.getType() == EventType.UPDATE
|
|
|
- || result.getType() == EventType.DELETE) {
|
|
|
- // 对外返回,保证兼容,还是返回QUERY类型,这里暂不解析tableName,所以无法支持过滤
|
|
|
- if (filterQueryDml) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- } else if (filterQueryDcl) {
|
|
|
+ if (!notFilter) {
|
|
|
+ // 如果是过滤的数据就不处理了
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
+ if (!isSeek) {
|
|
|
+ // 使用新的表结构元数据管理方式
|
|
|
+ EntryPosition position = createPosition(event.getHeader());
|
|
|
+ tableMetaCache.apply(position, event.getDbName(), queryString, null);
|
|
|
+ }
|
|
|
+
|
|
|
Header header = createHeader(binlogFileName, event.getHeader(), schemaName, tableName, type);
|
|
|
RowChange.Builder rowChangeBuider = RowChange.newBuilder();
|
|
|
- if (result.getType() != EventType.QUERY) {
|
|
|
+ if (type != EventType.QUERY) {
|
|
|
rowChangeBuider.setIsDdl(true);
|
|
|
}
|
|
|
rowChangeBuider.setSql(queryString);
|
|
|
if (StringUtils.isNotEmpty(event.getDbName())) {// 可能为空
|
|
|
rowChangeBuider.setDdlSchemaName(event.getDbName());
|
|
|
}
|
|
|
- rowChangeBuider.setEventType(result.getType());
|
|
|
+ rowChangeBuider.setEventType(type);
|
|
|
return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private boolean processFilter(String queryString, DdlResult result) {
|
|
|
+ String schemaName = result.getSchemaName();
|
|
|
+ String tableName = result.getTableName();
|
|
|
+ // fixed issue https://github.com/alibaba/canal/issues/58
|
|
|
+ // 更新下table meta cache
|
|
|
+ if (tableMetaCache != null
|
|
|
+ && (result.getType() == EventType.ALTER || result.getType() == EventType.ERASE || result.getType() == EventType.RENAME)) {
|
|
|
+ // 对外返回,保证兼容,还是返回QUERY类型,这里暂不解析tableName,所以无法支持过滤
|
|
|
+ for (DdlResult renameResult = result; renameResult != null; renameResult = renameResult.getRenameTableResult()) {
|
|
|
+ String schemaName0 = renameResult.getSchemaName();
|
|
|
+ String tableName0 = renameResult.getTableName();
|
|
|
+ if (StringUtils.isNotEmpty(tableName0)) {
|
|
|
+ // 如果解析到了正确的表信息,则根据全名进行清除
|
|
|
+ tableMetaCache.clearTableMeta(schemaName0, tableName0);
|
|
|
+ } else {
|
|
|
+ // 如果无法解析正确的表信息,则根据schema进行清除
|
|
|
+ tableMetaCache.clearTableMetaWithSchemaName(schemaName0);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // fixed issue https://github.com/alibaba/canal/issues/58
|
|
|
+ if (result.getType() == EventType.ALTER || result.getType() == EventType.ERASE
|
|
|
+ || result.getType() == EventType.CREATE || result.getType() == EventType.TRUNCATE
|
|
|
+ || result.getType() == EventType.RENAME || result.getType() == EventType.CINDEX
|
|
|
+ || result.getType() == EventType.DINDEX) { // 针对DDL类型
|
|
|
+
|
|
|
+ if (filterQueryDdl) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (StringUtils.isEmpty(tableName)
|
|
|
+ || (result.getType() == EventType.RENAME && StringUtils.isEmpty(result.getOriTableName()))) {
|
|
|
+ // 如果解析不出tableName,记录一下日志,方便bugfix,目前直接抛出异常,中断解析
|
|
|
+ throw new CanalParseException("SimpleDdlParser process query failed. pls submit issue with this queryString: "
|
|
|
+ + queryString + " , and DdlResult: " + result.toString());
|
|
|
+ // return null;
|
|
|
+ } else {
|
|
|
+ // check name filter
|
|
|
+ String name = schemaName + "." + tableName;
|
|
|
+ if (nameFilter != null && !nameFilter.filter(name)) {
|
|
|
+ if (result.getType() == EventType.RENAME) {
|
|
|
+ // rename校验只要源和目标满足一个就进行操作
|
|
|
+ if (nameFilter != null
|
|
|
+ && !nameFilter.filter(result.getOriSchemaName() + "." + result.getOriTableName())) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // 其他情况返回null
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (nameBlackFilter != null && nameBlackFilter.filter(name)) {
|
|
|
+ if (result.getType() == EventType.RENAME) {
|
|
|
+ // rename校验只要源和目标满足一个就进行操作
|
|
|
+ if (nameBlackFilter != null
|
|
|
+ && nameBlackFilter.filter(result.getOriSchemaName() + "." + result.getOriTableName())) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // 其他情况返回null
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else if (result.getType() == EventType.INSERT || result.getType() == EventType.UPDATE
|
|
|
+ || result.getType() == EventType.DELETE) {
|
|
|
+ // 对外返回,保证兼容,还是返回QUERY类型,这里暂不解析tableName,所以无法支持过滤
|
|
|
+ if (filterQueryDml) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ } else if (filterQueryDcl) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
private Entry parseRowsQueryEvent(RowsQueryLogEvent event) {
|
|
|
if (filterQueryDml) {
|
|
|
return null;
|
|
@@ -257,7 +295,15 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
String queryString = null;
|
|
|
try {
|
|
|
queryString = new String(event.getRowsQuery().getBytes(ISO_8859_1), charset.name());
|
|
|
- return buildQueryEntry(queryString, event.getHeader());
|
|
|
+ String tableName = null;
|
|
|
+ if (useDruidDdlFilter) {
|
|
|
+ List<DdlResult> results = DruidDdlParser.parse(queryString, null);
|
|
|
+ if (results.size() > 0) {
|
|
|
+ tableName = results.get(0).getTableName();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return buildQueryEntry(queryString, event.getHeader(), tableName);
|
|
|
} catch (UnsupportedEncodingException e) {
|
|
|
throw new CanalParseException(e);
|
|
|
}
|
|
@@ -318,6 +364,10 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
throw new TableIdNotFoundException("not found tableId:" + event.getTableId());
|
|
|
}
|
|
|
|
|
|
+ boolean isHeartBeat = isAliSQLHeartBeat(table.getDbName(), table.getTableName());
|
|
|
+ boolean isRDSHeartBeat = tableMetaCache.isOnRDS()
|
|
|
+ && isRDSHeartBeat(table.getDbName(), table.getTableName());
|
|
|
+
|
|
|
String fullname = table.getDbName() + "." + table.getTableName();
|
|
|
// check name filter
|
|
|
if (nameFilter != null && !nameFilter.filter(fullname)) {
|
|
@@ -327,9 +377,23 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- if (tableMetaCache.isOnRDS() && "mysql.ha_health_check".equals(fullname)) {
|
|
|
- // 忽略rds模式的mysql.ha_health_check心跳数据
|
|
|
- return null;
|
|
|
+ // if (isHeartBeat || isRDSHeartBeat) {
|
|
|
+ // // 忽略rds模式的mysql.ha_health_check心跳数据
|
|
|
+ // return null;
|
|
|
+ // }
|
|
|
+ TableMeta tableMeta = null;
|
|
|
+ if (isRDSHeartBeat) {
|
|
|
+ // 处理rds模式的mysql.ha_health_check心跳数据
|
|
|
+ // 主要RDS的心跳表基本无权限,需要mock一个tableMeta
|
|
|
+ FieldMeta idMeta = new FieldMeta("id", "bigint(20)", true, false, "0");
|
|
|
+ FieldMeta typeMeta = new FieldMeta("type", "char(1)", false, true, "0");
|
|
|
+ tableMeta = new TableMeta(table.getDbName(), table.getTableName(), Arrays.asList(idMeta, typeMeta));
|
|
|
+ } else if (isHeartBeat) {
|
|
|
+ // 处理alisql模式的test.heartbeat心跳数据
|
|
|
+ // 心跳表基本无权限,需要mock一个tableMeta
|
|
|
+ FieldMeta idMeta = new FieldMeta("id", "smallint(6)", false, true, null);
|
|
|
+ FieldMeta typeMeta = new FieldMeta("type", "int(11)", true, false, null);
|
|
|
+ tableMeta = new TableMeta(table.getDbName(), table.getTableName(), Arrays.asList(idMeta, typeMeta));
|
|
|
}
|
|
|
|
|
|
EventType eventType = null;
|
|
@@ -349,6 +413,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
table.getDbName(),
|
|
|
table.getTableName(),
|
|
|
eventType);
|
|
|
+ EntryPosition position = createPosition(event.getHeader());
|
|
|
RowChange.Builder rowChangeBuider = RowChange.newBuilder();
|
|
|
rowChangeBuider.setTableId(event.getTableId());
|
|
|
rowChangeBuider.setIsDdl(false);
|
|
@@ -358,9 +423,9 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
BitSet columns = event.getColumns();
|
|
|
BitSet changeColumns = event.getChangeColumns();
|
|
|
boolean tableError = false;
|
|
|
- TableMeta tableMeta = null;
|
|
|
- if (tableMetaCache != null) {// 入错存在table meta cache
|
|
|
- tableMeta = getTableMeta(table.getDbName(), table.getTableName(), true);
|
|
|
+ if (tableMetaCache != null && tableMeta == null) {// 入错存在table meta
|
|
|
+ // cache
|
|
|
+ tableMeta = getTableMeta(table.getDbName(), table.getTableName(), true, position);
|
|
|
if (tableMeta == null) {
|
|
|
tableError = true;
|
|
|
if (!filterTableError) {
|
|
@@ -406,6 +471,13 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private EntryPosition createPosition(LogHeader logHeader) {
|
|
|
+ return new EntryPosition(binlogFileName,
|
|
|
+ logHeader.getLogPos(),
|
|
|
+ logHeader.getWhen() * 1000L,
|
|
|
+ logHeader.getServerId()); // 记录到秒
|
|
|
+ }
|
|
|
+
|
|
|
private boolean parseOneRow(RowData.Builder rowDataBuilder, RowsLogEvent event, RowsLogBuffer buffer, BitSet cols,
|
|
|
boolean isAfter, TableMeta tableMeta) throws UnsupportedEncodingException {
|
|
|
int columnCnt = event.getTable().getColumnCnt();
|
|
@@ -414,18 +486,19 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
boolean tableError = false;
|
|
|
// check table fileds count,只能处理加字段
|
|
|
boolean existRDSNoPrimaryKey = false;
|
|
|
- if (tableMeta != null && columnInfo.length > tableMeta.getFileds().size()) {
|
|
|
+ if (tableMeta != null && columnInfo.length > tableMeta.getFields().size()) {
|
|
|
if (tableMetaCache.isOnRDS()) {
|
|
|
// 特殊处理下RDS的场景
|
|
|
List<FieldMeta> primaryKeys = tableMeta.getPrimaryFields();
|
|
|
if (primaryKeys == null || primaryKeys.isEmpty()) {
|
|
|
- if (columnInfo.length == tableMeta.getFileds().size() + 1
|
|
|
+ if (columnInfo.length == tableMeta.getFields().size() + 1
|
|
|
&& columnInfo[columnInfo.length - 1].type == LogEvent.MYSQL_TYPE_LONGLONG) {
|
|
|
existRDSNoPrimaryKey = true;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ EntryPosition position = createPosition(event.getHeader());
|
|
|
if (!existRDSNoPrimaryKey) {
|
|
|
// online ddl增加字段操作步骤:
|
|
|
// 1. 新增一张临时表,将需要做ddl表的数据全量导入
|
|
@@ -433,7 +506,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
// 3. 锁住应用请求,将临时表rename为老表的名字,完成增加字段的操作
|
|
|
// 尝试做一次reload,可能因为ddl没有正确解析,或者使用了类似online ddl的操作
|
|
|
// 因为online ddl没有对应表名的alter语法,所以不会有clear cache的操作
|
|
|
- tableMeta = getTableMeta(event.getTable().getDbName(), event.getTable().getTableName(), false);// 强制重新获取一次
|
|
|
+ tableMeta = getTableMeta(event.getTable().getDbName(), event.getTable().getTableName(), false, position);// 强制重新获取一次
|
|
|
if (tableMeta == null) {
|
|
|
tableError = true;
|
|
|
if (!filterTableError) {
|
|
@@ -443,11 +516,11 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
}
|
|
|
|
|
|
// 在做一次判断
|
|
|
- if (tableMeta != null && columnInfo.length > tableMeta.getFileds().size()) {
|
|
|
+ if (tableMeta != null && columnInfo.length > tableMeta.getFields().size()) {
|
|
|
tableError = true;
|
|
|
if (!filterTableError) {
|
|
|
throw new CanalParseException("column size is not match for table:" + tableMeta.getFullName()
|
|
|
- + "," + columnInfo.length + " vs " + tableMeta.getFileds().size());
|
|
|
+ + "," + columnInfo.length + " vs " + tableMeta.getFields().size());
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
@@ -474,7 +547,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
FieldMeta fieldMeta = null;
|
|
|
if (tableMeta != null && !tableError) {
|
|
|
// 处理file meta
|
|
|
- fieldMeta = tableMeta.getFileds().get(i);
|
|
|
+ fieldMeta = tableMeta.getFields().get(i);
|
|
|
columnBuilder.setName(fieldMeta.getColumnName());
|
|
|
columnBuilder.setIsKey(fieldMeta.isKey());
|
|
|
// 增加mysql type类型,issue 73
|
|
@@ -615,6 +688,14 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
|
|
|
}
|
|
|
|
|
|
+ private Entry buildQueryEntry(String queryString, LogHeader logHeader, String tableName) {
|
|
|
+ Header header = createHeader(binlogFileName, logHeader, "", tableName, EventType.QUERY);
|
|
|
+ RowChange.Builder rowChangeBuider = RowChange.newBuilder();
|
|
|
+ rowChangeBuider.setSql(queryString);
|
|
|
+ rowChangeBuider.setEventType(EventType.QUERY);
|
|
|
+ return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
|
|
|
+ }
|
|
|
+
|
|
|
private Entry buildQueryEntry(String queryString, LogHeader logHeader) {
|
|
|
Header header = createHeader(binlogFileName, logHeader, "", "", EventType.QUERY);
|
|
|
RowChange.Builder rowChangeBuider = RowChange.newBuilder();
|
|
@@ -673,9 +754,9 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- private TableMeta getTableMeta(String dbName, String tbName, boolean useCache) {
|
|
|
+ private TableMeta getTableMeta(String dbName, String tbName, boolean useCache, EntryPosition position) {
|
|
|
try {
|
|
|
- return tableMetaCache.getTableMeta(dbName, tbName, useCache);
|
|
|
+ return tableMetaCache.getTableMeta(dbName, tbName, useCache, position);
|
|
|
} catch (Exception e) {
|
|
|
String message = ExceptionUtils.getRootCauseMessage(e);
|
|
|
if (filterTableError) {
|
|
@@ -692,6 +773,14 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
return "LONGTEXT".equalsIgnoreCase(columnType) || "MEDIUMTEXT".equalsIgnoreCase(columnType)
|
|
|
|| "TEXT".equalsIgnoreCase(columnType) || "TINYTEXT".equalsIgnoreCase(columnType);
|
|
|
}
|
|
|
+
|
|
|
+ private boolean isAliSQLHeartBeat(String schema, String table) {
|
|
|
+ return "test".equalsIgnoreCase(schema) && "heartbeat".equalsIgnoreCase(table);
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean isRDSHeartBeat(String schema, String table) {
|
|
|
+ return "mysql".equalsIgnoreCase(schema) && "ha_health_check".equalsIgnoreCase(table);
|
|
|
+ }
|
|
|
|
|
|
public static TransactionBegin createTransactionBegin(long threadId) {
|
|
|
TransactionBegin.Builder beginBuilder = TransactionBegin.newBuilder();
|