|
@@ -63,6 +63,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
private CanalEventFilter filter;
|
|
|
private CanalEventFilter blackFilter;
|
|
|
private EntryPosition lastPosition;
|
|
|
+ private boolean hasNewDdl;
|
|
|
private MetaHistoryDAO metaHistoryDAO;
|
|
|
private MetaSnapshotDAO metaSnapshotDAO;
|
|
|
private int snapshotInterval = 24;
|
|
@@ -139,6 +140,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
synchronized (memoryTableMeta) {
|
|
|
if (memoryTableMeta.apply(position, schema, ddl, extra)) {
|
|
|
this.lastPosition = position;
|
|
|
+ this.hasNewDdl = true;
|
|
|
// 同步每次变更给远程做历史记录
|
|
|
return applyHistoryToDB(position, schema, ddl, extra);
|
|
|
} else {
|
|
@@ -270,10 +272,11 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
MemoryTableMeta tmpMemoryTableMeta = new MemoryTableMeta();
|
|
|
Map<String, String> schemaDdls = null;
|
|
|
synchronized (memoryTableMeta) {
|
|
|
- if (!init && position == null) {
|
|
|
+ if (!init && !hasNewDdl) {
|
|
|
// 如果是持续构建,则识别一下是否有DDL变更过,如果没有就忽略了
|
|
|
return false;
|
|
|
}
|
|
|
+ this.hasNewDdl = false;
|
|
|
schemaDdls = memoryTableMeta.snapshot();
|
|
|
for (Map.Entry<String, String> entry : schemaDdls.entrySet()) {
|
|
|
tmpMemoryTableMeta.apply(position, entry.getKey(), entry.getValue(), null);
|