소스 검색

Merge pull request #291 from MarkLinHz/master

重构LogPositionManager并适配XML配置
agapple 8 년 전
부모
커밋
a6604978ed
25개의 변경된 파일404개의 추가작업 그리고 410개의 파일을 삭제
  1. 5 5
      deployer/src/main/resources/spring/default-instance.xml
  2. 5 5
      deployer/src/main/resources/spring/file-instance.xml
  3. 5 5
      deployer/src/main/resources/spring/local-instance.xml
  4. 1 1
      instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java
  5. 10 22
      instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java
  6. 5 5
      instance/spring/src/test/resources/spring/default-instance.xml
  7. 8 6
      instance/spring/src/test/resources/spring/file-instance.xml
  8. 2 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  9. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java
  10. 10 0
      parse/src/main/java/com/alibaba/otter/canal/parse/index/AbstractLogPositionManager.java
  11. 3 4
      parse/src/main/java/com/alibaba/otter/canal/parse/index/CanalLogPositionManager.java
  12. 37 30
      parse/src/main/java/com/alibaba/otter/canal/parse/index/FailbackLogPositionManager.java
  13. 82 82
      parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java
  14. 17 12
      parse/src/main/java/com/alibaba/otter/canal/parse/index/MemoryLogPositionManager.java
  15. 34 29
      parse/src/main/java/com/alibaba/otter/canal/parse/index/MetaLogPositionManager.java
  16. 52 59
      parse/src/main/java/com/alibaba/otter/canal/parse/index/MixedLogPositionManager.java
  17. 66 66
      parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java
  18. 14 23
      parse/src/main/java/com/alibaba/otter/canal/parse/index/ZooKeeperLogPositionManager.java
  19. 8 7
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/group/GroupEventPaserTest.java
  20. 12 11
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogDumpTest.java
  21. 4 4
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParserTest.java
  22. 8 7
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlDumpTest.java
  23. 11 10
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParserTest.java
  24. 4 6
      parse/src/test/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManagerTest.java
  25. 0 8
      parse/src/test/java/com/alibaba/otter/canal/parse/stub/AbstractCanalLogPositionManager.java

+ 5 - 5
deployer/src/main/resources/spring/default-instance.xml

@@ -128,14 +128,14 @@
 		<!-- 解析位点记录 -->
 		<property name="logPositionManager">
 			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
-				<property name="primary">
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
-				</property>
-				<property name="failback">
+				</constructor-arg>
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
-						<property name="metaManager" ref="metaManager" />
+						<constructor-arg ref="metaManager"/>
 					</bean>
-				</property>
+				</constructor-arg>
 			</bean>
 		</property>
 		

+ 5 - 5
deployer/src/main/resources/spring/file-instance.xml

@@ -113,14 +113,14 @@
 		<!-- 解析位点记录 -->
 		<property name="logPositionManager">
 			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
-				<property name="primary">
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
-				</property>
-				<property name="failback">
+				</constructor-arg>
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
-						<property name="metaManager" ref="metaManager" />
+						<constructor-arg ref="metaManager"/>
 					</bean>
-				</property>
+				</constructor-arg>
 			</bean>
 		</property>
 		

+ 5 - 5
deployer/src/main/resources/spring/local-instance.xml

@@ -123,14 +123,14 @@
 		<!-- 解析位点记录 -->
 		<property name="logPositionManager">
 			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
-				<property name="primary">
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
-				</property>
-				<property name="failback">
+				</constructor-arg>
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
-						<property name="metaManager" ref="metaManager" />
+						<constructor-arg ref="metaManager"/>
 					</bean>
-				</property>
+				</constructor-arg>
 			</bean>
 		</property>
 		

+ 1 - 1
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.instance.core;
 
 import java.util.List;
 
+import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -16,7 +17,6 @@ import com.alibaba.otter.canal.parse.ha.HeartBeatHAController;
 import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
-import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.sink.CanalEventSink;

+ 10 - 22
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -6,6 +6,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import com.alibaba.otter.canal.parse.index.*;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,12 +38,6 @@ import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
-import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
-import com.alibaba.otter.canal.parse.index.FailbackLogPositionManager;
-import com.alibaba.otter.canal.parse.index.MemoryLogPositionManager;
-import com.alibaba.otter.canal.parse.index.MetaLogPositionManager;
-import com.alibaba.otter.canal.parse.index.PeriodMixedLogPositionManager;
-import com.alibaba.otter.canal.parse.index.ZooKeeperLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.sink.entry.EntryEventSink;
@@ -346,29 +341,22 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
     protected CanalLogPositionManager initLogPositionManager() {
         logger.info("init logPositionPersistManager begin...");
         IndexMode indexMode = parameters.getIndexMode();
-        CanalLogPositionManager logPositionManager = null;
+        CanalLogPositionManager logPositionManager;
         if (indexMode.isMemory()) {
             logPositionManager = new MemoryLogPositionManager();
         } else if (indexMode.isZookeeper()) {
-            logPositionManager = new ZooKeeperLogPositionManager();
-            ((ZooKeeperLogPositionManager) logPositionManager).setZkClientx(getZkclientx());
+            logPositionManager = new ZooKeeperLogPositionManager(getZkclientx());
         } else if (indexMode.isMixed()) {
-            logPositionManager = new PeriodMixedLogPositionManager();
-
-            ZooKeeperLogPositionManager zooKeeperLogPositionManager = new ZooKeeperLogPositionManager();
-            zooKeeperLogPositionManager.setZkClientx(getZkclientx());
-            ((PeriodMixedLogPositionManager) logPositionManager).setZooKeeperLogPositionManager(zooKeeperLogPositionManager);
+            MemoryLogPositionManager memoryLogPositionManager = new MemoryLogPositionManager();
+            ZooKeeperLogPositionManager zooKeeperLogPositionManager = new ZooKeeperLogPositionManager(getZkclientx());
+            logPositionManager = new PeriodMixedLogPositionManager(memoryLogPositionManager, zooKeeperLogPositionManager, 1000L);
         } else if (indexMode.isMeta()) {
-            logPositionManager = new MetaLogPositionManager();
-            ((MetaLogPositionManager) logPositionManager).setMetaManager(metaManager);
+            logPositionManager = new MetaLogPositionManager(metaManager);
         } else if (indexMode.isMemoryMetaFailback()) {
-            MemoryLogPositionManager primaryLogPositionManager = new MemoryLogPositionManager();
-            MetaLogPositionManager failbackLogPositionManager = new MetaLogPositionManager();
-            failbackLogPositionManager.setMetaManager(metaManager);
+            MemoryLogPositionManager primary = new MemoryLogPositionManager();
+            MetaLogPositionManager secondary = new MetaLogPositionManager(metaManager);
 
-            logPositionManager = new FailbackLogPositionManager();
-            ((FailbackLogPositionManager) logPositionManager).setPrimary(primaryLogPositionManager);
-            ((FailbackLogPositionManager) logPositionManager).setFailback(failbackLogPositionManager);
+            logPositionManager = new FailbackLogPositionManager(primary, secondary);
         } else {
             throw new CanalException("unsupport indexMode for " + indexMode);
         }

+ 5 - 5
instance/spring/src/test/resources/spring/default-instance.xml

@@ -120,14 +120,14 @@
 		<!-- 解析位点记录 -->
 		<property name="logPositionManager">
 			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
-				<property name="primary">
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
-				</property>
-				<property name="failback">
+				</constructor-arg>
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
-						<property name="metaManager" ref="metaManager" />
+						<constructor-arg ref="metaManager"/>
 					</bean>
-				</property>
+				</constructor-arg>
 			</bean>
 		</property>
 		

+ 8 - 6
instance/spring/src/test/resources/spring/file-instance.xml

@@ -102,18 +102,20 @@
 		<!-- 解析编码 -->
 		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
-	
+
 		<!-- 解析位点记录 -->
 		<property name="logPositionManager">
 			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
-				<property name="primary">
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
-				</property>
-				<property name="failback">
+				</constructor-arg>
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
-						<property name="metaManager" ref="metaManager" />
+						<constructor-arg>
+							<ref bean="metaManager"/>
+						</constructor-arg>
 					</bean>
-				</property>
+				</constructor-arg>
 			</bean>
 		</property>
 		

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -8,6 +8,7 @@ import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.math.RandomUtils;
@@ -22,7 +23,6 @@ import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
 import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer.TransactionFlushCallback;
-import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
@@ -44,7 +44,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
 
     protected final Logger                           logger                     = LoggerFactory.getLogger(this.getClass());
 
-    protected CanalLogPositionManager                logPositionManager         = null;
+    protected CanalLogPositionManager logPositionManager         = null;
     protected CanalEventSink<List<CanalEntry.Entry>> eventSink                  = null;
     protected CanalEventFilter                       eventFilter                = null;
     protected CanalEventFilter                       eventBlackFilter           = null;

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.parse.inbound.mysql;
 
 import java.io.IOException;
 
+import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.parse.CanalEventParser;
@@ -9,7 +10,6 @@ import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
-import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.protocol.position.LogPosition;

+ 10 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/index/AbstractLogPositionManager.java

@@ -0,0 +1,10 @@
+package com.alibaba.otter.canal.parse.index;
+
+import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
+
+/**
+ * Created by yinxiu on 17/3/17.
+ * Email: marklin.hz@gmail.com
+ */
+public abstract class AbstractLogPositionManager extends AbstractCanalLifeCycle implements CanalLogPositionManager {
+}

+ 3 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/index/CanalLogPositionManager.java

@@ -5,14 +5,13 @@ import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
 
 /**
- * 接口组合
- * 
- * @author jianghang 2012-7-7 上午10:02:02
- * @version 1.0.0
+ * Created by yinxiu on 17/3/17.
+ * Email: marklin.hz@gmail.com
  */
 public interface CanalLogPositionManager extends CanalLifeCycle {
 
     LogPosition getLatestIndexBy(String destination);
 
     void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException;
+
 }

+ 37 - 30
parse/src/main/java/com/alibaba/otter/canal/parse/index/FailbackLogPositionManager.java

@@ -1,74 +1,81 @@
 package com.alibaba.otter.canal.parse.index;
 
-import org.springframework.util.Assert;
-
-import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
+ * Created by yinxiu on 17/3/18.
+ * Email: marklin.hz@gmail.com
+ *
  * 实现基于failover查找的机制完成meta的操作
- * 
+ *
  * <pre>
  * 应用场景:比如针对内存buffer,出现HA切换,先尝试从内存buffer区中找到lastest position,如果不存在才尝试找一下meta里消费的信息
  * </pre>
- * 
- * @author jianghang 2012-7-20 下午02:33:20
  */
-public class FailbackLogPositionManager extends AbstractCanalLifeCycle implements CanalLogPositionManager {
+public class FailbackLogPositionManager extends AbstractLogPositionManager {
+
+    private final static Logger logger = LoggerFactory.getLogger(FailbackLogPositionManager.class);
+
+    private final CanalLogPositionManager primary;
+    private final CanalLogPositionManager secondary;
+
+    public FailbackLogPositionManager(CanalLogPositionManager primary, CanalLogPositionManager secondary) {
+        if (primary == null) {
+            throw new NullPointerException("nul primary LogPositionManager");
+        }
+        if (secondary == null) {
+            throw new NullPointerException("nul secondary LogPositionManager");
+        }
 
-    private CanalLogPositionManager primary;
-    private CanalLogPositionManager failback;
+        this.primary = primary;
+        this.secondary = secondary;
+    }
 
+    @Override
     public void start() {
         super.start();
-        Assert.notNull(primary);
-        Assert.notNull(failback);
 
         if (!primary.isStart()) {
             primary.start();
         }
 
-        if (!failback.isStart()) {
-            failback.start();
+        if (!secondary.isStart()) {
+            primary.start();
         }
     }
 
+    @Override
     public void stop() {
         super.stop();
 
-        if (primary.isStart()) {
-            primary.stop();
+        if (secondary.isStart()) {
+            secondary.stop();
         }
 
-        if (failback.isStart()) {
-            failback.stop();
+        if (primary.isStart()) {
+            primary.stop();
         }
     }
 
+    @Override
     public LogPosition getLatestIndexBy(String destination) {
         LogPosition logPosition = primary.getLatestIndexBy(destination);
-        if (logPosition == null) {
-            return failback.getLatestIndexBy(destination);
-        } else {
+        if (logPosition != null) {
             return logPosition;
         }
+        return secondary.getLatestIndexBy(destination);
     }
 
+    @Override
     public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
         try {
             primary.persistLogPosition(destination, logPosition);
         } catch (CanalParseException e) {
-            failback.persistLogPosition(destination, logPosition);
+            logger.warn("persistLogPosition use primary log position manager exception. destination: {}, logPosition: {}", destination, logPosition, e);
+            secondary.persistLogPosition(destination, logPosition);
         }
     }
-
-    public void setPrimary(CanalLogPositionManager primary) {
-        this.primary = primary;
-    }
-
-    public void setFailback(CanalLogPositionManager failback) {
-        this.failback = failback;
-    }
-
 }

+ 82 - 82
parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java

@@ -1,60 +1,82 @@
 package com.alibaba.otter.canal.parse.index;
 
+import com.alibaba.otter.canal.common.utils.JsonUtils;
+import com.alibaba.otter.canal.meta.exception.CanalMetaManagerException;
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.protocol.position.LogPosition;
+import com.google.common.base.Function;
+import com.google.common.collect.MigrateMap;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
-
-import com.alibaba.otter.canal.common.utils.JsonUtils;
-import com.alibaba.otter.canal.meta.exception.CanalMetaManagerException;
-import com.alibaba.otter.canal.protocol.position.LogPosition;
-import com.google.common.base.Function;
-import com.google.common.collect.MigrateMap;
-
 /**
+ * Created by yinxiu on 17/3/18.
+ * Email: marklin.hz@gmail.com
+ *
  * 基于文件刷新的log position实现
- * 
+ *
  * <pre>
  * 策略:
  * 1. 先写内存,然后定时刷新数据到File
  * 2. 数据采取overwrite模式(只保留最后一次)
  * </pre>
- * 
- * @author jianghang 2013-4-15 下午09:40:48
- * @version 1.0.4
  */
-public class FileMixedLogPositionManager extends MemoryLogPositionManager {
+public class FileMixedLogPositionManager extends AbstractLogPositionManager {
+
+    private final static Logger logger = LoggerFactory.getLogger(FileMixedLogPositionManager.class);
+    private final static Charset charset = Charset.forName("UTF-8");
+
+    private File dataDir;
+
+    private Map<String, File> dataFileCaches;
+
+    private ScheduledExecutorService executorService;
+
+    private final LogPosition nullPosition = new LogPosition(){};
+
+    private MemoryLogPositionManager memoryLogPositionManager;
+
+    private long period;
+    private Set<String> persistTasks;
+
+    public FileMixedLogPositionManager(File dataDir, long period, MemoryLogPositionManager memoryLogPositionManager) {
+        if (dataDir == null) {
+            throw new NullPointerException("null dataDir");
+        }
+        if (period <= 0) {
+            throw new IllegalArgumentException("period must be positive, given: " + period);
+        }
+        if (memoryLogPositionManager == null) {
+            throw new NullPointerException("null memoryLogPositionManager");
+        }
+        this.dataDir = dataDir;
+        this.period = period;
+        this.memoryLogPositionManager = memoryLogPositionManager;
 
-    private static final Logger      logger       = LoggerFactory.getLogger(FileMixedLogPositionManager.class);
-    private static final Charset     charset      = Charset.forName("UTF-8");
-    private File                     dataDir;
-    private String                   dataFileName = "parse.dat";
-    private Map<String, File>        dataFileCaches;
-    private ScheduledExecutorService executor;
-    @SuppressWarnings("serial")
-    private final LogPosition        nullPosition = new LogPosition() {
-                                                  };
+        this.dataFileCaches = MigrateMap.makeComputingMap(new Function<String, File>() {
 
-    private long                     period       = 1000;                                                      // 单位ms
-    private Set<String>              persistTasks;
+            public File apply(String destination) {
+                return getDataFile(destination);
+            }
+        });
 
+        this.executorService = Executors.newScheduledThreadPool(1);
+        this.persistTasks = Collections.synchronizedSet(new HashSet<String>());
+    }
+
+    @Override
     public void start() {
         super.start();
 
-        Assert.notNull(dataDir);
         if (!dataDir.exists()) {
             try {
                 FileUtils.forceMkdir(dataDir);
@@ -67,30 +89,12 @@ public class FileMixedLogPositionManager extends MemoryLogPositionManager {
             throw new CanalMetaManagerException("dir[" + dataDir.getPath() + "] can not read/write");
         }
 
-        dataFileCaches = MigrateMap.makeComputingMap(new Function<String, File>() {
-
-            public File apply(String destination) {
-                return getDataFile(destination);
-            }
-        });
-
-        executor = Executors.newScheduledThreadPool(1);
-        positions = MigrateMap.makeComputingMap(new Function<String, LogPosition>() {
-
-            public LogPosition apply(String destination) {
-                LogPosition logPosition = loadDataFromFile(dataFileCaches.get(destination));
-                if (logPosition == null) {
-                    return nullPosition;
-                } else {
-                    return logPosition;
-                }
-            }
-        });
-
-        persistTasks = Collections.synchronizedSet(new HashSet<String>());
+        if (!memoryLogPositionManager.isStart()) {
+            memoryLogPositionManager.start();
+        }
 
         // 启动定时工作任务
-        executor.scheduleAtFixedRate(new Runnable() {
+        executorService.scheduleAtFixedRate(new Runnable() {
 
             public void run() {
                 List<String> tasks = new ArrayList<String>(persistTasks);
@@ -106,28 +110,35 @@ public class FileMixedLogPositionManager extends MemoryLogPositionManager {
                 }
             }
         }, period, period, TimeUnit.MILLISECONDS);
+
     }
 
+    @Override
     public void stop() {
         super.stop();
 
         flushDataToFile();
-        executor.shutdownNow();
-        positions.clear();
-    }
-
-    public void persistLogPosition(String destination, LogPosition logPosition) {
-        persistTasks.add(destination);// 添加到任务队列中进行触发
-        super.persistLogPosition(destination, logPosition);
+        executorService.shutdown();
+        memoryLogPositionManager.stop();
     }
 
+    @Override
     public LogPosition getLatestIndexBy(String destination) {
-        LogPosition logPostion = super.getLatestIndexBy(destination);
-        if (logPostion == nullPosition) {
-            return null;
-        } else {
-            return logPostion;
+        LogPosition logPosition = memoryLogPositionManager.getLatestIndexBy(destination);
+        if (logPosition != null) {
+            return logPosition;
+        }
+        logPosition = loadDataFromFile(dataFileCaches.get(destination));
+        if (logPosition == null) {
+            return nullPosition;
         }
+        return logPosition;
+    }
+
+    @Override
+    public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
+        persistTasks.add(destination);
+        memoryLogPositionManager.persistLogPosition(destination, logPosition);
     }
 
     // ============================ helper method ======================
@@ -142,11 +153,12 @@ public class FileMixedLogPositionManager extends MemoryLogPositionManager {
             }
         }
 
+        String dataFileName = "parse.dat";
         return new File(destinationMetaDir, dataFileName);
     }
 
     private void flushDataToFile() {
-        for (String destination : positions.keySet()) {
+        for (String destination : memoryLogPositionManager.destinations()) {
             flushDataToFile(destination);
         }
     }
@@ -156,7 +168,7 @@ public class FileMixedLogPositionManager extends MemoryLogPositionManager {
     }
 
     private void flushDataToFile(String destination, File dataFile) {
-        LogPosition position = positions.get(destination);
+        LogPosition position = memoryLogPositionManager.getLatestIndexBy(destination);
         if (position != null && position != nullPosition) {
             String json = JsonUtils.marshalToString(position);
             try {
@@ -179,16 +191,4 @@ public class FileMixedLogPositionManager extends MemoryLogPositionManager {
             throw new CanalMetaManagerException(e);
         }
     }
-
-    public void setDataDir(String dataDir) {
-        this.dataDir = new File(dataDir);
-    }
-
-    public void setDataDir(File dataDir) {
-        this.dataDir = dataDir;
-    }
-
-    public void setPeriod(long period) {
-        this.period = period;
-    }
 }

+ 17 - 12
parse/src/main/java/com/alibaba/otter/canal/parse/index/MemoryLogPositionManager.java

@@ -1,39 +1,44 @@
 package com.alibaba.otter.canal.parse.index;
 
-import java.util.Map;
-
-import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
 import com.google.common.collect.MapMaker;
 
+import java.util.Map;
+import java.util.Set;
+
 /**
- * 基于内存的实现
- * 
- * @author jianghang 2012-7-7 上午10:17:23
- * @version 1.0.0
+ * Created by yinxiu on 17/3/17.
+ * Email: marklin.hz@gmail.com
  */
-public class MemoryLogPositionManager extends AbstractCanalLifeCycle implements CanalLogPositionManager {
+public class MemoryLogPositionManager extends AbstractLogPositionManager {
 
-    protected Map<String, LogPosition> positions;
+    private Map<String, LogPosition> positions;
 
+    @Override
     public void start() {
         super.start();
-
         positions = new MapMaker().makeMap();
     }
 
+    @Override
     public void stop() {
         super.stop();
-
         positions.clear();
     }
 
+    @Override
     public LogPosition getLatestIndexBy(String destination) {
         return positions.get(destination);
     }
 
-    public void persistLogPosition(String destination, LogPosition logPosition) {
+    @Override
+    public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
         positions.put(destination, logPosition);
     }
 
+    public Set<String> destinations() {
+        return positions.keySet();
+    }
+
 }

+ 34 - 29
parse/src/main/java/com/alibaba/otter/canal/parse/index/MetaLogPositionManager.java

@@ -1,55 +1,59 @@
 package com.alibaba.otter.canal.parse.index;
 
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
-import org.springframework.util.CollectionUtils;
-
-import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.meta.CanalMetaManager;
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
 import com.alibaba.otter.canal.store.helper.CanalEventUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
 
 /**
- * 基于meta信息的实现
- * 
- * @author jianghang 2012-7-10 下午05:02:33
- * @version 1.0.0
+ * Created by yinxiu on 17/3/18.
+ * Email: marklin.hz@gmail.com
  */
-public class MetaLogPositionManager extends AbstractCanalLifeCycle implements CanalLogPositionManager {
+public class MetaLogPositionManager extends AbstractLogPositionManager {
 
-    private static final Logger logger = LoggerFactory.getLogger(MetaLogPositionManager.class);
-    private CanalMetaManager    metaManager;
+    private final static Logger logger = LoggerFactory.getLogger(MetaLogPositionManager.class);
 
-    public void start() {
-        super.start();
-        Assert.notNull(metaManager);
-        if (!metaManager.isStart()) {
-            metaManager.start();
+    private final CanalMetaManager metaManager;
+
+    public MetaLogPositionManager(CanalMetaManager metaManager) {
+        if (metaManager == null) {
+            throw new NullPointerException("null metaManager");
         }
+
+        this.metaManager = metaManager;
     }
 
+    @Override
     public void stop() {
         super.stop();
+
         if (metaManager.isStart()) {
             metaManager.stop();
         }
     }
 
-    public void persistLogPosition(String destination, LogPosition logPosition) {
-        // do nothing
-        logger.info("persist LogPosition:{}", destination, logPosition);
+    @Override
+    public void start() {
+        super.start();
+
+        if (!metaManager.isStart()) {
+            metaManager.start();
+        }
     }
 
+    @Override
     public LogPosition getLatestIndexBy(String destination) {
-        List<ClientIdentity> clientIdentitys = metaManager.listAllSubscribeInfo(destination);
+        List<ClientIdentity> clientIdentities = metaManager.listAllSubscribeInfo(destination);
         LogPosition result = null;
-        if (!CollectionUtils.isEmpty(clientIdentitys)) {
+        if (!CollectionUtils.isEmpty(clientIdentities)) {
             // 尝试找到一个最小的logPosition
-            for (ClientIdentity clientIdentity : clientIdentitys) {
+            for (ClientIdentity clientIdentity : clientIdentities) {
                 LogPosition position = (LogPosition) metaManager.getCursor(clientIdentity);
                 if (position == null) {
                     continue;
@@ -66,8 +70,9 @@ public class MetaLogPositionManager extends AbstractCanalLifeCycle implements Ca
         return result;
     }
 
-    public void setMetaManager(CanalMetaManager metaManager) {
-        this.metaManager = metaManager;
+    @Override
+    public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
+        // do nothing
+        logger.info("persist LogPosition:{}", destination, logPosition);
     }
-
 }

+ 52 - 59
parse/src/main/java/com/alibaba/otter/canal/parse/index/MixedLogPositionManager.java

@@ -1,90 +1,83 @@
 package com.alibaba.otter.canal.parse.index;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
+import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.protocol.position.LogPosition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
 
-import com.alibaba.otter.canal.protocol.position.LogPosition;
-import com.google.common.base.Function;
-import com.google.common.collect.MigrateMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
- * 混合memory + zookeeper的存储模式
- * 
- * @author jianghang 2012-7-7 上午10:33:19
- * @version 1.0.0
+ * Created by yinxiu on 17/3/17.
+ * Email: marklin.hz@gmail.com
+ *
+ * Memory first.
+ * Asynchronous commit position info to ZK.
  */
-public class MixedLogPositionManager extends MemoryLogPositionManager implements CanalLogPositionManager {
+public class MixedLogPositionManager extends AbstractLogPositionManager {
 
-    private static final Logger         logger       = LoggerFactory.getLogger(MixedLogPositionManager.class);
-    private ZooKeeperLogPositionManager zooKeeperLogPositionManager;
-    private ExecutorService             executor;
-    @SuppressWarnings("serial")
-    private final LogPosition           nullPosition = new LogPosition() {
-                                                     };
+    private final Logger logger = LoggerFactory.getLogger(MixedLogPositionManager.class);
 
-    public void start() {
-        super.start();
+    private final MemoryLogPositionManager memoryLogPositionManager;
+    private final ZooKeeperLogPositionManager zooKeeperLogPositionManager;
 
-        Assert.notNull(zooKeeperLogPositionManager);
-        if (!zooKeeperLogPositionManager.isStart()) {
-            zooKeeperLogPositionManager.start();
+    private final ExecutorService executor;
+
+    public MixedLogPositionManager(ZkClientx zkClient) {
+        if (zkClient == null) {
+            throw new NullPointerException("null zkClient");
         }
-        executor = Executors.newFixedThreadPool(1);
-        positions = MigrateMap.makeComputingMap(new Function<String, LogPosition>() {
 
-            public LogPosition apply(String destination) {
-                LogPosition logPosition = zooKeeperLogPositionManager.getLatestIndexBy(destination);
-                if (logPosition == null) {
-                    return nullPosition;
-                } else {
-                    return logPosition;
-                }
-            }
-        });
+        this.memoryLogPositionManager = new MemoryLogPositionManager();
+        this.zooKeeperLogPositionManager = new ZooKeeperLogPositionManager(zkClient);
+
+        this.executor = Executors.newFixedThreadPool(1);
     }
 
+    @Override
+    public void start() {
+        super.start();
+
+        memoryLogPositionManager.start();
+        zooKeeperLogPositionManager.start();
+    }
+
+    @Override
     public void stop() {
         super.stop();
 
-        if (zooKeeperLogPositionManager.isStart()) {
-            zooKeeperLogPositionManager.stop();
+        executor.shutdown();
+        zooKeeperLogPositionManager.stop();
+        memoryLogPositionManager.stop();
+    }
+
+    @Override
+    public LogPosition getLatestIndexBy(String destination) {
+        LogPosition logPosition = memoryLogPositionManager.getLatestIndexBy(destination);
+        if (logPosition != null) {
+            return logPosition;
         }
-        executor.shutdownNow();
-        positions.clear();
+        logPosition = zooKeeperLogPositionManager.getLatestIndexBy(destination);
+        // 这里保持和重构前的逻辑一致,重新添加到Memory中
+        if (logPosition != null) {
+            memoryLogPositionManager.persistLogPosition(destination, logPosition);
+        }
+        return logPosition;
     }
 
-    public void persistLogPosition(final String destination, final LogPosition logPosition) {
-        super.persistLogPosition(destination, logPosition);
+    @Override
+    public void persistLogPosition(final String destination, final LogPosition logPosition) throws CanalParseException {
+        memoryLogPositionManager.persistLogPosition(destination, logPosition);
         executor.submit(new Runnable() {
-
             public void run() {
                 try {
                     zooKeeperLogPositionManager.persistLogPosition(destination, logPosition);
                 } catch (Exception e) {
-                    logger.error("ERROR # persist to zookeepr has an error", e);
+                    logger.error("ERROR # persist to zookeeper has an error", e);
                 }
             }
         });
-
     }
-
-    public LogPosition getLatestIndexBy(String destination) {
-        LogPosition logPosition = super.getLatestIndexBy(destination);
-        if (logPosition == nullPosition) {
-            return null;
-        } else {
-            return logPosition;
-        }
-    }
-
-    // ======================== setter / getter ======================
-
-    public void setZooKeeperLogPositionManager(ZooKeeperLogPositionManager zooKeeperLogPositionManager) {
-        this.zooKeeperLogPositionManager = zooKeeperLogPositionManager;
-    }
-
 }

+ 66 - 66
parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java

@@ -1,64 +1,81 @@
 package com.alibaba.otter.canal.parse.index;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.protocol.position.LogPosition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
 
-import com.alibaba.otter.canal.protocol.position.LogPosition;
-import com.google.common.base.Function;
-import com.google.common.collect.MigrateMap;
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
- * 基于定时刷新的策略的mixed实现
- * 
- * @author jianghang 2012-9-12 上午11:18:14
- * @version 1.0.0
+ * Created by yinxiu on 17/3/18.
+ * Email: marklin.hz@gmail.com
  */
-public class PeriodMixedLogPositionManager extends MemoryLogPositionManager implements CanalLogPositionManager {
+public class PeriodMixedLogPositionManager extends AbstractLogPositionManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(PeriodMixedLogPositionManager.class);
 
-    private static final Logger         logger       = LoggerFactory.getLogger(PeriodMixedLogPositionManager.class);
+    private MemoryLogPositionManager memoryLogPositionManager;
     private ZooKeeperLogPositionManager zooKeeperLogPositionManager;
-    private ScheduledExecutorService    executor;
-    @SuppressWarnings("serial")
-    private final LogPosition           nullPosition = new LogPosition() {
-                                                     };
+    private ScheduledExecutorService executorService;
+
+    private long period;
+    private Set<String> persistTasks;
+
+    private final LogPosition nullPosition = new LogPosition(){};
+
+    public PeriodMixedLogPositionManager(MemoryLogPositionManager memoryLogPositionManager, ZooKeeperLogPositionManager zooKeeperLogPositionManager, long period) {
+        if (memoryLogPositionManager == null) {
+            throw new NullPointerException("null memoryLogPositionManager");
+        }
+
+        if (zooKeeperLogPositionManager == null) {
+            throw new NullPointerException("null zooKeeperLogPositionManager");
+        }
+
+        if (period <= 0) {
+            throw new IllegalArgumentException("period must be positive, given: " + period);
+        }
+
+        this.memoryLogPositionManager = memoryLogPositionManager;
+        this.zooKeeperLogPositionManager = zooKeeperLogPositionManager;
+        this.period = period;
+        this.persistTasks = Collections.synchronizedSet(new HashSet<String>());
+        this.executorService = Executors.newScheduledThreadPool(1);
+    }
+
+    @Override
+    public void stop() {
+        super.stop();
+
+        if (zooKeeperLogPositionManager.isStart()) {
+            zooKeeperLogPositionManager.stop();
+        }
 
-    private long                        period       = 1000;                                                        // 单位ms
-    private Set<String>                 persistTasks;
+        if (memoryLogPositionManager.isStart()) {
+            memoryLogPositionManager.stop();
+        }
 
+        executorService.shutdown();
+    }
+
+    @Override
     public void start() {
         super.start();
 
-        Assert.notNull(zooKeeperLogPositionManager);
+        if (!memoryLogPositionManager.isStart()) {
+            memoryLogPositionManager.start();
+        }
+
         if (!zooKeeperLogPositionManager.isStart()) {
             zooKeeperLogPositionManager.start();
         }
-        executor = Executors.newScheduledThreadPool(1);
-        positions = MigrateMap.makeComputingMap(new Function<String, LogPosition>() {
-
-            public LogPosition apply(String destination) {
-                LogPosition logPosition = zooKeeperLogPositionManager.getLatestIndexBy(destination);
-                if (logPosition == null) {
-                    return nullPosition;
-                } else {
-                    return logPosition;
-                }
-            }
-        });
-
-        persistTasks = Collections.synchronizedSet(new HashSet<String>());
 
         // 启动定时工作任务
-        executor.scheduleAtFixedRate(new Runnable() {
+        executorService.scheduleAtFixedRate(new Runnable() {
 
             public void run() {
                 List<String> tasks = new ArrayList<String>(persistTasks);
@@ -76,36 +93,19 @@ public class PeriodMixedLogPositionManager extends MemoryLogPositionManager impl
         }, period, period, TimeUnit.MILLISECONDS);
     }
 
-    public void stop() {
-        super.stop();
-
-        if (zooKeeperLogPositionManager.isStart()) {
-            zooKeeperLogPositionManager.stop();
-        }
-        executor.shutdownNow();
-        positions.clear();
-    }
-
-    public void persistLogPosition(String destination, LogPosition logPosition) {
-        persistTasks.add(destination);// 添加到任务队列中进行触发
-        super.persistLogPosition(destination, logPosition);
-    }
-
+    @Override
     public LogPosition getLatestIndexBy(String destination) {
-        LogPosition logPostion = super.getLatestIndexBy(destination);
-        if (logPostion == nullPosition) {
+        LogPosition logPosition = memoryLogPositionManager.getLatestIndexBy(destination);
+        if (logPosition == nullPosition) {
             return null;
         } else {
-            return logPostion;
+            return logPosition;
         }
     }
 
-    public void setZooKeeperLogPositionManager(ZooKeeperLogPositionManager zooKeeperLogPositionManager) {
-        this.zooKeeperLogPositionManager = zooKeeperLogPositionManager;
-    }
-
-    public void setPeriod(long period) {
-        this.period = period;
+    @Override
+    public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
+        persistTasks.add(destination);
+        memoryLogPositionManager.persistLogPosition(destination, logPosition);
     }
-
 }

+ 14 - 23
parse/src/main/java/com/alibaba/otter/canal/parse/index/ZooKeeperLogPositionManager.java

@@ -1,33 +1,29 @@
 package com.alibaba.otter.canal.parse.index;
 
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.springframework.util.Assert;
-
-import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.common.utils.JsonUtils;
 import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
 import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
 
 /**
- * 基于zk的实现
- * 
- * @author jianghang 2012-7-7 上午10:08:27
- * @version 1.0.0
+ * Created by yinxiu on 17/3/17.
+ * Email: marklin.hz@gmail.com
  */
-public class ZooKeeperLogPositionManager extends AbstractCanalLifeCycle implements CanalLogPositionManager {
+public class ZooKeeperLogPositionManager extends AbstractLogPositionManager {
 
-    private ZkClientx zkClientx;
+    private final ZkClientx zkClientx;
 
-    public void start() {
-        super.start();
-        Assert.notNull(zkClientx);
-    }
+    public ZooKeeperLogPositionManager(ZkClientx zkClient) {
+        if (zkClient == null) {
+            throw new NullPointerException("null zkClient");
+        }
 
-    public void stop() {
-        super.stop();
+        this.zkClientx = zkClient;
     }
 
+    @Override
     public LogPosition getLatestIndexBy(String destination) {
         String path = ZookeeperPathUtils.getParsePath(destination);
         byte[] data = zkClientx.readData(path, true);
@@ -38,7 +34,8 @@ public class ZooKeeperLogPositionManager extends AbstractCanalLifeCycle implemen
         return JsonUtils.unmarshalFromByte(data, LogPosition.class);
     }
 
-    public void persistLogPosition(String destination, LogPosition logPosition) {
+    @Override
+    public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
         String path = ZookeeperPathUtils.getParsePath(destination);
         byte[] data = JsonUtils.marshalToByte(logPosition);
         try {
@@ -48,10 +45,4 @@ public class ZooKeeperLogPositionManager extends AbstractCanalLifeCycle implemen
         }
     }
 
-    // ================== setter / getter =================
-
-    public void setZkClientx(ZkClientx zkClientx) {
-        this.zkClientx = zkClientx;
-    }
-
 }

+ 8 - 7
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/group/GroupEventPaserTest.java

@@ -2,13 +2,13 @@ package com.alibaba.otter.canal.parse.inbound.group;
 
 import java.net.InetSocketAddress;
 
+import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import org.junit.Test;
 
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.AbstractBinlogParser;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
-import com.alibaba.otter.canal.parse.stub.AbstractCanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
@@ -70,15 +70,16 @@ public class GroupEventPaserTest {
         mysqlEventPaser.setMasterPosition(defaultPosition);
         mysqlEventPaser.setBinlogParser(buildParser(buildAuthentication()));
         mysqlEventPaser.setEventSink(new EntryEventSink());
-        mysqlEventPaser.setLogPositionManager(new AbstractCanalLogPositionManager() {
-
-            public void persistLogPosition(String destination, LogPosition logPosition) {
-                // System.out.println(logPosition);
-            }
-
+        mysqlEventPaser.setLogPositionManager(new AbstractLogPositionManager() {
+            @Override
             public LogPosition getLatestIndexBy(String destination) {
                 return null;
             }
+
+            @Override
+            public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
+                System.out.println(logPosition);
+            }
         });
         return mysqlEventPaser;
     }

+ 12 - 11
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogDumpTest.java

@@ -4,11 +4,12 @@ import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.util.List;
 
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import org.junit.Assert;
 import org.junit.Test;
 
 import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
-import com.alibaba.otter.canal.parse.stub.AbstractCanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
@@ -80,17 +81,17 @@ public class LocalBinlogDumpTest {
             }
 
         });
-        controller.setLogPositionManager(new AbstractCanalLogPositionManager() {
-
-            public void persistLogPosition(String destination, LogPosition logPosition) {
-                System.out.println(logPosition);
-            }
+        controller.setLogPositionManager(new AbstractLogPositionManager() {
+                    @Override
+                    public LogPosition getLatestIndexBy(String destination) {
+                        return null;
+                    }
 
-            @Override
-            public LogPosition getLatestIndexBy(String destination) {
-                return null;
-            }
-        });
+                    @Override
+                    public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
+                        System.out.println(logPosition);
+                    }
+                });
 
         controller.start();
 

+ 4 - 4
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParserTest.java

@@ -7,13 +7,13 @@ import java.util.Date;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import com.alibaba.otter.canal.parse.helper.TimeoutChecker;
 import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
-import com.alibaba.otter.canal.parse.stub.AbstractCanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
@@ -70,7 +70,7 @@ public class LocalBinlogEventParserTest {
 
         });
 
-        controller.setLogPositionManager(new AbstractCanalLogPositionManager() {
+        controller.setLogPositionManager(new AbstractLogPositionManager() {
 
             public void persistLogPosition(String destination, LogPosition logPosition) {
                 System.out.println(logPosition);
@@ -133,7 +133,7 @@ public class LocalBinlogEventParserTest {
             }
         });
 
-        controller.setLogPositionManager(new AbstractCanalLogPositionManager() {
+        controller.setLogPositionManager(new AbstractLogPositionManager() {
 
             public void persistLogPosition(String destination, LogPosition logPosition) {
                 System.out.println(logPosition);
@@ -198,7 +198,7 @@ public class LocalBinlogEventParserTest {
             }
         });
 
-        controller.setLogPositionManager(new AbstractCanalLogPositionManager() {
+        controller.setLogPositionManager(new AbstractLogPositionManager() {
 
             public void persistLogPosition(String destination, LogPosition logPosition) {
                 System.out.println(logPosition);

+ 8 - 7
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlDumpTest.java

@@ -4,11 +4,12 @@ import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.util.List;
 
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import org.junit.Assert;
 import org.junit.Test;
 
 import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
-import com.alibaba.otter.canal.parse.stub.AbstractCanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
@@ -83,16 +84,16 @@ public class MysqlDumpTest {
             }
 
         });
-        controller.setLogPositionManager(new AbstractCanalLogPositionManager() {
-
-            public void persistLogPosition(String destination, LogPosition logPosition) {
-                System.out.println(logPosition);
-            }
-
+        controller.setLogPositionManager(new AbstractLogPositionManager() {
             @Override
             public LogPosition getLatestIndexBy(String destination) {
                 return null;
             }
+
+            @Override
+            public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
+                System.out.println(logPosition);
+            }
         });
 
         controller.start();

+ 11 - 10
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParserTest.java

@@ -5,12 +5,13 @@ import java.util.Date;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import org.junit.Assert;
 import org.junit.Test;
 
 import com.alibaba.otter.canal.parse.helper.TimeoutChecker;
 import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
-import com.alibaba.otter.canal.parse.stub.AbstractCanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
@@ -67,16 +68,16 @@ public class MysqlEventParserTest {
             }
         });
 
-        controller.setLogPositionManager(new AbstractCanalLogPositionManager() {
-
-            public void persistLogPosition(String destination, LogPosition logPosition) {
-                System.out.println(logPosition);
-            }
-
+        controller.setLogPositionManager(new AbstractLogPositionManager() {
             @Override
             public LogPosition getLatestIndexBy(String destination) {
                 return null;
             }
+
+            @Override
+            public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
+                System.out.println(logPosition);
+            }
         });
 
         controller.start();
@@ -136,7 +137,7 @@ public class MysqlEventParserTest {
             }
         });
 
-        controller.setLogPositionManager(new AbstractCanalLogPositionManager() {
+        controller.setLogPositionManager(new AbstractLogPositionManager() {
 
             public void persistLogPosition(String destination, LogPosition logPosition) {
                 System.out.println(logPosition);
@@ -205,7 +206,7 @@ public class MysqlEventParserTest {
             }
         });
 
-        controller.setLogPositionManager(new AbstractCanalLogPositionManager() {
+        controller.setLogPositionManager(new AbstractLogPositionManager() {
 
             public void persistLogPosition(String destination, LogPosition logPosition) {
                 System.out.println(logPosition);
@@ -281,7 +282,7 @@ public class MysqlEventParserTest {
             }
         });
 
-        controller.setLogPositionManager(new AbstractCanalLogPositionManager() {
+        controller.setLogPositionManager(new AbstractLogPositionManager() {
 
             public void persistLogPosition(String destination, LogPosition logPosition) {
                 System.out.println(logPosition);

+ 4 - 6
parse/src/test/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManagerTest.java

@@ -26,17 +26,15 @@ public class FileMixedLogPositionManagerTest extends AbstractLogPositionManagerT
 
     @Test
     public void testAll() {
-        FileMixedLogPositionManager logPositionManager = new FileMixedLogPositionManager();
-        logPositionManager.setDataDir(dataDir);
-        logPositionManager.setPeriod(100);
+        MemoryLogPositionManager memoryLogPositionManager = new MemoryLogPositionManager();
+
+        FileMixedLogPositionManager logPositionManager = new FileMixedLogPositionManager(dataDir, 1000, memoryLogPositionManager);
         logPositionManager.start();
 
         LogPosition position2 = doTest(logPositionManager);
         sleep(1500);
 
-        FileMixedLogPositionManager logPositionManager2 = new FileMixedLogPositionManager();
-        logPositionManager2.setDataDir(dataDir);
-        logPositionManager2.setPeriod(100);
+        FileMixedLogPositionManager logPositionManager2 = new FileMixedLogPositionManager(dataDir, 1000, memoryLogPositionManager);
         logPositionManager2.start();
 
         LogPosition getPosition2 = logPositionManager2.getLatestIndexBy(destination);

+ 0 - 8
parse/src/test/java/com/alibaba/otter/canal/parse/stub/AbstractCanalLogPositionManager.java

@@ -1,8 +0,0 @@
-package com.alibaba.otter.canal.parse.stub;
-
-import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
-import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
-
-public abstract class AbstractCanalLogPositionManager extends AbstractCanalLifeCycle implements CanalLogPositionManager {
-
-}