Browse Source

Merge pull request #115 from wenerme/master

Typo,javadoc 修正, 更新 guava 到18.0
agapple 10 years ago
parent
commit
ea889cb004
39 changed files with 800 additions and 623 deletions
  1. 5 8
      client/src/main/java/com/alibaba/otter/canal/client/CanalConnector.java
  2. 0 1
      common/pom.xml
  3. 3 3
      common/src/main/java/com/alibaba/otter/canal/common/CanalLifeCycle.java
  4. 1 2
      common/src/main/java/com/alibaba/otter/canal/common/alarm/CanalAlarmHandler.java
  5. 8 5
      common/src/main/java/com/alibaba/otter/canal/common/zookeeper/ZkClientx.java
  6. 17 0
      common/src/main/java/com/google/common/collect/MigrateMap.java
  7. 67 37
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java
  8. 3 3
      deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/InstanceAction.java
  9. 2 2
      deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/InstanceConfigMonitor.java
  10. 2 2
      deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/ManagerInstanceConfigMonitor.java
  11. 10 7
      deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/SpringInstanceConfigMonitor.java
  12. 35 29
      filter/src/main/java/com/alibaba/otter/canal/filter/PatternUtils.java
  13. 0 1
      instance/core/pom.xml
  14. 7 7
      instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalInstance.java
  15. 4 5
      instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java
  16. 2 2
      meta/src/main/java/com/alibaba/otter/canal/meta/CanalMetaManager.java
  17. 17 8
      meta/src/main/java/com/alibaba/otter/canal/meta/FileMixedMetaManager.java
  18. 9 4
      meta/src/main/java/com/alibaba/otter/canal/meta/MemoryMetaManager.java
  19. 89 51
      meta/src/main/java/com/alibaba/otter/canal/meta/MixedMetaManager.java
  20. 19 10
      meta/src/main/java/com/alibaba/otter/canal/meta/PeriodMixedMetaManager.java
  21. 0 1
      parse/pom.xml
  22. 13 6
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
  23. 13 6
      parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java
  24. 9 4
      parse/src/main/java/com/alibaba/otter/canal/parse/index/MixedLogPositionManager.java
  25. 9 4
      parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java
  26. 339 338
      pom.xml
  27. 3 1
      protocol/src/main/java/com/alibaba/otter/canal/protocol/ClientIdentity.java
  28. 0 1
      server/pom.xml
  29. 2 2
      server/src/main/java/com/alibaba/otter/canal/server/CanalServer.java
  30. 29 0
      server/src/main/java/com/alibaba/otter/canal/server/CanalService.java
  31. 28 17
      server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java
  32. 12 12
      server/src/main/java/com/alibaba/otter/canal/server/netty/CanalServerWithNetty.java
  33. 10 9
      server/src/main/java/com/alibaba/otter/canal/server/netty/handler/ClientAuthenticationHandler.java
  34. 18 20
      server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java
  35. 3 3
      server/src/test/java/com/alibaba/otter/canal/server/BaseCanalServerWithEmbededTest.java
  36. 1 1
      server/src/test/java/com/alibaba/otter/canal/server/CanalServerWithEmbedded_StandaloneTest.java
  37. 1 1
      server/src/test/java/com/alibaba/otter/canal/server/CanalServerWithEmbedded_StandbyTest.java
  38. 7 5
      server/src/test/java/com/alibaba/otter/canal/server/CanalServerWithNettyTest.java
  39. 3 5
      sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

+ 5 - 8
client/src/main/java/com/alibaba/otter/canal/client/CanalConnector.java

@@ -19,14 +19,14 @@ public interface CanalConnector {
      * 
      * @throws CanalClientException
      */
-    public void connect() throws CanalClientException;
+    void connect() throws CanalClientException;
 
     /**
      * 释放链接
      * 
      * @throws CanalClientException
      */
-    public void disconnect() throws CanalClientException;
+    void disconnect() throws CanalClientException;
 
     /**
      * 检查下链接是否合法
@@ -43,7 +43,7 @@ public interface CanalConnector {
      * 
      * @throws CanalClientException
      */
-    public boolean checkValid() throws CanalClientException;
+    boolean checkValid() throws CanalClientException;
 
     /**
      * 客户端订阅,重复订阅时会更新对应的filter信息
@@ -56,7 +56,6 @@ public interface CanalConnector {
      * TODO: 后续可以考虑,如果本次提交的filter不为空,在执行过滤时,是对canal server filter + 本次filter的交集处理,达到只取1份binlog数据,多个客户端消费不同的表
      * </pre>
      * 
-     * @param clientIdentity
      * @throws CanalClientException
      */
     void subscribe(String filter) throws CanalClientException;
@@ -64,7 +63,6 @@ public interface CanalConnector {
     /**
      * 客户端订阅,不提交客户端filter,以服务端的filter为准
      * 
-     * @param clientIdentity
      * @throws CanalClientException
      */
     void subscribe() throws CanalClientException;
@@ -72,7 +70,6 @@ public interface CanalConnector {
     /**
      * 取消订阅
      * 
-     * @param clientIdentity
      * @throws CanalClientException
      */
     void unsubscribe() throws CanalClientException;
@@ -140,14 +137,14 @@ public interface CanalConnector {
     void ack(long batchId) throws CanalClientException;
 
     /**
-     * 回滚到未进行 {@link ack} 的地方,指定回滚具体的batchId
+     * 回滚到未进行 {@link #ack} 的地方,指定回滚具体的batchId
      * 
      * @throws CanalClientException
      */
     void rollback(long batchId) throws CanalClientException;
 
     /**
-     * 回滚到未进行 {@link ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link ack} 的地方开始拿
+     * 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿
      * 
      * @throws CanalClientException
      */

+ 0 - 1
common/pom.xml

@@ -6,7 +6,6 @@
 		<version>1.0.20-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
-	<groupId>com.alibaba.otter</groupId>
 	<artifactId>canal.common</artifactId>
 	<packaging>jar</packaging>
 	<name>canal common module for otter ${project.version}</name>

+ 3 - 3
common/src/main/java/com/alibaba/otter/canal/common/CanalLifeCycle.java

@@ -6,9 +6,9 @@ package com.alibaba.otter.canal.common;
  */
 public interface CanalLifeCycle {
 
-    public void start();
+    void start();
 
-    public void stop();
+    void stop();
 
-    public boolean isStart();
+    boolean isStart();
 }

+ 1 - 2
common/src/main/java/com/alibaba/otter/canal/common/alarm/CanalAlarmHandler.java

@@ -14,8 +14,7 @@ public interface CanalAlarmHandler extends CanalLifeCycle {
      * 发送对应destination的报警
      * 
      * @param destination
-     * @param title
      * @param msg
      */
-    public void sendAlarm(String destination, String msg);
+    void sendAlarm(String destination, String msg);
 }

+ 8 - 5
common/src/main/java/com/alibaba/otter/canal/common/zookeeper/ZkClientx.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.common.zookeeper;
 
+import com.google.common.collect.MigrateMap;
 import java.util.Map;
 
 import org.I0Itec.zkclient.IZkConnection;
@@ -23,12 +24,14 @@ import com.google.common.collect.MapMaker;
 public class ZkClientx extends ZkClient {
 
     // 对于zkclient进行一次缓存,避免一个jvm内部使用多个zk connection
-    private static Map<String, ZkClientx> clients = new MapMaker().makeComputingMap(new Function<String, ZkClientx>() {
+    private static Map<String, ZkClientx> clients = MigrateMap.makeComputingMap(new Function<String, ZkClientx>()
+    {
 
-                                                      public ZkClientx apply(String servers) {
-                                                          return new ZkClientx(servers);
-                                                      }
-                                                  });
+        public ZkClientx apply(String servers)
+        {
+            return new ZkClientx(servers);
+        }
+    });
 
     public static ZkClientx getZkClient(String servers) {
         return clients.get(servers);

+ 17 - 0
common/src/main/java/com/google/common/collect/MigrateMap.java

@@ -0,0 +1,17 @@
+package com.google.common.collect;
+
+import com.google.common.base.Function;
+import java.util.concurrent.ConcurrentMap;
+
+public class MigrateMap
+{
+    public static <K, V> ConcurrentMap<K, V> makeComputingMap(MapMaker maker, Function<? super K, ? extends V> computingFunction)
+    {
+        return maker.makeComputingMap(computingFunction);
+    }
+
+    public static <K, V> ConcurrentMap<K, V> makeComputingMap(Function<? super K, ? extends V> computingFunction)
+    {
+        return new MapMaker().makeComputingMap(computingFunction);
+    }
+}

+ 67 - 37
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.deployer;
 
+import com.google.common.collect.MigrateMap;
 import java.util.Map;
 import java.util.Properties;
 
@@ -33,7 +34,7 @@ import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
 import com.alibaba.otter.canal.instance.manager.CanalConfigClient;
 import com.alibaba.otter.canal.instance.manager.ManagerCanalInstanceGenerator;
 import com.alibaba.otter.canal.instance.spring.SpringCanalInstanceGenerator;
-import com.alibaba.otter.canal.server.embeded.CanalServerWithEmbeded;
+import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
 import com.alibaba.otter.canal.server.exception.CanalServerException;
 import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;
 import com.google.common.base.Function;
@@ -59,7 +60,7 @@ public class CanalController {
     private boolean                                  autoScan = true;
     private InstanceAction                           defaultAction;
     private Map<InstanceMode, InstanceConfigMonitor> instanceConfigMonitors;
-    private CanalServerWithEmbeded                   embededCanalServer;
+    private CanalServerWithEmbedded embededCanalServer;
     private CanalServerWithNetty                     canalServer;
 
     private CanalInstanceGenerator                   instanceGenerator;
@@ -70,9 +71,11 @@ public class CanalController {
     }
 
     public CanalController(final Properties properties){
-        managerClients = new MapMaker().makeComputingMap(new Function<String, CanalConfigClient>() {
+        managerClients = MigrateMap.makeComputingMap(new Function<String, CanalConfigClient>()
+        {
 
-            public CanalConfigClient apply(String managerAddress) {
+            public CanalConfigClient apply(String managerAddress)
+            {
                 return getManagerClient(managerAddress);
             }
         });
@@ -87,7 +90,7 @@ public class CanalController {
         cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
         ip = getProperty(properties, CanalConstants.CANAL_IP);
         port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
-        embededCanalServer = new CanalServerWithEmbeded();
+        embededCanalServer = new CanalServerWithEmbedded();
         embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
         canalServer = new CanalServerWithNetty(embededCanalServer);
         canalServer.setIp(ip);
@@ -107,68 +110,88 @@ public class CanalController {
 
         final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
         ServerRunningMonitors.setServerData(serverData);
-        ServerRunningMonitors.setRunningMonitors(new MapMaker().makeComputingMap(new Function<String, ServerRunningMonitor>() {
-
-            public ServerRunningMonitor apply(final String destination) {
+        ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>()
+        {
+            public ServerRunningMonitor apply(final String destination)
+            {
                 ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
                 runningMonitor.setDestination(destination);
-                runningMonitor.setListener(new ServerRunningListener() {
+                runningMonitor.setListener(new ServerRunningListener()
+                {
 
-                    public void processActiveEnter() {
-                        try {
+                    public void processActiveEnter()
+                    {
+                        try
+                        {
                             MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
                             embededCanalServer.start(destination);
-                        } finally {
+                        } finally
+                        {
                             MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
                     }
 
-                    public void processActiveExit() {
-                        try {
+                    public void processActiveExit()
+                    {
+                        try
+                        {
                             MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
                             embededCanalServer.stop(destination);
-                        } finally {
+                        } finally
+                        {
                             MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
                     }
 
-                    public void processStart() {
-                        try {
-                            if (zkclientx != null) {
+                    public void processStart()
+                    {
+                        try
+                        {
+                            if (zkclientx != null)
+                            {
                                 final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":"
-                                                                                                              + port);
+                                        + port);
                                 initCid(path);
-                                zkclientx.subscribeStateChanges(new IZkStateListener() {
+                                zkclientx.subscribeStateChanges(new IZkStateListener()
+                                {
 
-                                    public void handleStateChanged(KeeperState state) throws Exception {
+                                    public void handleStateChanged(KeeperState state) throws Exception
+                                    {
 
                                     }
 
-                                    public void handleNewSession() throws Exception {
+                                    public void handleNewSession() throws Exception
+                                    {
                                         initCid(path);
                                     }
                                 });
                             }
-                        } finally {
+                        } finally
+                        {
                             MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
                     }
 
-                    public void processStop() {
-                        try {
+                    public void processStop()
+                    {
+                        try
+                        {
                             MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                            if (zkclientx != null) {
+                            if (zkclientx != null)
+                            {
                                 final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":"
-                                                                                                              + port);
+                                        + port);
                                 releaseCid(path);
                             }
-                        } finally {
+                        } finally
+                        {
                             MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
                     }
 
                 });
-                if (zkclientx != null) {
+                if (zkclientx != null)
+                {
                     runningMonitor.setZkClient(zkclientx);
                 }
                 return runningMonitor;
@@ -216,25 +239,32 @@ public class CanalController {
                 }
             };
 
-            instanceConfigMonitors = new MapMaker().makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
+            instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>()
+            {
 
-                public InstanceConfigMonitor apply(InstanceMode mode) {
-                    int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
+                public InstanceConfigMonitor apply(InstanceMode mode)
+                {
+                    int scanInterval = Integer
+                            .valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
 
-                    if (mode.isSpring()) {
+                    if (mode.isSpring())
+                    {
                         SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
                         monitor.setScanIntervalInSecond(scanInterval);
                         monitor.setDefaultAction(defaultAction);
                         // 设置conf目录,默认是user.dir + conf目录组成
                         String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
-                        if (StringUtils.isEmpty(rootDir)) {
+                        if (StringUtils.isEmpty(rootDir))
+                        {
                             rootDir = "../conf";
                         }
                         monitor.setRootConf(rootDir);
                         return monitor;
-                    } else if (mode.isManager()) {
+                    } else if (mode.isManager())
+                    {
                         return new ManagerInstanceConfigMonitor();
-                    } else {
+                    } else
+                    {
                         throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
                     }
                 }
@@ -391,7 +421,7 @@ public class CanalController {
             }
 
             if (autoScan) {
-                instanceConfigMonitors.get(config.getMode()).regeister(destination, defaultAction);
+                instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
             }
         }
 

+ 3 - 3
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/InstanceAction.java

@@ -11,15 +11,15 @@ public interface InstanceAction {
     /**
      * 启动destination
      */
-    public void start(String destination);
+    void start(String destination);
 
     /**
      * 停止destination
      */
-    public void stop(String destination);
+    void stop(String destination);
 
     /**
      * 重载destination,可能需要stop,start操作,或者只是更新下内存配置
      */
-    public void reload(String destination);
+    void reload(String destination);
 }

+ 2 - 2
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/InstanceConfigMonitor.java

@@ -10,7 +10,7 @@ import com.alibaba.otter.canal.common.CanalLifeCycle;
  */
 public interface InstanceConfigMonitor extends CanalLifeCycle {
 
-    public void regeister(String destination, InstanceAction action);
+    void register(String destination, InstanceAction action);
 
-    public void unRegeister(String destination);
+    void unregister(String destination);
 }

+ 2 - 2
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/ManagerInstanceConfigMonitor.java

@@ -9,11 +9,11 @@ import com.alibaba.otter.canal.common.CanalLifeCycle;
  */
 public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle implements InstanceConfigMonitor, CanalLifeCycle {
 
-    public void regeister(String destination, InstanceAction action) {
+    public void register(String destination, InstanceAction action) {
 
     }
 
-    public void unRegeister(String destination) {
+    public void unregister(String destination) {
 
     }
 

+ 10 - 7
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/SpringInstanceConfigMonitor.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.deployer.monitor;
 
+import com.google.common.collect.MigrateMap;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FilenameFilter;
@@ -39,12 +40,14 @@ public class SpringInstanceConfigMonitor extends AbstractCanalLifeCycle implemen
     private long                             scanIntervalInSecond = 5;
     private InstanceAction                   defaultAction        = null;
     private Map<String, InstanceAction>      actions              = new MapMaker().makeMap();
-    private Map<String, InstanceConfigFiles> lastFiles            = new MapMaker().makeComputingMap(new Function<String, InstanceConfigFiles>() {
+    private Map<String, InstanceConfigFiles> lastFiles            = MigrateMap.makeComputingMap(new Function<String, InstanceConfigFiles>()
+    {
 
-                                                                      public InstanceConfigFiles apply(String destination) {
-                                                                          return new InstanceConfigFiles(destination);
-                                                                      }
-                                                                  });
+        public InstanceConfigFiles apply(String destination)
+        {
+            return new InstanceConfigFiles(destination);
+        }
+    });
     private ScheduledExecutorService         executor             = Executors.newScheduledThreadPool(1,
                                                                       new NamedThreadFactory("canal-instance-scan"));
 
@@ -72,7 +75,7 @@ public class SpringInstanceConfigMonitor extends AbstractCanalLifeCycle implemen
         lastFiles.clear();
     }
 
-    public void regeister(String destination, InstanceAction action) {
+    public void register(String destination, InstanceAction action) {
         if (action != null) {
             actions.put(destination, action);
         } else {
@@ -80,7 +83,7 @@ public class SpringInstanceConfigMonitor extends AbstractCanalLifeCycle implemen
         }
     }
 
-    public void unRegeister(String destination) {
+    public void unregister(String destination) {
         actions.remove(destination);
     }
 

+ 35 - 29
filter/src/main/java/com/alibaba/otter/canal/filter/PatternUtils.java

@@ -1,48 +1,54 @@
 package com.alibaba.otter.canal.filter;
 
+import com.alibaba.otter.canal.filter.exception.CanalFilterException;
+import com.google.common.base.Function;
+import com.google.common.collect.MapMaker;
+import com.google.common.collect.MigrateMap;
 import java.util.Map;
-
 import org.apache.oro.text.regex.MalformedPatternException;
 import org.apache.oro.text.regex.Pattern;
 import org.apache.oro.text.regex.PatternCompiler;
 import org.apache.oro.text.regex.Perl5Compiler;
 
-import com.alibaba.otter.canal.filter.exception.CanalFilterException;
-import com.google.common.base.Function;
-import com.google.common.collect.MapMaker;
-
 /**
  * 提供{@linkplain Pattern}的lazy get处理
- * 
+ *
  * @author jianghang 2013-1-22 下午09:36:44
  * @version 1.0.0
  */
-public class PatternUtils {
-
-    private static Map<String, Pattern> patterns = new MapMaker().softValues().makeComputingMap(
-                                                                                                new Function<String, Pattern>() {
-
-                                                                                                    public Pattern apply(
-                                                                                                                         String pattern) {
-                                                                                                        try {
-                                                                                                            PatternCompiler pc = new Perl5Compiler();
-                                                                                                            return pc.compile(
-                                                                                                                              pattern,
-                                                                                                                              Perl5Compiler.CASE_INSENSITIVE_MASK
-                                                                                                                                      | Perl5Compiler.READ_ONLY_MASK
-                                                                                                                                      | Perl5Compiler.SINGLELINE_MASK);
-                                                                                                        } catch (MalformedPatternException e) {
-                                                                                                            throw new CanalFilterException(
-                                                                                                                                           e);
-                                                                                                        }
-                                                                                                    }
-                                                                                                });
-
-    public static Pattern getPattern(String pattern) {
+public class PatternUtils
+{
+
+    private static Map<String, Pattern> patterns = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+            new Function<String, Pattern>()
+            {
+
+                public Pattern apply(
+                        String pattern)
+                {
+                    try
+                    {
+                        PatternCompiler pc = new Perl5Compiler();
+                        return pc.compile(
+                                pattern,
+                                Perl5Compiler.CASE_INSENSITIVE_MASK
+                                        | Perl5Compiler.READ_ONLY_MASK
+                                        | Perl5Compiler.SINGLELINE_MASK);
+                    } catch (MalformedPatternException e)
+                    {
+                        throw new CanalFilterException(
+                                e);
+                    }
+                }
+            });
+
+    public static Pattern getPattern(String pattern)
+    {
         return patterns.get(pattern);
     }
 
-    public static void clear() {
+    public static void clear()
+    {
         patterns.clear();
     }
 }

+ 0 - 1
instance/core/pom.xml

@@ -6,7 +6,6 @@
 		<version>1.0.20-SNAPSHOT</version>
 		<relativePath>../../pom.xml</relativePath>
 	</parent>
-	<groupId>com.alibaba.otter</groupId>
 	<artifactId>canal.instance.core</artifactId>
 	<packaging>jar</packaging>
 	<name>canal instance core module for otter ${project.version}</name>

+ 7 - 7
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalInstance.java

@@ -16,20 +16,20 @@ import com.alibaba.otter.canal.store.CanalEventStore;
  */
 public interface CanalInstance extends CanalLifeCycle {
 
-    public String getDestination();
+    String getDestination();
 
-    public CanalEventParser getEventParser();
+    CanalEventParser getEventParser();
 
-    public CanalEventSink getEventSink();
+    CanalEventSink getEventSink();
 
-    public CanalEventStore getEventStore();
+    CanalEventStore getEventStore();
 
-    public CanalMetaManager getMetaManager();
+    CanalMetaManager getMetaManager();
 
-    public CanalAlarmHandler getAlarmHandler();
+    CanalAlarmHandler getAlarmHandler();
 
     /**
      * 客户端发生订阅/取消订阅行为
      */
-    public boolean subscribeChange(ClientIdentity identity);
+    boolean subscribeChange(ClientIdentity identity);
 }

+ 4 - 5
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -89,8 +89,7 @@ public class CanalInstanceWithManager extends CanalInstanceSupport implements Ca
         this.destination = canal.getName();
         this.filter = filter;
 
-        logger.info("init CannalInstance for {}-{} with parameters:{}",
-            new Object[] { canalId, destination, parameters });
+        logger.info("init CannalInstance for {}-{} with parameters:{}", canalId, destination, parameters);
         // 初始化报警机制
         initAlarmHandler();
         // 初始化metaManager
@@ -116,8 +115,8 @@ public class CanalInstanceWithManager extends CanalInstanceSupport implements Ca
     public void start() {
         super.start();
         // 初始化metaManager
-        logger.info("start CannalInstance for {}-{} with parameters:{}", new Object[] { canalId, destination,
-                parameters });
+        logger.info("start CannalInstance for {}-{} with parameters:{}", canalId, destination,
+                parameters);
 
         if (!metaManager.isStart()) {
             metaManager.start();
@@ -336,7 +335,7 @@ public class CanalInstanceWithManager extends CanalInstanceSupport implements Ca
     }
 
     private CanalEventParser doInitEventParser(SourcingType type, List<InetSocketAddress> dbAddresses) {
-        CanalEventParser eventParser = null;
+        CanalEventParser eventParser;
         if (type.isMysql()) {
             MysqlEventParser mysqlEventParser = new MysqlEventParser();
             mysqlEventParser.setDestination(destination);

+ 2 - 2
meta/src/main/java/com/alibaba/otter/canal/meta/CanalMetaManager.java

@@ -35,12 +35,12 @@ public interface CanalMetaManager extends CanalLifeCycle {
     void unsubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException;
 
     /**
-     * 获取cuosr游标
+     * 获取 cursor 游标
      */
     Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException;
 
     /**
-     * 更新cuosr游标
+     * 更新 cursor 游标
      */
     void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException;
 

+ 17 - 8
meta/src/main/java/com/alibaba/otter/canal/meta/FileMixedMetaManager.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.meta;
 
+import com.google.common.collect.MigrateMap;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -69,28 +70,36 @@ public class FileMixedMetaManager extends MemoryMetaManager implements CanalMeta
             throw new CanalMetaManagerException("dir[" + dataDir.getPath() + "] can not read/write");
         }
 
-        dataFileCaches = new MapMaker().makeComputingMap(new Function<String, File>() {
+        dataFileCaches = MigrateMap.makeComputingMap(new Function<String, File>()
+        {
 
-            public File apply(String destination) {
+            public File apply(String destination)
+            {
                 return getDataFile(destination);
             }
         });
 
         executor = Executors.newScheduledThreadPool(1);
-        destinations = new MapMaker().makeComputingMap(new Function<String, List<ClientIdentity>>() {
+        destinations = MigrateMap.makeComputingMap(new Function<String, List<ClientIdentity>>()
+        {
 
-            public List<ClientIdentity> apply(String destination) {
+            public List<ClientIdentity> apply(String destination)
+            {
                 return loadClientIdentity(destination);
             }
         });
 
-        cursors = new MapMaker().makeComputingMap(new Function<ClientIdentity, Position>() {
+        cursors = MigrateMap.makeComputingMap(new Function<ClientIdentity, Position>()
+        {
 
-            public Position apply(ClientIdentity clientIdentity) {
+            public Position apply(ClientIdentity clientIdentity)
+            {
                 Position position = loadCursor(clientIdentity.getDestination(), clientIdentity);
-                if (position == null) {
+                if (position == null)
+                {
                     return nullCursor; // 返回一个空对象标识,避免出现异常
-                } else {
+                } else
+                {
                     return position;
                 }
             }

+ 9 - 4
meta/src/main/java/com/alibaba/otter/canal/meta/MemoryMetaManager.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.meta;
 
+import com.google.common.collect.MigrateMap;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -31,9 +32,11 @@ public class MemoryMetaManager extends AbstractCanalLifeCycle implements CanalMe
     public void start() {
         super.start();
 
-        batches = new MapMaker().makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>() {
+        batches = MigrateMap.makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>()
+        {
 
-            public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity) {
+            public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity)
+            {
                 return MemoryClientIdentityBatch.create(clientIdentity);
             }
 
@@ -41,9 +44,11 @@ public class MemoryMetaManager extends AbstractCanalLifeCycle implements CanalMe
 
         cursors = new MapMaker().makeMap();
 
-        destinations = new MapMaker().makeComputingMap(new Function<String, List<ClientIdentity>>() {
+        destinations = MigrateMap.makeComputingMap(new Function<String, List<ClientIdentity>>()
+        {
 
-            public List<ClientIdentity> apply(String destination) {
+            public List<ClientIdentity> apply(String destination)
+            {
                 return Lists.newArrayList();
             }
         });

+ 89 - 51
meta/src/main/java/com/alibaba/otter/canal/meta/MixedMetaManager.java

@@ -1,68 +1,79 @@
 package com.alibaba.otter.canal.meta;
 
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.springframework.util.Assert;
-
 import com.alibaba.otter.canal.meta.exception.CanalMetaManagerException;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.position.Position;
 import com.alibaba.otter.canal.protocol.position.PositionRange;
 import com.google.common.base.Function;
-import com.google.common.collect.MapMaker;
+import com.google.common.collect.MigrateMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.springframework.util.Assert;
 
 /**
  * 组合memory + zookeeper的使用模式
- * 
+ *
  * @author jianghang 2012-7-11 下午03:58:00
  * @version 1.0.0
  */
 
-public class MixedMetaManager extends MemoryMetaManager implements CanalMetaManager {
+public class MixedMetaManager extends MemoryMetaManager implements CanalMetaManager
+{
 
-    private ExecutorService      executor;
+    private ExecutorService executor;
     private ZooKeeperMetaManager zooKeeperMetaManager;
     @SuppressWarnings("serial")
-    private final Position       nullCursor = new Position() {
-                                            };
+    private final Position nullCursor = new Position()
+    {
+    };
 
-    public void start() {
+    public void start()
+    {
         super.start();
         Assert.notNull(zooKeeperMetaManager);
-        if (!zooKeeperMetaManager.isStart()) {
+        if (!zooKeeperMetaManager.isStart())
+        {
             zooKeeperMetaManager.start();
         }
 
         executor = Executors.newFixedThreadPool(1);
-        destinations = new MapMaker().makeComputingMap(new Function<String, List<ClientIdentity>>() {
+        destinations = MigrateMap.makeComputingMap(new Function<String, List<ClientIdentity>>()
+        {
 
-            public List<ClientIdentity> apply(String destination) {
+            public List<ClientIdentity> apply(String destination)
+            {
                 return zooKeeperMetaManager.listAllSubscribeInfo(destination);
             }
         });
 
-        cursors = new MapMaker().makeComputingMap(new Function<ClientIdentity, Position>() {
+        cursors = MigrateMap.makeComputingMap(new Function<ClientIdentity, Position>()
+        {
 
-            public Position apply(ClientIdentity clientIdentity) {
+            public Position apply(ClientIdentity clientIdentity)
+            {
                 Position position = zooKeeperMetaManager.getCursor(clientIdentity);
-                if (position == null) {
+                if (position == null)
+                {
                     return nullCursor; // 返回一个空对象标识,避免出现异常
-                } else {
+                } else
+                {
                     return position;
                 }
             }
         });
 
-        batches = new MapMaker().makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>() {
+        batches = MigrateMap.makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>()
+        {
 
-            public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity) {
+            public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity)
+            {
                 // 读取一下zookeeper信息,初始化一次
                 MemoryClientIdentityBatch batches = MemoryClientIdentityBatch.create(clientIdentity);
                 Map<Long, PositionRange> positionRanges = zooKeeperMetaManager.listAllBatchs(clientIdentity);
-                for (Map.Entry<Long, PositionRange> entry : positionRanges.entrySet()) {
+                for (Map.Entry<Long, PositionRange> entry : positionRanges.entrySet())
+                {
                     batches.addPositionRange(entry.getValue(), entry.getKey()); // 添加记录到指定batchId
                 }
                 return batches;
@@ -70,10 +81,12 @@ public class MixedMetaManager extends MemoryMetaManager implements CanalMetaMana
         });
     }
 
-    public void stop() {
+    public void stop()
+    {
         super.stop();
 
-        if (zooKeeperMetaManager.isStart()) {
+        if (zooKeeperMetaManager.isStart())
+        {
             zooKeeperMetaManager.stop();
         }
 
@@ -82,58 +95,73 @@ public class MixedMetaManager extends MemoryMetaManager implements CanalMetaMana
         batches.clear();
     }
 
-    public void subscribe(final ClientIdentity clientIdentity) throws CanalMetaManagerException {
+    public void subscribe(final ClientIdentity clientIdentity) throws CanalMetaManagerException
+    {
         super.subscribe(clientIdentity);
 
-        executor.submit(new Runnable() {
+        executor.submit(new Runnable()
+        {
 
-            public void run() {
+            public void run()
+            {
                 zooKeeperMetaManager.subscribe(clientIdentity);
             }
         });
     }
 
-    public void unsubscribe(final ClientIdentity clientIdentity) throws CanalMetaManagerException {
+    public void unsubscribe(final ClientIdentity clientIdentity) throws CanalMetaManagerException
+    {
         super.unsubscribe(clientIdentity);
 
-        executor.submit(new Runnable() {
+        executor.submit(new Runnable()
+        {
 
-            public void run() {
+            public void run()
+            {
                 zooKeeperMetaManager.unsubscribe(clientIdentity);
             }
         });
     }
 
     public void updateCursor(final ClientIdentity clientIdentity, final Position position)
-                                                                                          throws CanalMetaManagerException {
+            throws CanalMetaManagerException
+    {
         super.updateCursor(clientIdentity, position);
 
         // 异步刷新
-        executor.submit(new Runnable() {
+        executor.submit(new Runnable()
+        {
 
-            public void run() {
+            public void run()
+            {
                 zooKeeperMetaManager.updateCursor(clientIdentity, position);
             }
         });
     }
 
     @Override
-    public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
+    public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException
+    {
         Position position = super.getCursor(clientIdentity);
-        if (position == nullCursor) {
+        if (position == nullCursor)
+        {
             return null;
-        } else {
+        } else
+        {
             return position;
         }
     }
 
     public Long addBatch(final ClientIdentity clientIdentity, final PositionRange positionRange)
-                                                                                                throws CanalMetaManagerException {
+            throws CanalMetaManagerException
+    {
         final Long batchId = super.addBatch(clientIdentity, positionRange);
         // 异步刷新
-        executor.submit(new Runnable() {
+        executor.submit(new Runnable()
+        {
 
-            public void run() {
+            public void run()
+            {
                 zooKeeperMetaManager.addBatch(clientIdentity, positionRange, batchId);
             }
         });
@@ -141,24 +169,30 @@ public class MixedMetaManager extends MemoryMetaManager implements CanalMetaMana
     }
 
     public void addBatch(final ClientIdentity clientIdentity, final PositionRange positionRange, final Long batchId)
-                                                                                                                    throws CanalMetaManagerException {
+            throws CanalMetaManagerException
+    {
         super.addBatch(clientIdentity, positionRange, batchId);
         // 异步刷新
-        executor.submit(new Runnable() {
+        executor.submit(new Runnable()
+        {
 
-            public void run() {
+            public void run()
+            {
                 zooKeeperMetaManager.addBatch(clientIdentity, positionRange, batchId);
             }
         });
     }
 
     public PositionRange removeBatch(final ClientIdentity clientIdentity, final Long batchId)
-                                                                                             throws CanalMetaManagerException {
+            throws CanalMetaManagerException
+    {
         PositionRange positionRange = super.removeBatch(clientIdentity, batchId);
         // 异步刷新
-        executor.submit(new Runnable() {
+        executor.submit(new Runnable()
+        {
 
-            public void run() {
+            public void run()
+            {
                 zooKeeperMetaManager.removeBatch(clientIdentity, batchId);
             }
         });
@@ -166,20 +200,24 @@ public class MixedMetaManager extends MemoryMetaManager implements CanalMetaMana
         return positionRange;
     }
 
-    public void clearAllBatchs(final ClientIdentity clientIdentity) throws CanalMetaManagerException {
+    public void clearAllBatchs(final ClientIdentity clientIdentity) throws CanalMetaManagerException
+    {
         super.clearAllBatchs(clientIdentity);
 
         // 异步刷新
-        executor.submit(new Runnable() {
+        executor.submit(new Runnable()
+        {
 
-            public void run() {
+            public void run()
+            {
                 zooKeeperMetaManager.clearAllBatchs(clientIdentity);
             }
         });
     }
 
     // =============== setter / getter ================
-    public void setZooKeeperMetaManager(ZooKeeperMetaManager zooKeeperMetaManager) {
+    public void setZooKeeperMetaManager(ZooKeeperMetaManager zooKeeperMetaManager)
+    {
         this.zooKeeperMetaManager = zooKeeperMetaManager;
     }
 }

+ 19 - 10
meta/src/main/java/com/alibaba/otter/canal/meta/PeriodMixedMetaManager.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.meta;
 
+import com.google.common.collect.MigrateMap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -52,32 +53,40 @@ public class PeriodMixedMetaManager extends MemoryMetaManager implements CanalMe
         }
 
         executor = Executors.newScheduledThreadPool(1);
-        destinations = new MapMaker().makeComputingMap(new Function<String, List<ClientIdentity>>() {
+        destinations = MigrateMap.makeComputingMap(new Function<String, List<ClientIdentity>>()
+        {
 
-            public List<ClientIdentity> apply(String destination) {
+            public List<ClientIdentity> apply(String destination)
+            {
                 return zooKeeperMetaManager.listAllSubscribeInfo(destination);
             }
         });
 
-        cursors = new MapMaker().makeComputingMap(new Function<ClientIdentity, Position>() {
+        cursors = MigrateMap.makeComputingMap(new Function<ClientIdentity, Position>()
+        {
 
-            public Position apply(ClientIdentity clientIdentity) {
+            public Position apply(ClientIdentity clientIdentity)
+            {
                 Position position = zooKeeperMetaManager.getCursor(clientIdentity);
-                if (position == null) {
+                if (position == null)
+                {
                     return nullCursor; // 返回一个空对象标识,避免出现异常
-                } else {
+                } else
+                {
                     return position;
                 }
             }
         });
 
-        batches = new MapMaker().makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>() {
-
-            public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity) {
+        batches = MigrateMap.makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>()
+        {
+            public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity)
+            {
                 // 读取一下zookeeper信息,初始化一次
                 MemoryClientIdentityBatch batches = MemoryClientIdentityBatch.create(clientIdentity);
                 Map<Long, PositionRange> positionRanges = zooKeeperMetaManager.listAllBatchs(clientIdentity);
-                for (Map.Entry<Long, PositionRange> entry : positionRanges.entrySet()) {
+                for (Map.Entry<Long, PositionRange> entry : positionRanges.entrySet())
+                {
                     batches.addPositionRange(entry.getValue(), entry.getKey()); // 添加记录到指定batchId
                 }
                 return batches;

+ 0 - 1
parse/pom.xml

@@ -6,7 +6,6 @@
 		<version>1.0.20-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
-	<groupId>com.alibaba.otter</groupId>
 	<artifactId>canal.parse</artifactId>
 	<packaging>jar</packaging>
 	<name>canal parse module for otter ${project.version}</name>

+ 13 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.dbsync;
 
+import com.google.common.collect.MigrateMap;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,17 +39,23 @@ public class TableMetaCache {
 
     public TableMetaCache(MysqlConnection con){
         this.connection = con;
-        tableMetaCache = new MapMaker().makeComputingMap(new Function<String, TableMeta>() {
+        tableMetaCache = MigrateMap.makeComputingMap(new Function<String, TableMeta>()
+        {
 
-            public TableMeta apply(String name) {
-                try {
+            public TableMeta apply(String name)
+            {
+                try
+                {
                     return getTableMeta0(name);
-                } catch (IOException e) {
+                } catch (IOException e)
+                {
                     // 尝试做一次retry操作
-                    try {
+                    try
+                    {
                         connection.reconnect();
                         return getTableMeta0(name);
-                    } catch (IOException e1) {
+                    } catch (IOException e1)
+                    {
                         throw new CanalParseException("fetch failed by table meta:" + name, e1);
                     }
                 }

+ 13 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.parse.index;
 
+import com.google.common.collect.MigrateMap;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -67,21 +68,27 @@ public class FileMixedLogPositionManager extends MemoryLogPositionManager {
             throw new CanalMetaManagerException("dir[" + dataDir.getPath() + "] can not read/write");
         }
 
-        dataFileCaches = new MapMaker().makeComputingMap(new Function<String, File>() {
+        dataFileCaches = MigrateMap.makeComputingMap(new Function<String, File>()
+        {
 
-            public File apply(String destination) {
+            public File apply(String destination)
+            {
                 return getDataFile(destination);
             }
         });
 
         executor = Executors.newScheduledThreadPool(1);
-        positions = new MapMaker().makeComputingMap(new Function<String, LogPosition>() {
+        positions = MigrateMap.makeComputingMap(new Function<String, LogPosition>()
+        {
 
-            public LogPosition apply(String destination) {
+            public LogPosition apply(String destination)
+            {
                 LogPosition logPosition = loadDataFromFile(dataFileCaches.get(destination));
-                if (logPosition == null) {
+                if (logPosition == null)
+                {
                     return nullPosition;
-                } else {
+                } else
+                {
                     return logPosition;
                 }
             }

+ 9 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/index/MixedLogPositionManager.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.parse.index;
 
+import com.google.common.collect.MigrateMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -34,13 +35,17 @@ public class MixedLogPositionManager extends MemoryLogPositionManager implements
             zooKeeperLogPositionManager.start();
         }
         executor = Executors.newFixedThreadPool(1);
-        positions = new MapMaker().makeComputingMap(new Function<String, LogPosition>() {
+        positions = MigrateMap.makeComputingMap(new Function<String, LogPosition>()
+        {
 
-            public LogPosition apply(String destination) {
+            public LogPosition apply(String destination)
+            {
                 LogPosition logPosition = zooKeeperLogPositionManager.getLatestIndexBy(destination);
-                if (logPosition == null) {
+                if (logPosition == null)
+                {
                     return nullPosition;
-                } else {
+                } else
+                {
                     return logPosition;
                 }
             }

+ 9 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.parse.index;
 
+import com.google.common.collect.MigrateMap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -43,13 +44,17 @@ public class PeriodMixedLogPositionManager extends MemoryLogPositionManager impl
             zooKeeperLogPositionManager.start();
         }
         executor = Executors.newScheduledThreadPool(1);
-        positions = new MapMaker().makeComputingMap(new Function<String, LogPosition>() {
+        positions = MigrateMap.makeComputingMap(new Function<String, LogPosition>()
+        {
 
-            public LogPosition apply(String destination) {
+            public LogPosition apply(String destination)
+            {
                 LogPosition logPosition = zooKeeperLogPositionManager.getLatestIndexBy(destination);
-                if (logPosition == null) {
+                if (logPosition == null)
+                {
                     return nullPosition;
-                } else {
+                } else
+                {
                     return logPosition;
                 }
             }

+ 339 - 338
pom.xml

@@ -1,311 +1,312 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<groupId>com.alibaba.otter</groupId>
-	<artifactId>canal</artifactId>
-	<packaging>pom</packaging>
-	<name>canal module for otter ${project.version}</name>
-	<version>1.0.20-SNAPSHOT</version>
-	<url>https://github.com/alibaba/canal</url>
-	<parent>
-	    <groupId>org.sonatype.oss</groupId>
-	    <artifactId>oss-parent</artifactId>
-	    <version>7</version>
-	</parent>
-	<developers>
-		<developer>
-			<name>agapple</name>
-			<url>http://agapple.iteye.com</url>
-			<email>jianghang115@gmail.com</email>
-			<timezone>8</timezone>
-		</developer>
-		<developer>
-			<name>zavakid</name>
-			<url>http://www.zavakid.com</url>
-			<email>zava.kid@gmail.com</email>
-			<timezone>8</timezone>
-		</developer>
-		<developer>
-			<name>in355hz</name>
-			<url>http://in355hz.iteye.com</url>
-			<email>in355hz@gmail.com</email>
-			<timezone>8</timezone>
-		</developer>
-	</developers>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>canal</artifactId>
+    <packaging>pom</packaging>
+    <name>canal module for otter ${project.version}</name>
+    <version>1.0.20-SNAPSHOT</version>
+    <url>https://github.com/alibaba/canal</url>
+    <parent>
+        <groupId>org.sonatype.oss</groupId>
+        <artifactId>oss-parent</artifactId>
+        <version>7</version>
+    </parent>
+    <developers>
+        <developer>
+            <name>agapple</name>
+            <url>http://agapple.iteye.com</url>
+            <email>jianghang115@gmail.com</email>
+            <timezone>8</timezone>
+        </developer>
+        <developer>
+            <name>zavakid</name>
+            <url>http://www.zavakid.com</url>
+            <email>zava.kid@gmail.com</email>
+            <timezone>8</timezone>
+        </developer>
+        <developer>
+            <name>in355hz</name>
+            <url>http://in355hz.iteye.com</url>
+            <email>in355hz@gmail.com</email>
+            <timezone>8</timezone>
+        </developer>
+    </developers>
 
-	<licenses>
-		<license>
-			<name>Apache License, Version 2.0</name>
-			<url>http://www.apache.org/licenses/LICENSE-2.0</url>
-		</license>
-	</licenses>
+    <licenses>
+        <license>
+            <name>Apache License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0</url>
+        </license>
+    </licenses>
 
-	<scm>
-		<url>git@github.com:alibaba/canal.git</url>
-		<connection>scm:git:git@github.com:alibaba/canal.git</connection>
-		<developerConnection>scm:git:git@github.com:alibaba/canal.git</developerConnection>
-	</scm>
-	
-	<repositories>
-		<repository>
-			<id>central</id>
-			<url>http://repo1.maven.org/maven2</url>
-			<releases>
-				<enabled>true</enabled>
-			</releases>
-			<snapshots>
-				<enabled>false</enabled>
-			</snapshots>
-		</repository>
-		<repository>
-			<id>java.net</id>
-			<url>http://download.java.net/maven/2/</url>
-			<releases>
-				<enabled>true</enabled>
-			</releases>
-			<snapshots>
-				<enabled>false</enabled>
-			</snapshots>
-		</repository>
-		<repository>
-			<id>alibaba</id>
-			<url>http://code.alibabatech.com/mvn/releases/</url>
-			<releases>
-				<enabled>true</enabled>
-			</releases>
-			<snapshots>
-				<enabled>false</enabled>
-			</snapshots>
-		</repository>
-		<repository>
-			<id>sonatype</id>
-			<name>sonatype</name>
-			<url>https://oss.sonatype.org/content/repositories/snapshots</url>
-			<releases>
-				<enabled>false</enabled>
-			</releases>
-			<snapshots>
-				<enabled>true</enabled>
-			</snapshots>
-		</repository>
-		<repository>
-			<id>sonatype-release</id>
-			<name>sonatype-release</name>
-			<url>https://oss.sonatype.org/service/local/repositories/releases/content</url>
-			<releases>
-				<enabled>false</enabled>
-			</releases>
-			<snapshots>
-				<enabled>true</enabled>
-			</snapshots>
-		</repository>
-	</repositories>
-	
-	<properties>
-		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-		<!--maven properties-->
-		<maven.test.skip>true</maven.test.skip>
-		<downloadSources>true</downloadSources>
-		<!-- compiler settings properties -->
-		<java_source_version>1.6</java_source_version>
-		<java_target_version>1.6</java_target_version>
-		<file_encoding>UTF-8</file_encoding>
-	</properties>
-	
-	<modules>
-		<module>common</module>
-		<module>meta</module>
-		<module>dbsync</module>
-		<module>filter</module>	
-		<module>driver</module>
-		<module>parse</module>
-		<module>sink</module>
-		<module>store</module>
-		<module>protocol</module>
-		<module>instance</module>
-		<module>server</module>
-		<module>client</module>
-		<module>deployer</module>
-		<module>example</module>
-	</modules>
-	
-	<dependencyManagement>
-		<dependencies>
-			<dependency>
-				<groupId>org.springframework</groupId>
-				<artifactId>spring</artifactId>
-				<version>2.5.6</version>
-			</dependency>
+    <scm>
+        <url>git@github.com:alibaba/canal.git</url>
+        <connection>scm:git:git@github.com:alibaba/canal.git</connection>
+        <developerConnection>scm:git:git@github.com:alibaba/canal.git</developerConnection>
+    </scm>
+
+    <repositories>
+        <repository>
+            <id>central</id>
+            <url>http://repo1.maven.org/maven2</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+        <repository>
+            <id>java.net</id>
+            <url>http://download.java.net/maven/2/</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+        <repository>
+            <id>alibaba</id>
+            <url>http://code.alibabatech.com/mvn/releases/</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+        <repository>
+            <id>sonatype</id>
+            <name>sonatype</name>
+            <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+        <repository>
+            <id>sonatype-release</id>
+            <name>sonatype-release</name>
+            <url>https://oss.sonatype.org/service/local/repositories/releases/content</url>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <!--maven properties-->
+        <maven.test.skip>true</maven.test.skip>
+        <downloadSources>true</downloadSources>
+        <!-- compiler settings properties -->
+        <java_source_version>1.6</java_source_version>
+        <java_target_version>1.6</java_target_version>
+        <file_encoding>UTF-8</file_encoding>
+    </properties>
+
+    <modules>
+        <module>common</module>
+        <module>meta</module>
+        <module>dbsync</module>
+        <module>filter</module>
+        <module>driver</module>
+        <module>parse</module>
+        <module>sink</module>
+        <module>store</module>
+        <module>protocol</module>
+        <module>instance</module>
+        <module>server</module>
+        <module>client</module>
+        <module>deployer</module>
+        <module>example</module>
+    </modules>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.springframework</groupId>
+                <artifactId>spring</artifactId>
+                <version>2.5.6</version>
+            </dependency>
             <!-- external -->
             <dependency>
-				<groupId>commons-lang</groupId>
-				<artifactId>commons-lang</artifactId>
-				<version>2.6</version>
-			</dependency>
-			<dependency>
-				<groupId>commons-io</groupId>
-				<artifactId>commons-io</artifactId>
-				<version>2.4</version>
-			</dependency>
-			<dependency>
-				<groupId>org.apache.zookeeper</groupId>
-				<artifactId>zookeeper</artifactId>
-				<version>3.4.5</version>
-				<exclusions>
-				  <exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				  </exclusion>
-				  <exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-log4j12</artifactId>
-				  </exclusion>
-				  <exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-api</artifactId>
-				  </exclusion>
-				  <exclusion>
-					<groupId>jline</groupId>
-					<artifactId>jline</artifactId>
-				  </exclusion>
-				</exclusions>
-			</dependency>
-			<dependency>
-				<groupId>com.github.sgroschupf</groupId>
-				<artifactId>zkclient</artifactId>
-				<version>0.1</version>
-			</dependency>
-			<dependency>
-				<groupId>com.alibaba</groupId>
-				<artifactId>fastjson</artifactId>
-				<version>1.1.26</version>
-			</dependency>
-			<dependency>
-				<groupId>com.google.guava</groupId>
-				<artifactId>guava</artifactId>
-				<version>r09</version>
-			</dependency>
-			<dependency>
-				<groupId>com.googlecode.aviator</groupId>
-				<artifactId>aviator</artifactId>
-				<version>2.2.1</version>
-			</dependency>
-			<dependency>
-				<groupId>oro</groupId>
-				<artifactId>oro</artifactId>
-				<version>2.0.8</version>
-			</dependency>
-			<dependency>
-			  <groupId>org.jboss.netty</groupId>
-			  <artifactId>netty</artifactId>
-			  <version>3.2.5.Final</version>
-			</dependency>
-			<dependency>
-				<groupId>com.google.protobuf</groupId>
-				<artifactId>protobuf-java</artifactId>
-				<version>2.4.1</version>
-			</dependency>
+                <groupId>commons-lang</groupId>
+                <artifactId>commons-lang</artifactId>
+                <version>2.6</version>
+            </dependency>
+            <dependency>
+                <groupId>commons-io</groupId>
+                <artifactId>commons-io</artifactId>
+                <version>2.4</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.zookeeper</groupId>
+                <artifactId>zookeeper</artifactId>
+                <version>3.4.5</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>log4j</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-api</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>jline</groupId>
+                        <artifactId>jline</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>com.github.sgroschupf</groupId>
+                <artifactId>zkclient</artifactId>
+                <version>0.1</version>
+            </dependency>
+            <dependency>
+                <groupId>com.alibaba</groupId>
+                <artifactId>fastjson</artifactId>
+                <version>1.1.26</version>
+            </dependency>
+            <dependency>
+                <groupId>com.google.guava</groupId>
+                <artifactId>guava</artifactId>
+                <version>18.0</version>
+            </dependency>
+            <dependency>
+                <groupId>com.googlecode.aviator</groupId>
+                <artifactId>aviator</artifactId>
+                <version>2.2.1</version>
+            </dependency>
+            <dependency>
+                <groupId>oro</groupId>
+                <artifactId>oro</artifactId>
+                <version>2.0.8</version>
+            </dependency>
+            <dependency>
+                <groupId>org.jboss.netty</groupId>
+                <artifactId>netty</artifactId>
+                <version>3.2.5.Final</version>
+            </dependency>
+            <dependency>
+                <groupId>com.google.protobuf</groupId>
+                <artifactId>protobuf-java</artifactId>
+                <version>2.4.1</version>
+            </dependency>
             <!-- log -->
             <dependency>
-				<groupId>ch.qos.logback</groupId>
-				<artifactId>logback-core</artifactId>
-				<version>1.0.6</version>
-			</dependency>
-			<dependency>
-				<groupId>ch.qos.logback</groupId>
-				<artifactId>logback-classic</artifactId>
-				<version>1.0.6</version>
-			</dependency>
-			<dependency>
-				<groupId>org.slf4j</groupId>
-				<artifactId>jcl-over-slf4j</artifactId>
-				<version>1.6.0</version>
-			</dependency>
-			<dependency>
-				<groupId>org.slf4j</groupId>
-				<artifactId>slf4j-api</artifactId>
-				<version>1.6.0</version>
-			</dependency>
-			<!-- test dependency -->
+                <groupId>ch.qos.logback</groupId>
+                <artifactId>logback-core</artifactId>
+                <version>1.1.3</version>
+            </dependency>
             <dependency>
-				<groupId>junit</groupId>
-				<artifactId>junit</artifactId>
-				<version>4.5</version>
-				<scope>test</scope>
-			</dependency>
-			<dependency>
-				<groupId>mysql</groupId>
-				<artifactId>mysql-connector-java</artifactId>
-				<version>5.1.12</version>
-				<scope>test</scope>
-			</dependency>
-		</dependencies>
-	</dependencyManagement>
-	
-	<build>
-		<extensions>
-			<extension>
-				<groupId>org.jvnet.wagon-svn</groupId>
-				<artifactId>wagon-svn</artifactId>
-				<version>1.9</version>
-			</extension>
-			<extension>
-				<groupId>org.apache.maven.wagon</groupId>
-				<artifactId>wagon-http-shared</artifactId>
-				<version>1.0-beta-7</version>
-			</extension>
-		</extensions>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-source-plugin</artifactId>
+                <groupId>ch.qos.logback</groupId>
+                <artifactId>logback-classic</artifactId>
+                <version>1.1.3</version>
+            </dependency>
+            <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>jcl-over-slf4j</artifactId>
+                <version>1.7.12</version>
+            </dependency>
+            <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-api</artifactId>
+                <version>1.7.12</version>
+            </dependency>
+            <!-- test dependency -->
+            <dependency>
+                <groupId>junit</groupId>
+                <artifactId>junit</artifactId>
+                <version>4.12</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>mysql</groupId>
+                <artifactId>mysql-connector-java</artifactId>
+                <version>5.1.12</version>
+                <scope>test</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <build>
+        <extensions>
+            <extension>
+                <groupId>org.jvnet.wagon-svn</groupId>
+                <artifactId>wagon-svn</artifactId>
+                <version>1.9</version>
+            </extension>
+            <extension>
+                <groupId>org.apache.maven.wagon</groupId>
+                <artifactId>wagon-http-shared</artifactId>
+                <version>1.0-beta-7</version>
+            </extension>
+        </extensions>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-source-plugin</artifactId>
                 <version>2.4</version>
-				<executions>
-					<execution>
-						<id>attach-sources</id>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
                 <version>3.2</version>
-				<configuration>
-					<source>${java_source_version}</source>
-					<target>${java_target_version}</target>
-					<encoding>${file_encoding}</encoding>
-				</configuration>
-			</plugin>
-			<plugin>
-			   <groupId>org.apache.maven.plugins</groupId>  
-			   <artifactId>maven-eclipse-plugin</artifactId>  
-			   <version>2.5.1</version>  
-			   <configuration>  
-				  <additionalConfig>  
-					 <file>  
-						<name>.settings/org.eclipse.core.resources.prefs</name>  
-						<content>  
-						   <![CDATA[eclipse.preferences.version=1${line.separator}encoding/<project>=${file_encoding}${line.separator}]]>  
-						</content>  
-					 </file>  
-				  </additionalConfig>  
-			   </configuration>  
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<version>2.5</version>
-				<configuration>
-				  <includes>
-					<include>**/*Test.java</include>
-				  </includes>
-				  <excludes>
-					<exclude>**/*NoRunTest.java</exclude>
-				  </excludes>
-				</configuration>
-			</plugin>
+                <configuration>
+                    <source>${java_source_version}</source>
+                    <target>${java_target_version}</target>
+                    <encoding>${file_encoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-eclipse-plugin</artifactId>
+                <version>2.5.1</version>
+                <configuration>
+                    <additionalConfig>
+                        <file>
+                            <name>.settings/org.eclipse.core.resources.prefs</name>
+                            <content>
+                                <![CDATA[eclipse.preferences.version=1${line.separator}encoding/<project>=${file_encoding}${line.separator}]]>
+                            </content>
+                        </file>
+                    </additionalConfig>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.5</version>
+                <configuration>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                    <excludes>
+                        <exclude>**/*NoRunTest.java</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
             <!-- javadoc -->
             <!--
             <plugin>
@@ -362,43 +363,43 @@
 				</executions>
 			</plugin>
 			-->
-		</plugins>
-		<sourceDirectory>src/main/java</sourceDirectory>  
-		<testSourceDirectory>src/test/java</testSourceDirectory>
-		<resources>
-			<resource>
-				<directory>src/main/resources</directory>
-				<includes>
-					<include>**/*</include>
-				</includes>
-				<excludes>
-					<exclude>**/.svn/</exclude>
-				</excludes>
-			</resource>
-		</resources>
-		<testResources>
-			<testResource>
-				<directory>src/test/resources</directory>
-				<includes>
-					<include>**/*</include>
-				</includes>
-				<excludes>
-					<exclude>**/.svn/</exclude>
-				</excludes>
-			</testResource>
-		</testResources>
-	</build>
-	
-	<distributionManagement>
-		<snapshotRepository>
-			<id>sonatype-nexus-snapshots</id>
-			<name>Sonatype Nexus Snapshots</name>
-			<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
-		</snapshotRepository>
-		<repository>
-			<id>sonatype-nexus-staging</id>
-			<name>Nexus Release Repository</name>
-			<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
-		</repository>
-	</distributionManagement>
+        </plugins>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <includes>
+                    <include>**/*</include>
+                </includes>
+                <excludes>
+                    <exclude>**/.svn/</exclude>
+                </excludes>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+                <includes>
+                    <include>**/*</include>
+                </includes>
+                <excludes>
+                    <exclude>**/.svn/</exclude>
+                </excludes>
+            </testResource>
+        </testResources>
+    </build>
+
+    <distributionManagement>
+        <snapshotRepository>
+            <id>sonatype-nexus-snapshots</id>
+            <name>Sonatype Nexus Snapshots</name>
+            <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
+        </snapshotRepository>
+        <repository>
+            <id>sonatype-nexus-staging</id>
+            <name>Nexus Release Repository</name>
+            <url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
+        </repository>
+    </distributionManagement>
 </project>

+ 3 - 1
protocol/src/main/java/com/alibaba/otter/canal/protocol/ClientIdentity.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.protocol;
 
+import java.io.Serializable;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.ToStringBuilder;
 
@@ -9,7 +10,8 @@ import com.alibaba.otter.canal.common.utils.CanalToStringStyle;
  * @author zebin.xuzb @ 2012-6-20
  * @version 1.0.0
  */
-public class ClientIdentity {
+public class ClientIdentity implements Serializable
+{
 
     private String destination;
     private short  clientId;

+ 0 - 1
server/pom.xml

@@ -6,7 +6,6 @@
 		<version>1.0.20-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
-	<groupId>com.alibaba.otter</groupId>
 	<artifactId>canal.server</artifactId>
 	<packaging>jar</packaging>
 	<name>canal server module for otter ${project.version}</name>

+ 2 - 2
server/src/main/java/com/alibaba/otter/canal/server/CanalServer.java

@@ -11,7 +11,7 @@ import com.alibaba.otter.canal.server.exception.CanalServerException;
  */
 public interface CanalServer extends CanalLifeCycle {
 
-    public void start() throws CanalServerException;
+    void start() throws CanalServerException;
 
-    public void stop() throws CanalServerException;
+    void stop() throws CanalServerException;
 }

+ 29 - 0
server/src/main/java/com/alibaba/otter/canal/server/CanalService.java

@@ -0,0 +1,29 @@
+package com.alibaba.otter.canal.server;
+
+import com.alibaba.otter.canal.protocol.ClientIdentity;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.server.exception.CanalServerException;
+import java.util.concurrent.TimeUnit;
+
+public interface CanalService
+{
+    void subscribe(ClientIdentity clientIdentity) throws CanalServerException;
+
+    void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException;
+
+    Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;
+
+    Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
+            throws CanalServerException;
+
+    Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;
+
+    Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
+            throws CanalServerException;
+
+    void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException;
+
+    void rollback(ClientIdentity clientIdentity) throws CanalServerException;
+
+    void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException;
+}

+ 28 - 17
server/src/main/java/com/alibaba/otter/canal/server/embeded/CanalServerWithEmbeded.java → server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -1,5 +1,6 @@
-package com.alibaba.otter.canal.server.embeded;
+package com.alibaba.otter.canal.server.embedded;
 
+import com.google.common.collect.MigrateMap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -27,7 +28,6 @@ import com.alibaba.otter.canal.store.model.Event;
 import com.alibaba.otter.canal.store.model.Events;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
-import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
 
 /**
@@ -37,9 +37,10 @@ import com.google.common.collect.Maps;
  * @author zebin.xuzb
  * @version 1.0.0
  */
-public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements CanalServer {
+public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, com.alibaba.otter.canal.server.CanalService
+{
 
-    private static final Logger        logger = LoggerFactory.getLogger(CanalServerWithEmbeded.class);
+    private static final Logger        logger = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
     private Map<String, CanalInstance> canalInstances;
     // private Map<ClientIdentity, Position> lastRollbackPostions;
     private CanalInstanceGenerator     canalInstanceGenerator;
@@ -47,11 +48,12 @@ public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements Ca
     public void start() {
         super.start();
 
-        canalInstances = new MapMaker().makeComputingMap(new Function<String, CanalInstance>() {
+        canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>()
+        {
 
-            public CanalInstance apply(String destination) {
-                CanalInstance canalInstance = canalInstanceGenerator.generate(destination);
-                return canalInstance;
+            public CanalInstance apply(String destination)
+            {
+                return canalInstanceGenerator.generate(destination);
             }
         });
 
@@ -74,7 +76,7 @@ public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements Ca
                     }
                 }
             } catch (Exception e) {
-                logger.error(String.format("stop cannalInstance[%s] has an error", entry.getKey()), e);
+                logger.error(String.format("stop CanalInstance[%s] has an error", entry.getKey()), e);
             }
         }
     }
@@ -114,6 +116,7 @@ public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements Ca
     /**
      * 客户端订阅,重复订阅时会更新对应的filter信息
      */
+    @Override
     public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
         CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
         if (!canalInstance.getMetaManager().isStart()) {
@@ -140,6 +143,7 @@ public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements Ca
     /**
      * 取消订阅
      */
+    @Override
     public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
         CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
         canalInstance.getMetaManager().unsubscribe(clientIdentity); // 执行一下meta订阅
@@ -162,6 +166,7 @@ public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements Ca
      * 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
      * </pre>
      */
+    @Override
     public Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
         return get(clientIdentity, batchSize, null, null);
     }
@@ -179,6 +184,7 @@ public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements Ca
      * 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
      * </pre>
      */
+    @Override
     public Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
                                                                                                  throws CanalServerException {
         checkStart(clientIdentity.getDestination());
@@ -213,8 +219,8 @@ public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements Ca
                 });
 
                 logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
-                            new Object[] { clientIdentity.getClientId(), batchSize, entrys.size(), batchId,
-                                    events.getPositionRange() });
+                        clientIdentity.getClientId(), batchSize, entrys.size(), batchId,
+                        events.getPositionRange());
                 // 直接提交ack
                 ack(clientIdentity, batchId);
                 return new Message(batchId, entrys);
@@ -230,6 +236,7 @@ public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements Ca
      * 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
      * </pre>
      */
+    @Override
     public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
         return getWithoutAck(clientIdentity, batchSize, null, null);
     }
@@ -248,6 +255,7 @@ public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements Ca
      * 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
      * </pre>
      */
+    @Override
     public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
                                                                                                            throws CanalServerException {
         checkStart(clientIdentity.getDestination());
@@ -285,8 +293,8 @@ public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements Ca
                 });
 
                 logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]",
-                            new Object[] { clientIdentity.getClientId(), batchSize, entrys.size(), batchId,
-                                    events.getPositionRange() });
+                        clientIdentity.getClientId(), batchSize, entrys.size(), batchId,
+                        events.getPositionRange());
                 return new Message(batchId, entrys);
             }
 
@@ -314,6 +322,7 @@ public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements Ca
      * 注意:进行反馈时必须按照batchId的顺序进行ack(需有客户端保证)
      * </pre>
      */
+    @Override
     public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
         checkStart(clientIdentity.getDestination());
         checkSubscribe(clientIdentity);
@@ -348,7 +357,7 @@ public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements Ca
         if (positionRanges.getAck() != null) {
             canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
             logger.info("ack successfully, clientId:{} batchId:{} position:{}",
-                        new Object[] { clientIdentity.getClientId(), batchId, positionRanges });
+                    clientIdentity.getClientId(), batchId, positionRanges);
         }
 
         // 可定时清理数据
@@ -357,8 +366,9 @@ public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements Ca
     }
 
     /**
-     * 回滚到未进行 {@link ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link ack} 的地方开始拿
+     * 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿
      */
+    @Override
     public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
         checkStart(clientIdentity.getDestination());
         CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
@@ -378,8 +388,9 @@ public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements Ca
     }
 
     /**
-     * 回滚到未进行 {@link ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link ack} 的地方开始拿
+     * 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿
      */
+    @Override
     public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
         checkStart(clientIdentity.getDestination());
         CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
@@ -405,7 +416,7 @@ public class CanalServerWithEmbeded extends AbstractCanalLifeCycle implements Ca
             canalInstance.getEventStore().rollback();// rollback
                                                      // eventStore中的状态信息
             logger.info("rollback successfully, clientId:{} batchId:{} position:{}",
-                        new Object[] { clientIdentity.getClientId(), batchId, positionRanges });
+                    clientIdentity.getClientId(), batchId, positionRanges);
         }
     }
 

+ 12 - 12
server/src/main/java/com/alibaba/otter/canal/server/netty/CanalServerWithNetty.java

@@ -13,7 +13,7 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.server.CanalServer;
-import com.alibaba.otter.canal.server.embeded.CanalServerWithEmbeded;
+import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
 import com.alibaba.otter.canal.server.netty.handler.ClientAuthenticationHandler;
 import com.alibaba.otter.canal.server.netty.handler.FixedHeaderFrameDecoder;
 import com.alibaba.otter.canal.server.netty.handler.HandshakeInitializationHandler;
@@ -27,7 +27,7 @@ import com.alibaba.otter.canal.server.netty.handler.SessionHandler;
  */
 public class CanalServerWithNetty extends AbstractCanalLifeCycle implements CanalServer {
 
-    private CanalServerWithEmbeded embededServer;       // 嵌入式server
+    private CanalServerWithEmbedded embeddedServer;       // 嵌入式server
     private String                 ip;
     private int                    port;
     private Channel                serverChannel = null;
@@ -36,15 +36,15 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana
     public CanalServerWithNetty(){
     }
 
-    public CanalServerWithNetty(CanalServerWithEmbeded embededServer){
-        this.embededServer = embededServer;
+    public CanalServerWithNetty(CanalServerWithEmbedded embeddedServer){
+        this.embeddedServer = embeddedServer;
     }
 
     public void start() {
         super.start();
 
-        if (!embededServer.isStart()) {
-            embededServer.start();
+        if (!embeddedServer.isStart()) {
+            embeddedServer.start();
         }
 
         this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
@@ -58,9 +58,9 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana
                 pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
                 pipelines.addLast(HandshakeInitializationHandler.class.getName(), new HandshakeInitializationHandler());
                 pipelines.addLast(ClientAuthenticationHandler.class.getName(),
-                                  new ClientAuthenticationHandler(embededServer));
+                                  new ClientAuthenticationHandler(embeddedServer));
 
-                SessionHandler sessionHandler = new SessionHandler(embededServer);
+                SessionHandler sessionHandler = new SessionHandler(embeddedServer);
                 pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
                 return pipelines;
             }
@@ -85,8 +85,8 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana
             this.bootstrap.releaseExternalResources();
         }
 
-        if (embededServer.isStart()) {
-            embededServer.stop();
+        if (embeddedServer.isStart()) {
+            embeddedServer.stop();
         }
     }
 
@@ -98,8 +98,8 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana
         this.port = port;
     }
 
-    public void setEmbededServer(CanalServerWithEmbeded embededServer) {
-        this.embededServer = embededServer;
+    public void setEmbeddedServer(CanalServerWithEmbedded embeddedServer) {
+        this.embeddedServer = embeddedServer;
     }
 
 }

+ 10 - 9
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/ClientAuthenticationHandler.java

@@ -19,7 +19,7 @@ import com.alibaba.otter.canal.common.zookeeper.running.ServerRunningMonitors;
 import com.alibaba.otter.canal.protocol.CanalPacket.ClientAuth;
 import com.alibaba.otter.canal.protocol.CanalPacket.Packet;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
-import com.alibaba.otter.canal.server.embeded.CanalServerWithEmbeded;
+import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
 import com.alibaba.otter.canal.server.netty.NettyUtils;
 
 /**
@@ -33,14 +33,14 @@ public class ClientAuthenticationHandler extends SimpleChannelHandler {
     private static final Logger    logger                                  = LoggerFactory.getLogger(ClientAuthenticationHandler.class);
     private final int              SUPPORTED_VERSION                       = 3;
     private final int              defaultSubscriptorDisconnectIdleTimeout = 5 * 60 * 1000;
-    private CanalServerWithEmbeded embededServer;
+    private CanalServerWithEmbedded embeddedServer;
 
     public ClientAuthenticationHandler(){
 
     }
 
-    public ClientAuthenticationHandler(CanalServerWithEmbeded embededServer){
-        this.embededServer = embededServer;
+    public ClientAuthenticationHandler(CanalServerWithEmbedded embeddedServer){
+        this.embeddedServer = embeddedServer;
     }
 
     public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) throws Exception {
@@ -58,11 +58,12 @@ public class ClientAuthenticationHandler extends SimpleChannelHandler {
                         clientAuth.getFilter());
                     try {
                         MDC.put("destination", clientIdentity.getDestination());
-                        embededServer.subscribe(clientIdentity);
+                        embeddedServer.subscribe(clientIdentity);
                         ctx.setAttachment(clientIdentity);// 设置状态数据
                         // 尝试启动,如果已经启动,忽略
-                        if (!embededServer.isStart(clientIdentity.getDestination())) {
-                            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
+                        if (!embeddedServer.isStart(clientIdentity.getDestination())) {
+                            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity
+                                    .getDestination());
                             if (!runningMonitor.isStart()) {
                                 runningMonitor.start();
                             }
@@ -114,8 +115,8 @@ public class ClientAuthenticationHandler extends SimpleChannelHandler {
         }
     }
 
-    public void setEmbededServer(CanalServerWithEmbeded embededServer) {
-        this.embededServer = embededServer;
+    public void setEmbeddedServer(CanalServerWithEmbedded embeddedServer) {
+        this.embeddedServer = embeddedServer;
     }
 
 }

+ 18 - 20
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -31,7 +31,7 @@ import com.alibaba.otter.canal.protocol.CanalPacket.Sub;
 import com.alibaba.otter.canal.protocol.CanalPacket.Unsub;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.server.embeded.CanalServerWithEmbeded;
+import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
 import com.alibaba.otter.canal.server.netty.NettyUtils;
 
 /**
@@ -43,14 +43,12 @@ import com.alibaba.otter.canal.server.netty.NettyUtils;
 public class SessionHandler extends SimpleChannelHandler {
 
     private static final Logger    logger = LoggerFactory.getLogger(SessionHandler.class);
-    private CanalServerWithEmbeded embededServer;
+    private CanalServerWithEmbedded embeddedServer;
 
-    public SessionHandler(){
+    public SessionHandler(){ }
 
-    }
-
-    public SessionHandler(CanalServerWithEmbeded embededServer){
-        this.embededServer = embededServer;
+    public SessionHandler(CanalServerWithEmbedded embeddedServer){
+        this.embeddedServer = embeddedServer;
     }
 
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
@@ -66,10 +64,10 @@ public class SessionHandler extends SimpleChannelHandler {
                         clientIdentity = new ClientIdentity(sub.getDestination(), Short.valueOf(sub.getClientId()),
                                                             sub.getFilter());
                         MDC.put("destination", clientIdentity.getDestination());
-                        embededServer.subscribe(clientIdentity);
+                        embeddedServer.subscribe(clientIdentity);
 
                         // 尝试启动,如果已经启动,忽略
-                        if (!embededServer.isStart(clientIdentity.getDestination())) {
+                        if (!embeddedServer.isStart(clientIdentity.getDestination())) {
                             ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
                             if (!runningMonitor.isStart()) {
                                 runningMonitor.start();
@@ -90,7 +88,7 @@ public class SessionHandler extends SimpleChannelHandler {
                         clientIdentity = new ClientIdentity(unsub.getDestination(), Short.valueOf(unsub.getClientId()),
                                                             unsub.getFilter());
                         MDC.put("destination", clientIdentity.getDestination());
-                        embededServer.unsubscribe(clientIdentity);
+                        embeddedServer.unsubscribe(clientIdentity);
                         stopCanalInstanceIfNecessary(clientIdentity);// 尝试关闭
                         NettyUtils.ack(ctx.getChannel(), null);
                     } else {
@@ -108,17 +106,17 @@ public class SessionHandler extends SimpleChannelHandler {
 
                         //                        if (get.getAutoAck()) {
                         //                            if (get.getTimeout() == -1) {//是否是初始值
-                        //                                message = embededServer.get(clientIdentity, get.getFetchSize());
+                        //                                message = embeddedServer.get(clientIdentity, get.getFetchSize());
                         //                            } else {
                         //                                TimeUnit unit = convertTimeUnit(get.getUnit());
-                        //                                message = embededServer.get(clientIdentity, get.getFetchSize(), get.getTimeout(), unit);
+                        //                                message = embeddedServer.get(clientIdentity, get.getFetchSize(), get.getTimeout(), unit);
                         //                            }
                         //                        } else {
                         if (get.getTimeout() == -1) {//是否是初始值
-                            message = embededServer.getWithoutAck(clientIdentity, get.getFetchSize());
+                            message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
                         } else {
                             TimeUnit unit = convertTimeUnit(get.getUnit());
-                            message = embededServer.getWithoutAck(clientIdentity, get.getFetchSize(), get.getTimeout(),
+                            message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize(), get.getTimeout(),
                                                                   unit);
                         }
                         //                        }
@@ -153,7 +151,7 @@ public class SessionHandler extends SimpleChannelHandler {
                             // donothing
                         } else {
                             clientIdentity = new ClientIdentity(ack.getDestination(), Short.valueOf(ack.getClientId()));
-                            embededServer.ack(clientIdentity, ack.getBatchId());
+                            embeddedServer.ack(clientIdentity, ack.getBatchId());
                         }
                     } else {
                         NettyUtils.error(401,
@@ -169,9 +167,9 @@ public class SessionHandler extends SimpleChannelHandler {
                         clientIdentity = new ClientIdentity(rollback.getDestination(),
                                                             Short.valueOf(rollback.getClientId()));
                         if (rollback.getBatchId() == 0L) {
-                            embededServer.rollback(clientIdentity);// 回滚所有批次
+                            embeddedServer.rollback(clientIdentity);// 回滚所有批次
                         } else {
-                            embededServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次
+                            embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次
                         }
                     } else {
                         NettyUtils.error(401,
@@ -212,7 +210,7 @@ public class SessionHandler extends SimpleChannelHandler {
     }
 
     private void stopCanalInstanceIfNecessary(ClientIdentity clientIdentity) {
-        List<ClientIdentity> clientIdentitys = embededServer.listAllSubscribe(clientIdentity.getDestination());
+        List<ClientIdentity> clientIdentitys = embeddedServer.listAllSubscribe(clientIdentity.getDestination());
         if (clientIdentitys != null && clientIdentitys.size() == 1 && clientIdentitys.contains(clientIdentity)) {
             ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
             if (runningMonitor.isStart()) {
@@ -242,8 +240,8 @@ public class SessionHandler extends SimpleChannelHandler {
         }
     }
 
-    public void setEmbededServer(CanalServerWithEmbeded embededServer) {
-        this.embededServer = embededServer;
+    public void setEmbeddedServer(CanalServerWithEmbedded embeddedServer) {
+        this.embeddedServer = embeddedServer;
     }
 
 }

+ 3 - 3
server/src/test/java/com/alibaba/otter/canal/server/BaseCanalServerWithEmbededTest.java

@@ -14,7 +14,7 @@ import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.CanalHASwitchable;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.server.embeded.CanalServerWithEmbeded;
+import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
 
 public abstract class BaseCanalServerWithEmbededTest {
 
@@ -26,12 +26,12 @@ public abstract class BaseCanalServerWithEmbededTest {
     protected static final String  PASSWORD       = "retl";
     protected static final String  FILTER         = "retl\\..*,erosa.zk_complaint_bizdata";
 
-    private CanalServerWithEmbeded server;
+    private CanalServerWithEmbedded server;
     private ClientIdentity         clientIdentity = new ClientIdentity(DESTINATION, (short) 1);                               ;
 
     @Before
     public void setUp() {
-        server = new CanalServerWithEmbeded();
+        server = new CanalServerWithEmbedded();
         server.setCanalInstanceGenerator(new CanalInstanceGenerator() {
 
             public CanalInstance generate(String destination) {

+ 1 - 1
server/src/test/java/com/alibaba/otter/canal/server/CanalServerWithEmbeded_StandaloneTest.java → server/src/test/java/com/alibaba/otter/canal/server/CanalServerWithEmbedded_StandaloneTest.java

@@ -11,7 +11,7 @@ import com.alibaba.otter.canal.instance.manager.model.CanalParameter.MetaMode;
 import com.alibaba.otter.canal.instance.manager.model.CanalParameter.SourcingType;
 import com.alibaba.otter.canal.instance.manager.model.CanalParameter.StorageMode;
 
-public class CanalServerWithEmbeded_StandaloneTest extends BaseCanalServerWithEmbededTest {
+public class CanalServerWithEmbedded_StandaloneTest extends BaseCanalServerWithEmbededTest {
 
     protected Canal buildCanal() {
         Canal canal = new Canal();

+ 1 - 1
server/src/test/java/com/alibaba/otter/canal/server/CanalServerWithEmbeded_StandbyTest.java → server/src/test/java/com/alibaba/otter/canal/server/CanalServerWithEmbedded_StandbyTest.java

@@ -15,7 +15,7 @@ import com.alibaba.otter.canal.instance.manager.model.CanalParameter.MetaMode;
 import com.alibaba.otter.canal.instance.manager.model.CanalParameter.SourcingType;
 import com.alibaba.otter.canal.instance.manager.model.CanalParameter.StorageMode;
 
-public class CanalServerWithEmbeded_StandbyTest extends BaseCanalServerWithEmbededTest {
+public class CanalServerWithEmbedded_StandbyTest extends BaseCanalServerWithEmbededTest {
 
     private ZkClient zkClient = new ZkClient(cluster1);
 

+ 7 - 5
server/src/test/java/com/alibaba/otter/canal/server/CanalServerWithNettyTest.java

@@ -31,7 +31,7 @@ import com.alibaba.otter.canal.protocol.CanalPacket.Packet;
 import com.alibaba.otter.canal.protocol.CanalPacket.PacketType;
 import com.alibaba.otter.canal.protocol.CanalPacket.Sub;
 import com.alibaba.otter.canal.protocol.CanalPacket.Unsub;
-import com.alibaba.otter.canal.server.embeded.CanalServerWithEmbeded;
+import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
 import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;
 
 public class CanalServerWithNettyTest {
@@ -49,16 +49,18 @@ public class CanalServerWithNettyTest {
 
     @Before
     public void setUp() {
-        CanalServerWithEmbeded embededServer = new CanalServerWithEmbeded();
-        embededServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {
+        CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();
+        embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator()
+        {
 
-            public CanalInstance generate(String destination) {
+            public CanalInstance generate(String destination)
+            {
                 Canal canal = buildCanal();
                 return new CanalInstanceWithManager(canal, FILTER);
             }
         });
 
-        nettyServer = new CanalServerWithNetty(embededServer);
+        nettyServer = new CanalServerWithNetty(embeddedServer);
         nettyServer.setPort(1088);
         nettyServer.start();
     }

+ 3 - 5
sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

@@ -132,8 +132,8 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
             boolean need = filter.filter(name);
             if (!need) {
                 logger.debug("filter name[{}] entry : {}:{}",
-                    new Object[] { name, event.getEntry().getHeader().getLogfileName(),
-                            event.getEntry().getHeader().getLogfileOffset() });
+                        name, event.getEntry().getHeader().getLogfileName(),
+                        event.getEntry().getHeader().getLogfileOffset());
             }
 
             return need;
@@ -178,9 +178,7 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
     }
 
     private String getSchemaNameAndTableName(CanalEntry.Entry entry) {
-        StringBuilder result = new StringBuilder();
-        result.append(entry.getHeader().getSchemaName()).append(".").append(entry.getHeader().getTableName());
-        return result.toString();
+        return entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName();
     }
 
     public void setEventStore(CanalEventStore<Event> eventStore) {