|
@@ -1,27 +1,5 @@
|
|
|
package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
|
|
|
|
|
|
-import java.io.IOException;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.concurrent.Executors;
|
|
|
-import java.util.concurrent.ScheduledExecutorService;
|
|
|
-import java.util.concurrent.ScheduledFuture;
|
|
|
-import java.util.concurrent.ThreadFactory;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
-import java.util.concurrent.locks.ReadWriteLock;
|
|
|
-import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
-import java.util.regex.Pattern;
|
|
|
-
|
|
|
-import org.apache.commons.beanutils.BeanUtils;
|
|
|
-import org.apache.commons.lang.ObjectUtils;
|
|
|
-import org.apache.commons.lang.StringUtils;
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
-import org.slf4j.MDC;
|
|
|
-
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.alibaba.fastsql.sql.repository.Schema;
|
|
@@ -39,6 +17,23 @@ import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDO;
|
|
|
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO;
|
|
|
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDO;
|
|
|
import com.alibaba.otter.canal.protocol.position.EntryPosition;
|
|
|
+import org.apache.commons.beanutils.BeanUtils;
|
|
|
+import org.apache.commons.lang.ObjectUtils;
|
|
|
+import org.apache.commons.lang.StringUtils;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.slf4j.MDC;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.locks.ReadWriteLock;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
+import java.util.regex.Pattern;
|
|
|
|
|
|
/**
|
|
|
* 基于db远程管理 see internal class: CanalTableMeta , ConsoleTableMetaTSDB
|
|
@@ -456,7 +451,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
// 如果是同一秒内,对比一下history的位点,如果比期望的位点要大,忽略之
|
|
|
if (snapshotPosition.getTimestamp() > rollbackPosition.getTimestamp()) {
|
|
|
continue;
|
|
|
- } else if (rollbackPosition.getServerId() == snapshotPosition.getServerId()
|
|
|
+ } else if (rollbackPosition.getServerId().equals(snapshotPosition.getServerId())
|
|
|
&& snapshotPosition.compareTo(rollbackPosition) > 0) {
|
|
|
continue;
|
|
|
}
|