|
@@ -233,7 +233,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
if (useDruidDdlFilter) {
|
|
|
List<DdlResult> results = DruidDdlParser.parse(queryString, event.getDbName());
|
|
|
for (DdlResult result : results) {
|
|
|
- if (!processFilter(queryString, result)) {
|
|
|
+ if (!processFilter(queryString, result, event, isSeek)) {
|
|
|
// 只要有一个数据不进行过滤
|
|
|
notFilter = true;
|
|
|
}
|
|
@@ -246,7 +246,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
}
|
|
|
} else {
|
|
|
DdlResult result = SimpleDdlParser.parse(queryString, event.getDbName());
|
|
|
- if (!processFilter(queryString, result)) {
|
|
|
+ if (!processFilter(queryString, result, event, isSeek)) {
|
|
|
notFilter = true;
|
|
|
}
|
|
|
|
|
@@ -260,12 +260,6 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- if (!isSeek) {
|
|
|
- // 使用新的表结构元数据管理方式
|
|
|
- EntryPosition position = createPosition(event.getHeader());
|
|
|
- tableMetaCache.apply(position, event.getDbName(), queryString, null);
|
|
|
- }
|
|
|
-
|
|
|
Header header = createHeader(event.getHeader(), schemaName, tableName, type);
|
|
|
RowChange.Builder rowChangeBuider = RowChange.newBuilder();
|
|
|
if (type != EventType.QUERY && type != EventType.INSERT && type != EventType.UPDATE
|
|
@@ -285,7 +279,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
return StringUtils.substringAfter(queryString, type);
|
|
|
}
|
|
|
|
|
|
- private boolean processFilter(String queryString, DdlResult result) {
|
|
|
+ private boolean processFilter(String queryString, DdlResult result, QueryLogEvent event, boolean isSeek) {
|
|
|
String schemaName = result.getSchemaName();
|
|
|
String tableName = result.getTableName();
|
|
|
// fixed issue https://github.com/alibaba/canal/issues/58
|
|
@@ -312,10 +306,6 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
|| result.getType() == EventType.RENAME || result.getType() == EventType.CINDEX
|
|
|
|| result.getType() == EventType.DINDEX) { // 针对DDL类型
|
|
|
|
|
|
- if (filterQueryDdl) {
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
if (StringUtils.isEmpty(tableName)
|
|
|
|| (result.getType() == EventType.RENAME && StringUtils.isEmpty(result.getOriTableName()))) {
|
|
|
// 如果解析不出tableName,记录一下日志,方便bugfix,目前直接抛出异常,中断解析
|
|
@@ -351,6 +341,16 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ if (!isSeek) {
|
|
|
+ // 使用新的表结构元数据管理方式
|
|
|
+ EntryPosition position = createPosition(event.getHeader());
|
|
|
+ tableMetaCache.apply(position, event.getDbName(), queryString, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (filterQueryDdl) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
} else if (result.getType() == EventType.INSERT || result.getType() == EventType.UPDATE
|
|
|
|| result.getType() == EventType.DELETE) {
|
|
|
// 对外返回,保证兼容,还是返回QUERY类型,这里暂不解析tableName,所以无法支持过滤
|