|
@@ -10,6 +10,8 @@ import java.util.concurrent.ScheduledExecutorService;
|
|
|
import java.util.concurrent.ScheduledFuture;
|
|
|
import java.util.concurrent.ThreadFactory;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.locks.ReadWriteLock;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
import java.util.regex.Pattern;
|
|
|
|
|
|
import org.apache.commons.beanutils.BeanUtils;
|
|
@@ -57,6 +59,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
return thread;
|
|
|
}
|
|
|
});
|
|
|
+ private ReadWriteLock lock = new ReentrantReadWriteLock();
|
|
|
private String destination;
|
|
|
private MemoryTableMeta memoryTableMeta;
|
|
|
private MysqlConnection connection; // 查询meta信息的链接
|
|
@@ -129,15 +132,19 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
|
|
|
@Override
|
|
|
public TableMeta find(String schema, String table) {
|
|
|
- synchronized (memoryTableMeta) {
|
|
|
+ lock.readLock().lock();
|
|
|
+ try{
|
|
|
return memoryTableMeta.find(schema, table);
|
|
|
+ }finally {
|
|
|
+ lock.readLock().unlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public boolean apply(EntryPosition position, String schema, String ddl, String extra) {
|
|
|
// 首先记录到内存结构
|
|
|
- synchronized (memoryTableMeta) {
|
|
|
+ lock.writeLock().lock();
|
|
|
+ try{
|
|
|
if (memoryTableMeta.apply(position, schema, ddl, extra)) {
|
|
|
this.lastPosition = position;
|
|
|
this.hasNewDdl = true;
|
|
@@ -146,6 +153,8 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
} else {
|
|
|
throw new RuntimeException("apply to memory is failed");
|
|
|
}
|
|
|
+ }finally {
|
|
|
+ lock.writeLock().unlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -269,18 +278,22 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
*/
|
|
|
private boolean applySnapshotToDB(EntryPosition position, boolean init) {
|
|
|
// 获取一份快照
|
|
|
- MemoryTableMeta tmpMemoryTableMeta = new MemoryTableMeta();
|
|
|
Map<String, String> schemaDdls = null;
|
|
|
- synchronized (memoryTableMeta) {
|
|
|
+ lock.readLock().lock();
|
|
|
+ try{
|
|
|
if (!init && !hasNewDdl) {
|
|
|
// 如果是持续构建,则识别一下是否有DDL变更过,如果没有就忽略了
|
|
|
return false;
|
|
|
}
|
|
|
this.hasNewDdl = false;
|
|
|
schemaDdls = memoryTableMeta.snapshot();
|
|
|
- for (Map.Entry<String, String> entry : schemaDdls.entrySet()) {
|
|
|
- tmpMemoryTableMeta.apply(position, entry.getKey(), entry.getValue(), null);
|
|
|
- }
|
|
|
+ }finally{
|
|
|
+ lock.readLock().unlock();
|
|
|
+ }
|
|
|
+
|
|
|
+ MemoryTableMeta tmpMemoryTableMeta = new MemoryTableMeta();
|
|
|
+ for (Map.Entry<String, String> entry : schemaDdls.entrySet()) {
|
|
|
+ tmpMemoryTableMeta.apply(position, entry.getKey(), entry.getValue(), null);
|
|
|
}
|
|
|
|
|
|
// 基于临时内存对象进行对比
|