Browse Source

fixed format & import

agapple 8 years ago
parent
commit
fdd40224d5
18 changed files with 123 additions and 104 deletions
  1. 3 1
      instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java
  2. 2 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  3. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java
  4. 1 2
      parse/src/main/java/com/alibaba/otter/canal/parse/index/AbstractLogPositionManager.java
  5. 1 2
      parse/src/main/java/com/alibaba/otter/canal/parse/index/CanalLogPositionManager.java
  6. 10 8
      parse/src/main/java/com/alibaba/otter/canal/parse/index/FailbackLogPositionManager.java
  7. 28 23
      parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java
  8. 4 5
      parse/src/main/java/com/alibaba/otter/canal/parse/index/MemoryLogPositionManager.java
  9. 9 9
      parse/src/main/java/com/alibaba/otter/canal/parse/index/MetaLogPositionManager.java
  10. 12 13
      parse/src/main/java/com/alibaba/otter/canal/parse/index/MixedLogPositionManager.java
  11. 21 15
      parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java
  12. 4 4
      parse/src/main/java/com/alibaba/otter/canal/parse/index/ZooKeeperLogPositionManager.java
  13. 2 1
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/group/GroupEventPaserTest.java
  14. 12 11
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogDumpTest.java
  15. 1 1
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParserTest.java
  16. 3 2
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlDumpTest.java
  17. 3 2
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParserTest.java
  18. 6 2
      parse/src/test/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManagerTest.java

+ 3 - 1
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -349,7 +349,9 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
         } else if (indexMode.isMixed()) {
         } else if (indexMode.isMixed()) {
             MemoryLogPositionManager memoryLogPositionManager = new MemoryLogPositionManager();
             MemoryLogPositionManager memoryLogPositionManager = new MemoryLogPositionManager();
             ZooKeeperLogPositionManager zooKeeperLogPositionManager = new ZooKeeperLogPositionManager(getZkclientx());
             ZooKeeperLogPositionManager zooKeeperLogPositionManager = new ZooKeeperLogPositionManager(getZkclientx());
-            logPositionManager = new PeriodMixedLogPositionManager(memoryLogPositionManager, zooKeeperLogPositionManager, 1000L);
+            logPositionManager = new PeriodMixedLogPositionManager(memoryLogPositionManager,
+                zooKeeperLogPositionManager,
+                1000L);
         } else if (indexMode.isMeta()) {
         } else if (indexMode.isMeta()) {
             logPositionManager = new MetaLogPositionManager(metaManager);
             logPositionManager = new MetaLogPositionManager(metaManager);
         } else if (indexMode.isMemoryMetaFailback()) {
         } else if (indexMode.isMemoryMetaFailback()) {

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -8,7 +8,6 @@ import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
 
 
-import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.math.RandomUtils;
 import org.apache.commons.lang.math.RandomUtils;
@@ -23,6 +22,7 @@ import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
 import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
 import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer.TransactionFlushCallback;
 import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer.TransactionFlushCallback;
+import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
@@ -44,7 +44,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
 
 
     protected final Logger                           logger                     = LoggerFactory.getLogger(this.getClass());
     protected final Logger                           logger                     = LoggerFactory.getLogger(this.getClass());
 
 
-    protected CanalLogPositionManager logPositionManager         = null;
+    protected CanalLogPositionManager                logPositionManager         = null;
     protected CanalEventSink<List<CanalEntry.Entry>> eventSink                  = null;
     protected CanalEventSink<List<CanalEntry.Entry>> eventSink                  = null;
     protected CanalEventFilter                       eventFilter                = null;
     protected CanalEventFilter                       eventFilter                = null;
     protected CanalEventFilter                       eventBlackFilter           = null;
     protected CanalEventFilter                       eventBlackFilter           = null;

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java

@@ -2,7 +2,6 @@ package com.alibaba.otter.canal.parse.inbound.mysql;
 
 
 import java.io.IOException;
 import java.io.IOException;
 
 
-import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 
 
 import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.CanalEventParser;
@@ -10,6 +9,7 @@ import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
+import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
 import com.alibaba.otter.canal.protocol.position.LogPosition;

+ 1 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/index/AbstractLogPositionManager.java

@@ -3,8 +3,7 @@ package com.alibaba.otter.canal.parse.index;
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 
 
 /**
 /**
- * Created by yinxiu on 17/3/17.
- * Email: marklin.hz@gmail.com
+ * Created by yinxiu on 17/3/17. Email: marklin.hz@gmail.com
  */
  */
 public abstract class AbstractLogPositionManager extends AbstractCanalLifeCycle implements CanalLogPositionManager {
 public abstract class AbstractLogPositionManager extends AbstractCanalLifeCycle implements CanalLogPositionManager {
 }
 }

+ 1 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/index/CanalLogPositionManager.java

@@ -5,8 +5,7 @@ import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
 
 
 /**
 /**
- * Created by yinxiu on 17/3/17.
- * Email: marklin.hz@gmail.com
+ * Created by yinxiu on 17/3/17. Email: marklin.hz@gmail.com
  */
  */
 public interface CanalLogPositionManager extends CanalLifeCycle {
 public interface CanalLogPositionManager extends CanalLifeCycle {
 
 

+ 10 - 8
parse/src/main/java/com/alibaba/otter/canal/parse/index/FailbackLogPositionManager.java

@@ -1,14 +1,13 @@
 package com.alibaba.otter.canal.parse.index;
 package com.alibaba.otter.canal.parse.index;
 
 
-import com.alibaba.otter.canal.parse.exception.CanalParseException;
-import com.alibaba.otter.canal.protocol.position.LogPosition;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.protocol.position.LogPosition;
+
 /**
 /**
- * Created by yinxiu on 17/3/18.
- * Email: marklin.hz@gmail.com
- *
+ * Created by yinxiu on 17/3/18. Email: marklin.hz@gmail.com
  * 实现基于failover查找的机制完成meta的操作
  * 实现基于failover查找的机制完成meta的操作
  *
  *
  * <pre>
  * <pre>
@@ -17,12 +16,12 @@ import org.slf4j.LoggerFactory;
  */
  */
 public class FailbackLogPositionManager extends AbstractLogPositionManager {
 public class FailbackLogPositionManager extends AbstractLogPositionManager {
 
 
-    private final static Logger logger = LoggerFactory.getLogger(FailbackLogPositionManager.class);
+    private final static Logger           logger = LoggerFactory.getLogger(FailbackLogPositionManager.class);
 
 
     private final CanalLogPositionManager primary;
     private final CanalLogPositionManager primary;
     private final CanalLogPositionManager secondary;
     private final CanalLogPositionManager secondary;
 
 
-    public FailbackLogPositionManager(CanalLogPositionManager primary, CanalLogPositionManager secondary) {
+    public FailbackLogPositionManager(CanalLogPositionManager primary, CanalLogPositionManager secondary){
         if (primary == null) {
         if (primary == null) {
             throw new NullPointerException("nul primary LogPositionManager");
             throw new NullPointerException("nul primary LogPositionManager");
         }
         }
@@ -74,7 +73,10 @@ public class FailbackLogPositionManager extends AbstractLogPositionManager {
         try {
         try {
             primary.persistLogPosition(destination, logPosition);
             primary.persistLogPosition(destination, logPosition);
         } catch (CanalParseException e) {
         } catch (CanalParseException e) {
-            logger.warn("persistLogPosition use primary log position manager exception. destination: {}, logPosition: {}", destination, logPosition, e);
+            logger.warn("persistLogPosition use primary log position manager exception. destination: {}, logPosition: {}",
+                destination,
+                logPosition,
+                e);
             secondary.persistLogPosition(destination, logPosition);
             secondary.persistLogPosition(destination, logPosition);
         }
         }
     }
     }

+ 28 - 23
parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java

@@ -1,28 +1,32 @@
 package com.alibaba.otter.canal.parse.index;
 package com.alibaba.otter.canal.parse.index;
 
 
-import com.alibaba.otter.canal.common.utils.JsonUtils;
-import com.alibaba.otter.canal.meta.exception.CanalMetaManagerException;
-import com.alibaba.otter.canal.parse.exception.CanalParseException;
-import com.alibaba.otter.canal.protocol.position.LogPosition;
-import com.google.common.base.Function;
-import com.google.common.collect.MigrateMap;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.io.File;
 import java.io.IOException;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.charset.Charset;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.common.utils.JsonUtils;
+import com.alibaba.otter.canal.meta.exception.CanalMetaManagerException;
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.protocol.position.LogPosition;
+import com.google.common.base.Function;
+import com.google.common.collect.MigrateMap;
+
 /**
 /**
- * Created by yinxiu on 17/3/18.
- * Email: marklin.hz@gmail.com
- *
- * 基于文件刷新的log position实现
+ * Created by yinxiu on 17/3/18. Email: marklin.hz@gmail.com 基于文件刷新的log
+ * position实现
  *
  *
  * <pre>
  * <pre>
  * 策略:
  * 策略:
@@ -32,23 +36,24 @@ import java.util.concurrent.TimeUnit;
  */
  */
 public class FileMixedLogPositionManager extends AbstractLogPositionManager {
 public class FileMixedLogPositionManager extends AbstractLogPositionManager {
 
 
-    private final static Logger logger = LoggerFactory.getLogger(FileMixedLogPositionManager.class);
-    private final static Charset charset = Charset.forName("UTF-8");
+    private final static Logger      logger       = LoggerFactory.getLogger(FileMixedLogPositionManager.class);
+    private final static Charset     charset      = Charset.forName("UTF-8");
 
 
-    private File dataDir;
+    private File                     dataDir;
 
 
-    private Map<String, File> dataFileCaches;
+    private Map<String, File>        dataFileCaches;
 
 
     private ScheduledExecutorService executorService;
     private ScheduledExecutorService executorService;
 
 
-    private final LogPosition nullPosition = new LogPosition(){};
+    private final LogPosition        nullPosition = new LogPosition() {
+                                                  };
 
 
     private MemoryLogPositionManager memoryLogPositionManager;
     private MemoryLogPositionManager memoryLogPositionManager;
 
 
-    private long period;
-    private Set<String> persistTasks;
+    private long                     period;
+    private Set<String>              persistTasks;
 
 
-    public FileMixedLogPositionManager(File dataDir, long period, MemoryLogPositionManager memoryLogPositionManager) {
+    public FileMixedLogPositionManager(File dataDir, long period, MemoryLogPositionManager memoryLogPositionManager){
         if (dataDir == null) {
         if (dataDir == null) {
             throw new NullPointerException("null dataDir");
             throw new NullPointerException("null dataDir");
         }
         }

+ 4 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/index/MemoryLogPositionManager.java

@@ -1,15 +1,14 @@
 package com.alibaba.otter.canal.parse.index;
 package com.alibaba.otter.canal.parse.index;
 
 
+import java.util.Map;
+import java.util.Set;
+
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.MapMaker;
 
 
-import java.util.Map;
-import java.util.Set;
-
 /**
 /**
- * Created by yinxiu on 17/3/17.
- * Email: marklin.hz@gmail.com
+ * Created by yinxiu on 17/3/17. Email: marklin.hz@gmail.com
  */
  */
 public class MemoryLogPositionManager extends AbstractLogPositionManager {
 public class MemoryLogPositionManager extends AbstractLogPositionManager {
 
 

+ 9 - 9
parse/src/main/java/com/alibaba/otter/canal/parse/index/MetaLogPositionManager.java

@@ -1,27 +1,27 @@
 package com.alibaba.otter.canal.parse.index;
 package com.alibaba.otter.canal.parse.index;
 
 
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+
 import com.alibaba.otter.canal.meta.CanalMetaManager;
 import com.alibaba.otter.canal.meta.CanalMetaManager;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
 import com.alibaba.otter.canal.store.helper.CanalEventUtils;
 import com.alibaba.otter.canal.store.helper.CanalEventUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.CollectionUtils;
-
-import java.util.List;
 
 
 /**
 /**
- * Created by yinxiu on 17/3/18.
- * Email: marklin.hz@gmail.com
+ * Created by yinxiu on 17/3/18. Email: marklin.hz@gmail.com
  */
  */
 public class MetaLogPositionManager extends AbstractLogPositionManager {
 public class MetaLogPositionManager extends AbstractLogPositionManager {
 
 
-    private final static Logger logger = LoggerFactory.getLogger(MetaLogPositionManager.class);
+    private final static Logger    logger = LoggerFactory.getLogger(MetaLogPositionManager.class);
 
 
     private final CanalMetaManager metaManager;
     private final CanalMetaManager metaManager;
 
 
-    public MetaLogPositionManager(CanalMetaManager metaManager) {
+    public MetaLogPositionManager(CanalMetaManager metaManager){
         if (metaManager == null) {
         if (metaManager == null) {
             throw new NullPointerException("null metaManager");
             throw new NullPointerException("null metaManager");
         }
         }

+ 12 - 13
parse/src/main/java/com/alibaba/otter/canal/parse/index/MixedLogPositionManager.java

@@ -1,31 +1,29 @@
 package com.alibaba.otter.canal.parse.index;
 package com.alibaba.otter.canal.parse.index;
 
 
-import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
-import com.alibaba.otter.canal.parse.exception.CanalParseException;
-import com.alibaba.otter.canal.protocol.position.LogPosition;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.protocol.position.LogPosition;
 
 
 /**
 /**
- * Created by yinxiu on 17/3/17.
- * Email: marklin.hz@gmail.com
- *
- * Memory first.
+ * Created by yinxiu on 17/3/17. Email: marklin.hz@gmail.com Memory first.
  * Asynchronous commit position info to ZK.
  * Asynchronous commit position info to ZK.
  */
  */
 public class MixedLogPositionManager extends AbstractLogPositionManager {
 public class MixedLogPositionManager extends AbstractLogPositionManager {
 
 
-    private final Logger logger = LoggerFactory.getLogger(MixedLogPositionManager.class);
+    private final Logger                      logger = LoggerFactory.getLogger(MixedLogPositionManager.class);
 
 
-    private final MemoryLogPositionManager memoryLogPositionManager;
+    private final MemoryLogPositionManager    memoryLogPositionManager;
     private final ZooKeeperLogPositionManager zooKeeperLogPositionManager;
     private final ZooKeeperLogPositionManager zooKeeperLogPositionManager;
 
 
-    private final ExecutorService executor;
+    private final ExecutorService             executor;
 
 
-    public MixedLogPositionManager(ZkClientx zkClient) {
+    public MixedLogPositionManager(ZkClientx zkClient){
         if (zkClient == null) {
         if (zkClient == null) {
             throw new NullPointerException("null zkClient");
             throw new NullPointerException("null zkClient");
         }
         }
@@ -71,6 +69,7 @@ public class MixedLogPositionManager extends AbstractLogPositionManager {
     public void persistLogPosition(final String destination, final LogPosition logPosition) throws CanalParseException {
     public void persistLogPosition(final String destination, final LogPosition logPosition) throws CanalParseException {
         memoryLogPositionManager.persistLogPosition(destination, logPosition);
         memoryLogPositionManager.persistLogPosition(destination, logPosition);
         executor.submit(new Runnable() {
         executor.submit(new Runnable() {
+
             public void run() {
             public void run() {
                 try {
                 try {
                     zooKeeperLogPositionManager.persistLogPosition(destination, logPosition);
                     zooKeeperLogPositionManager.persistLogPosition(destination, logPosition);

+ 21 - 15
parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java

@@ -1,33 +1,39 @@
 package com.alibaba.otter.canal.parse.index;
 package com.alibaba.otter.canal.parse.index;
 
 
-import com.alibaba.otter.canal.parse.exception.CanalParseException;
-import com.alibaba.otter.canal.protocol.position.LogPosition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.protocol.position.LogPosition;
+
 /**
 /**
- * Created by yinxiu on 17/3/18.
- * Email: marklin.hz@gmail.com
+ * Created by yinxiu on 17/3/18. Email: marklin.hz@gmail.com
  */
  */
 public class PeriodMixedLogPositionManager extends AbstractLogPositionManager {
 public class PeriodMixedLogPositionManager extends AbstractLogPositionManager {
 
 
-    private static final Logger logger = LoggerFactory.getLogger(PeriodMixedLogPositionManager.class);
+    private static final Logger         logger       = LoggerFactory.getLogger(PeriodMixedLogPositionManager.class);
 
 
-    private MemoryLogPositionManager memoryLogPositionManager;
+    private MemoryLogPositionManager    memoryLogPositionManager;
     private ZooKeeperLogPositionManager zooKeeperLogPositionManager;
     private ZooKeeperLogPositionManager zooKeeperLogPositionManager;
-    private ScheduledExecutorService executorService;
+    private ScheduledExecutorService    executorService;
 
 
-    private long period;
-    private Set<String> persistTasks;
+    private long                        period;
+    private Set<String>                 persistTasks;
 
 
-    private final LogPosition nullPosition = new LogPosition(){};
+    private final LogPosition           nullPosition = new LogPosition() {
+                                                     };
 
 
-    public PeriodMixedLogPositionManager(MemoryLogPositionManager memoryLogPositionManager, ZooKeeperLogPositionManager zooKeeperLogPositionManager, long period) {
+    public PeriodMixedLogPositionManager(MemoryLogPositionManager memoryLogPositionManager,
+                                         ZooKeeperLogPositionManager zooKeeperLogPositionManager, long period){
         if (memoryLogPositionManager == null) {
         if (memoryLogPositionManager == null) {
             throw new NullPointerException("null memoryLogPositionManager");
             throw new NullPointerException("null memoryLogPositionManager");
         }
         }

+ 4 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/index/ZooKeeperLogPositionManager.java

@@ -1,21 +1,21 @@
 package com.alibaba.otter.canal.parse.index;
 package com.alibaba.otter.canal.parse.index;
 
 
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+
 import com.alibaba.otter.canal.common.utils.JsonUtils;
 import com.alibaba.otter.canal.common.utils.JsonUtils;
 import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
 import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
 import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
 import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
 
 
 /**
 /**
- * Created by yinxiu on 17/3/17.
- * Email: marklin.hz@gmail.com
+ * Created by yinxiu on 17/3/17. Email: marklin.hz@gmail.com
  */
  */
 public class ZooKeeperLogPositionManager extends AbstractLogPositionManager {
 public class ZooKeeperLogPositionManager extends AbstractLogPositionManager {
 
 
     private final ZkClientx zkClientx;
     private final ZkClientx zkClientx;
 
 
-    public ZooKeeperLogPositionManager(ZkClientx zkClient) {
+    public ZooKeeperLogPositionManager(ZkClientx zkClient){
         if (zkClient == null) {
         if (zkClient == null) {
             throw new NullPointerException("null zkClient");
             throw new NullPointerException("null zkClient");
         }
         }

+ 2 - 1
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/group/GroupEventPaserTest.java

@@ -2,13 +2,13 @@ package com.alibaba.otter.canal.parse.inbound.group;
 
 
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 
 
-import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import org.junit.Test;
 import org.junit.Test;
 
 
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.AbstractBinlogParser;
 import com.alibaba.otter.canal.parse.inbound.AbstractBinlogParser;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
+import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
@@ -71,6 +71,7 @@ public class GroupEventPaserTest {
         mysqlEventPaser.setBinlogParser(buildParser(buildAuthentication()));
         mysqlEventPaser.setBinlogParser(buildParser(buildAuthentication()));
         mysqlEventPaser.setEventSink(new EntryEventSink());
         mysqlEventPaser.setEventSink(new EntryEventSink());
         mysqlEventPaser.setLogPositionManager(new AbstractLogPositionManager() {
         mysqlEventPaser.setLogPositionManager(new AbstractLogPositionManager() {
+
             @Override
             @Override
             public LogPosition getLatestIndexBy(String destination) {
             public LogPosition getLatestIndexBy(String destination) {
                 return null;
                 return null;

+ 12 - 11
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogDumpTest.java

@@ -4,11 +4,11 @@ import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.nio.charset.Charset;
 import java.util.List;
 import java.util.List;
 
 
-import com.alibaba.otter.canal.parse.exception.CanalParseException;
-import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
 
 
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
 import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
@@ -82,16 +82,17 @@ public class LocalBinlogDumpTest {
 
 
         });
         });
         controller.setLogPositionManager(new AbstractLogPositionManager() {
         controller.setLogPositionManager(new AbstractLogPositionManager() {
-                    @Override
-                    public LogPosition getLatestIndexBy(String destination) {
-                        return null;
-                    }
 
 
-                    @Override
-                    public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
-                        System.out.println(logPosition);
-                    }
-                });
+            @Override
+            public LogPosition getLatestIndexBy(String destination) {
+                return null;
+            }
+
+            @Override
+            public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
+                System.out.println(logPosition);
+            }
+        });
 
 
         controller.start();
         controller.start();
 
 

+ 1 - 1
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParserTest.java

@@ -7,12 +7,12 @@ import java.util.Date;
 import java.util.List;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
 
 
-import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
 
 
 import com.alibaba.otter.canal.parse.helper.TimeoutChecker;
 import com.alibaba.otter.canal.parse.helper.TimeoutChecker;
+import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
 import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

+ 3 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlDumpTest.java

@@ -4,11 +4,11 @@ import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.nio.charset.Charset;
 import java.util.List;
 import java.util.List;
 
 
-import com.alibaba.otter.canal.parse.exception.CanalParseException;
-import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
 
 
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
 import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
@@ -85,6 +85,7 @@ public class MysqlDumpTest {
 
 
         });
         });
         controller.setLogPositionManager(new AbstractLogPositionManager() {
         controller.setLogPositionManager(new AbstractLogPositionManager() {
+
             @Override
             @Override
             public LogPosition getLatestIndexBy(String destination) {
             public LogPosition getLatestIndexBy(String destination) {
                 return null;
                 return null;

+ 3 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParserTest.java

@@ -5,12 +5,12 @@ import java.util.Date;
 import java.util.List;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
 
 
-import com.alibaba.otter.canal.parse.exception.CanalParseException;
-import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
 
 
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.helper.TimeoutChecker;
 import com.alibaba.otter.canal.parse.helper.TimeoutChecker;
+import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
 import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
@@ -69,6 +69,7 @@ public class MysqlEventParserTest {
         });
         });
 
 
         controller.setLogPositionManager(new AbstractLogPositionManager() {
         controller.setLogPositionManager(new AbstractLogPositionManager() {
+
             @Override
             @Override
             public LogPosition getLatestIndexBy(String destination) {
             public LogPosition getLatestIndexBy(String destination) {
                 return null;
                 return null;

+ 6 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManagerTest.java

@@ -28,13 +28,17 @@ public class FileMixedLogPositionManagerTest extends AbstractLogPositionManagerT
     public void testAll() {
     public void testAll() {
         MemoryLogPositionManager memoryLogPositionManager = new MemoryLogPositionManager();
         MemoryLogPositionManager memoryLogPositionManager = new MemoryLogPositionManager();
 
 
-        FileMixedLogPositionManager logPositionManager = new FileMixedLogPositionManager(dataDir, 1000, memoryLogPositionManager);
+        FileMixedLogPositionManager logPositionManager = new FileMixedLogPositionManager(dataDir,
+            1000,
+            memoryLogPositionManager);
         logPositionManager.start();
         logPositionManager.start();
 
 
         LogPosition position2 = doTest(logPositionManager);
         LogPosition position2 = doTest(logPositionManager);
         sleep(1500);
         sleep(1500);
 
 
-        FileMixedLogPositionManager logPositionManager2 = new FileMixedLogPositionManager(dataDir, 1000, memoryLogPositionManager);
+        FileMixedLogPositionManager logPositionManager2 = new FileMixedLogPositionManager(dataDir,
+            1000,
+            memoryLogPositionManager);
         logPositionManager2.start();
         logPositionManager2.start();
 
 
         LogPosition getPosition2 = logPositionManager2.getLatestIndexBy(destination);
         LogPosition getPosition2 = logPositionManager2.getLatestIndexBy(destination);