ソースを参照

修复最近位置被remove掉,导致重复消息的问题 (#3276)

binbin 4 年 前
コミット
f66baeabf1

+ 2 - 1
meta/src/main/java/com/alibaba/otter/canal/meta/FileMixedMetaManager.java

@@ -91,6 +91,8 @@ public class FileMixedMetaManager extends MemoryMetaManager implements CanalMeta
             for (ClientIdentity clientIdentity : tasks) {
                 MDC.put("destination", String.valueOf(clientIdentity.getDestination()));
                 try {
+                    updateCursorTasks.remove(clientIdentity);
+
                     // 定时将内存中的最新值刷到file中,多次变更只刷一次
                     if (logger.isInfoEnabled()) {
                         LogPosition cursor = (LogPosition) getCursor(clientIdentity);
@@ -100,7 +102,6 @@ public class FileMixedMetaManager extends MemoryMetaManager implements CanalMeta
                                 cursor.getIdentity().getSourceAddress().toString());
                     }
                     flushDataToFile(clientIdentity.getDestination());
-                    updateCursorTasks.remove(clientIdentity);
                 } catch (Throwable e) {
                     // ignore
                     logger.error("period update" + clientIdentity.toString() + " curosr failed!", e);

+ 2 - 1
meta/src/main/java/com/alibaba/otter/canal/meta/PeriodMixedMetaManager.java

@@ -79,9 +79,10 @@ public class PeriodMixedMetaManager extends MemoryMetaManager implements CanalMe
             List<ClientIdentity> tasks = new ArrayList<>(updateCursorTasks);
             for (ClientIdentity clientIdentity : tasks) {
                 try {
+                    updateCursorTasks.remove(clientIdentity);
+
                     // 定时将内存中的最新值刷到zookeeper中,多次变更只刷一次
                     zooKeeperMetaManager.updateCursor(clientIdentity, getCursor(clientIdentity));
-                    updateCursorTasks.remove(clientIdentity);
                 } catch (Throwable e) {
                     // ignore
                     logger.error("period update" + clientIdentity.toString() + " curosr failed!", e);