|
@@ -47,32 +47,34 @@ import com.alibaba.otter.canal.protocol.position.EntryPosition;
|
|
*/
|
|
*/
|
|
public class DatabaseTableMeta implements TableMetaTSDB {
|
|
public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
|
|
|
- public static final EntryPosition INIT_POSITION = new EntryPosition("0", 0L, -2L, -1L);
|
|
|
|
- private static Logger logger = LoggerFactory.getLogger(DatabaseTableMeta.class);
|
|
|
|
- private static Pattern pattern = Pattern.compile("Duplicate entry '.*' for key '*'");
|
|
|
|
- private static Pattern h2Pattern = Pattern.compile("Unique index or primary key violation");
|
|
|
|
- private static ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
|
|
|
|
- @Override
|
|
|
|
- public Thread newThread(Runnable r) {
|
|
|
|
- Thread thread = new Thread(r, "[scheduler-table-meta-snapshot]");
|
|
|
|
- thread.setDaemon(true);
|
|
|
|
- return thread;
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- private ReadWriteLock lock = new ReentrantReadWriteLock();
|
|
|
|
- private String destination;
|
|
|
|
- private MemoryTableMeta memoryTableMeta;
|
|
|
|
- private MysqlConnection connection; // 查询meta信息的链接
|
|
|
|
- private CanalEventFilter filter;
|
|
|
|
- private CanalEventFilter blackFilter;
|
|
|
|
- private EntryPosition lastPosition;
|
|
|
|
- private boolean hasNewDdl;
|
|
|
|
- private MetaHistoryDAO metaHistoryDAO;
|
|
|
|
- private MetaSnapshotDAO metaSnapshotDAO;
|
|
|
|
- private int snapshotInterval = 24;
|
|
|
|
- private int snapshotExpire = 360;
|
|
|
|
- private ScheduledFuture<?> scheduleSnapshotFuture;
|
|
|
|
-
|
|
|
|
|
|
+ public static final EntryPosition INIT_POSITION = new EntryPosition("0", 0L, -2L, -1L);
|
|
|
|
+ private static Logger logger = LoggerFactory.getLogger(DatabaseTableMeta.class);
|
|
|
|
+ private static Pattern pattern = Pattern.compile("Duplicate entry '.*' for key '*'");
|
|
|
|
+ private static Pattern h2Pattern = Pattern.compile("Unique index or primary key violation");
|
|
|
|
+ private static ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Thread newThread(Runnable r) {
|
|
|
|
+ Thread thread = new Thread(r,
|
|
|
|
+ "[scheduler-table-meta-snapshot]");
|
|
|
|
+ thread.setDaemon(true);
|
|
|
|
+ return thread;
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ private ReadWriteLock lock = new ReentrantReadWriteLock();
|
|
|
|
+ private String destination;
|
|
|
|
+ private MemoryTableMeta memoryTableMeta;
|
|
|
|
+ private MysqlConnection connection; // 查询meta信息的链接
|
|
|
|
+ private CanalEventFilter filter;
|
|
|
|
+ private CanalEventFilter blackFilter;
|
|
|
|
+ private EntryPosition lastPosition;
|
|
|
|
+ private boolean hasNewDdl;
|
|
|
|
+ private MetaHistoryDAO metaHistoryDAO;
|
|
|
|
+ private MetaSnapshotDAO metaSnapshotDAO;
|
|
|
|
+ private int snapshotInterval = 24;
|
|
|
|
+ private int snapshotExpire = 360;
|
|
|
|
+ private ScheduledFuture<?> scheduleSnapshotFuture;
|
|
|
|
+
|
|
public DatabaseTableMeta(){
|
|
public DatabaseTableMeta(){
|
|
|
|
|
|
}
|
|
}
|
|
@@ -109,13 +111,13 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
}
|
|
}
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public void destory() {
|
|
public void destory() {
|
|
if (memoryTableMeta != null) {
|
|
if (memoryTableMeta != null) {
|
|
memoryTableMeta.destory();
|
|
memoryTableMeta.destory();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
if (connection != null) {
|
|
if (connection != null) {
|
|
try {
|
|
try {
|
|
connection.disconnect();
|
|
connection.disconnect();
|
|
@@ -124,7 +126,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
.getAddress(), e);
|
|
.getAddress(), e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
if (scheduleSnapshotFuture != null) {
|
|
if (scheduleSnapshotFuture != null) {
|
|
scheduleSnapshotFuture.cancel(false);
|
|
scheduleSnapshotFuture.cancel(false);
|
|
}
|
|
}
|
|
@@ -133,9 +135,9 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
@Override
|
|
@Override
|
|
public TableMeta find(String schema, String table) {
|
|
public TableMeta find(String schema, String table) {
|
|
lock.readLock().lock();
|
|
lock.readLock().lock();
|
|
- try{
|
|
|
|
|
|
+ try {
|
|
return memoryTableMeta.find(schema, table);
|
|
return memoryTableMeta.find(schema, table);
|
|
- }finally {
|
|
|
|
|
|
+ } finally {
|
|
lock.readLock().unlock();
|
|
lock.readLock().unlock();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -144,7 +146,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
public boolean apply(EntryPosition position, String schema, String ddl, String extra) {
|
|
public boolean apply(EntryPosition position, String schema, String ddl, String extra) {
|
|
// 首先记录到内存结构
|
|
// 首先记录到内存结构
|
|
lock.writeLock().lock();
|
|
lock.writeLock().lock();
|
|
- try{
|
|
|
|
|
|
+ try {
|
|
if (memoryTableMeta.apply(position, schema, ddl, extra)) {
|
|
if (memoryTableMeta.apply(position, schema, ddl, extra)) {
|
|
this.lastPosition = position;
|
|
this.lastPosition = position;
|
|
this.hasNewDdl = true;
|
|
this.hasNewDdl = true;
|
|
@@ -153,7 +155,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
} else {
|
|
} else {
|
|
throw new RuntimeException("apply to memory is failed");
|
|
throw new RuntimeException("apply to memory is failed");
|
|
}
|
|
}
|
|
- }finally {
|
|
|
|
|
|
+ } finally {
|
|
lock.writeLock().unlock();
|
|
lock.writeLock().unlock();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -280,14 +282,14 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
// 获取一份快照
|
|
// 获取一份快照
|
|
Map<String, String> schemaDdls = null;
|
|
Map<String, String> schemaDdls = null;
|
|
lock.readLock().lock();
|
|
lock.readLock().lock();
|
|
- try{
|
|
|
|
|
|
+ try {
|
|
if (!init && !hasNewDdl) {
|
|
if (!init && !hasNewDdl) {
|
|
// 如果是持续构建,则识别一下是否有DDL变更过,如果没有就忽略了
|
|
// 如果是持续构建,则识别一下是否有DDL变更过,如果没有就忽略了
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
this.hasNewDdl = false;
|
|
this.hasNewDdl = false;
|
|
schemaDdls = memoryTableMeta.snapshot();
|
|
schemaDdls = memoryTableMeta.snapshot();
|
|
- }finally{
|
|
|
|
|
|
+ } finally {
|
|
lock.readLock().unlock();
|
|
lock.readLock().unlock();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -488,7 +490,24 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
- if (!StringUtils.equalsIgnoreCase(sourceField.getColumnType(), targetField.getColumnType())) {
|
|
|
|
|
|
+ // if (!StringUtils.equalsIgnoreCase(sourceField.getColumnType(),
|
|
|
|
+ // targetField.getColumnType())) {
|
|
|
|
+ // return false;
|
|
|
|
+ // }
|
|
|
|
+
|
|
|
|
+ // https://github.com/alibaba/canal/issues/1100
|
|
|
|
+ // 支持一下 int vs int(10)
|
|
|
|
+ if ((sourceField.isUnsigned() && !targetField.isUnsigned())
|
|
|
|
+ || (!sourceField.isUnsigned() && targetField.isUnsigned())) {
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ boolean columnTypeCompare = false;
|
|
|
|
+ columnTypeCompare |= StringUtils.containsIgnoreCase(sourceField.getColumnType(),
|
|
|
|
+ targetField.getColumnType());
|
|
|
|
+ columnTypeCompare |= StringUtils.containsIgnoreCase(targetField.getColumnType(),
|
|
|
|
+ sourceField.getColumnType());
|
|
|
|
+ if (!columnTypeCompare) {
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|