mcy 6 years ago
parent
commit
e387cfdee5

+ 34 - 32
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -88,44 +88,46 @@ public class RdbSyncService {
      * @param function 回调方法
      * @param function 回调方法
      */
      */
     public void sync(List<Dml> dmls, Function<Dml, Boolean> function) {
     public void sync(List<Dml> dmls, Function<Dml, Boolean> function) {
-        boolean toExecute = false;
-        for (Dml dml : dmls) {
-            if (!toExecute) {
-                toExecute = function.apply(dml);
-            } else {
-                function.apply(dml);
+        try {
+            boolean toExecute = false;
+            for (Dml dml : dmls) {
+                if (!toExecute) {
+                    toExecute = function.apply(dml);
+                } else {
+                    function.apply(dml);
+                }
             }
             }
-        }
-        if (toExecute) {
-            List<Future> futures = new ArrayList<>();
-            for (int i = 0; i < threads; i++) {
-                int j = i;
-                futures.add(executorThreads[i].submit(() -> {
+            if (toExecute) {
+                List<Future> futures = new ArrayList<>();
+                for (int i = 0; i < threads; i++) {
+                    int j = i;
+                    futures.add(executorThreads[i].submit(() -> {
+                        try {
+                            dmlsPartition[j]
+                                .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
+                            dmlsPartition[j].clear();
+                            batchExecutors[j].commit();
+                            return true;
+                        } catch (Throwable e) {
+                            batchExecutors[j].rollback();
+                            throw new RuntimeException(e);
+                        }
+                    }));
+                }
+
+                futures.forEach(future -> {
                     try {
                     try {
-                        dmlsPartition[j]
-                            .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
-                        dmlsPartition[j].clear();
-                        batchExecutors[j].commit();
-                        return true;
-                    } catch (Throwable e) {
-                        batchExecutors[j].rollback();
+                        future.get();
+                    } catch (ExecutionException | InterruptedException e) {
                         throw new RuntimeException(e);
                         throw new RuntimeException(e);
                     }
                     }
-                }));
+                });
             }
             }
-
-            futures.forEach(future -> {
-                try {
-                    future.get();
-                } catch (ExecutionException | InterruptedException e) {
-                    throw new RuntimeException(e);
+        } finally {
+            for (BatchExecutor batchExecutor : batchExecutors) {
+                if (batchExecutor != null) {
+                    batchExecutor.close();
                 }
                 }
-            });
-        }
-
-        for (BatchExecutor batchExecutor : batchExecutors) {
-            if (batchExecutor != null) {
-                batchExecutor.close();
             }
             }
         }
         }
     }
     }

+ 8 - 4
common/src/main/java/com/alibaba/otter/canal/common/zookeeper/ZkClientx.java

@@ -16,7 +16,7 @@ import com.google.common.collect.MigrateMap;
 
 
 /**
 /**
  * 使用自定义的ZooKeeperx for zk connection
  * 使用自定义的ZooKeeperx for zk connection
- * 
+ *
  * @author jianghang 2012-7-10 下午02:31:15
  * @author jianghang 2012-7-10 下午02:31:15
  * @version 1.0.0
  * @version 1.0.0
  */
  */
@@ -34,6 +34,10 @@ public class ZkClientx extends ZkClient {
         return clients.get(servers);
         return clients.get(servers);
     }
     }
 
 
+    public static ZkClientx removeZkClient(String servers) {
+        return clients.remove(servers);
+    }
+
     public ZkClientx(String serverstring){
     public ZkClientx(String serverstring){
         this(serverstring, Integer.MAX_VALUE);
         this(serverstring, Integer.MAX_VALUE);
     }
     }
@@ -60,7 +64,7 @@ public class ZkClientx extends ZkClient {
 
 
     /**
     /**
      * Create a persistent Sequential node.
      * Create a persistent Sequential node.
-     * 
+     *
      * @param path
      * @param path
      * @param createParents if true all parent dirs are created as well and no
      * @param createParents if true all parent dirs are created as well and no
      * {@link ZkNodeExistsException} is thrown in case the path already exists
      * {@link ZkNodeExistsException} is thrown in case the path already exists
@@ -88,7 +92,7 @@ public class ZkClientx extends ZkClient {
 
 
     /**
     /**
      * Create a persistent Sequential node.
      * Create a persistent Sequential node.
-     * 
+     *
      * @param path
      * @param path
      * @param data
      * @param data
      * @param createParents if true all parent dirs are created as well and no
      * @param createParents if true all parent dirs are created as well and no
@@ -119,7 +123,7 @@ public class ZkClientx extends ZkClient {
 
 
     /**
     /**
      * Create a persistent Sequential node.
      * Create a persistent Sequential node.
-     * 
+     *
      * @param path
      * @param path
      * @param data
      * @param data
      * @param createParents if true all parent dirs are created as well and no
      * @param createParents if true all parent dirs are created as well and no

+ 84 - 65
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -68,6 +68,7 @@ public class CanalController {
 
 
     private CanalInstanceGenerator                   instanceGenerator;
     private CanalInstanceGenerator                   instanceGenerator;
     private ZkClientx                                zkclientx;
     private ZkClientx                                zkclientx;
+    private String                                   zkServers;
 
 
     private CanalMQStarter                           canalMQStarter;
     private CanalMQStarter                           canalMQStarter;
 
 
@@ -132,6 +133,7 @@ public class CanalController {
         }
         }
         final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
         final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
         if (StringUtils.isNotEmpty(zkServers)) {
         if (StringUtils.isNotEmpty(zkServers)) {
+            this.zkServers = zkServers;
             zkclientx = ZkClientx.getZkClient(zkServers);
             zkclientx = ZkClientx.getZkClient(zkServers);
             // 初始化系统目录
             // 初始化系统目录
             zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
             zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
@@ -140,80 +142,81 @@ public class CanalController {
 
 
         final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
         final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
         ServerRunningMonitors.setServerData(serverData);
         ServerRunningMonitors.setServerData(serverData);
-        ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
-
-            public ServerRunningMonitor apply(final String destination) {
-                ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
-                runningMonitor.setDestination(destination);
-                runningMonitor.setListener(new ServerRunningListener() {
-
-                    public void processActiveEnter() {
-                        try {
-                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                            embededCanalServer.start(destination);
-                        } finally {
-                            MDC.remove(CanalConstants.MDC_DESTINATION);
+        ServerRunningMonitors
+            .setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
+
+                public ServerRunningMonitor apply(final String destination) {
+                    ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
+                    runningMonitor.setDestination(destination);
+                    runningMonitor.setListener(new ServerRunningListener() {
+
+                        public void processActiveEnter() {
+                            try {
+                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                                embededCanalServer.start(destination);
+                            } finally {
+                                MDC.remove(CanalConstants.MDC_DESTINATION);
+                            }
                         }
                         }
-                    }
 
 
-                    public void processActiveExit() {
-                        try {
-                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                            embededCanalServer.stop(destination);
-                        } finally {
-                            MDC.remove(CanalConstants.MDC_DESTINATION);
+                        public void processActiveExit() {
+                            try {
+                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                                embededCanalServer.stop(destination);
+                            } finally {
+                                MDC.remove(CanalConstants.MDC_DESTINATION);
+                            }
                         }
                         }
-                    }
-
-                    public void processStart() {
-                        try {
-                            if (zkclientx != null) {
-                                final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":"
-                                                                                                              + port);
-                                initCid(path);
-                                zkclientx.subscribeStateChanges(new IZkStateListener() {
-
-                                    public void handleStateChanged(KeeperState state) throws Exception {
-
-                                    }
 
 
-                                    public void handleNewSession() throws Exception {
-                                        initCid(path);
-                                    }
-
-                                    @Override
-                                    public void handleSessionEstablishmentError(Throwable error) throws Exception {
-                                        logger.error("failed to connect to zookeeper", error);
-                                    }
-                                });
+                        public void processStart() {
+                            try {
+                                if (zkclientx != null) {
+                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
+                                        ip + ":" + port);
+                                    initCid(path);
+                                    zkclientx.subscribeStateChanges(new IZkStateListener() {
+
+                                        public void handleStateChanged(KeeperState state) throws Exception {
+
+                                        }
+
+                                        public void handleNewSession() throws Exception {
+                                            initCid(path);
+                                        }
+
+                                        @Override
+                                        public void handleSessionEstablishmentError(Throwable error) throws Exception {
+                                            logger.error("failed to connect to zookeeper", error);
+                                        }
+                                    });
+                                }
+                            } finally {
+                                MDC.remove(CanalConstants.MDC_DESTINATION);
                             }
                             }
-                        } finally {
-                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
                         }
-                    }
 
 
-                    public void processStop() {
-                        try {
-                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                            if (zkclientx != null) {
-                                final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":"
-                                                                                                              + port);
-                                releaseCid(path);
+                        public void processStop() {
+                            try {
+                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                                if (zkclientx != null) {
+                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
+                                        ip + ":" + port);
+                                    releaseCid(path);
+                                }
+                            } finally {
+                                MDC.remove(CanalConstants.MDC_DESTINATION);
                             }
                             }
-                        } finally {
-                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
                         }
-                    }
 
 
-                });
-                if (zkclientx != null) {
-                    runningMonitor.setZkClient(zkclientx);
+                    });
+                    if (zkclientx != null) {
+                        runningMonitor.setZkClient(zkclientx);
+                    }
+                    // 触发创建一下cid节点
+                    runningMonitor.init();
+                    return runningMonitor;
                 }
                 }
-                // 触发创建一下cid节点
-                runningMonitor.init();
-                return runningMonitor;
-            }
-        }));
+            }));
 
 
         // 初始化monitor机制
         // 初始化monitor机制
         autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
         autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
@@ -265,7 +268,8 @@ public class CanalController {
             instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
             instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
 
 
                 public InstanceConfigMonitor apply(InstanceMode mode) {
                 public InstanceConfigMonitor apply(InstanceMode mode) {
-                    int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
+                    int scanInterval = Integer
+                        .valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
 
 
                     if (mode.isSpring()) {
                     if (mode.isSpring()) {
                         SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
                         SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
@@ -373,7 +377,8 @@ public class CanalController {
             InstanceConfig oldConfig = instanceConfigs.put(destination, config);
             InstanceConfig oldConfig = instanceConfigs.put(destination, config);
 
 
             if (oldConfig != null) {
             if (oldConfig != null) {
-                logger.warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
+                logger
+                    .warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
             }
             }
         }
         }
     }
     }
@@ -478,6 +483,7 @@ public class CanalController {
     }
     }
 
 
     public void stop() throws Throwable {
     public void stop() throws Throwable {
+
         if (canalServer != null) {
         if (canalServer != null) {
             canalServer.stop();
             canalServer.stop();
         }
         }
@@ -502,6 +508,19 @@ public class CanalController {
 
 
         if (zkclientx != null) {
         if (zkclientx != null) {
             zkclientx.close();
             zkclientx.close();
+            if (zkServers != null) {
+                ZkClientx.removeZkClient(zkServers);
+            }
+        }
+
+        if (instanceConfigs != null) {
+            instanceConfigs.clear();
+        }
+        if (managerClients != null) {
+            managerClients.clear();
+        }
+        if (instanceConfigMonitors != null) {
+            instanceConfigMonitors.clear();
         }
         }
     }
     }
 
 

+ 334 - 329
meta/src/main/java/com/alibaba/otter/canal/meta/ZooKeeperMetaManager.java

@@ -1,329 +1,334 @@
-package com.alibaba.otter.canal.meta;
-
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.commons.lang.StringUtils;
-import org.springframework.util.Assert;
-import org.springframework.util.CollectionUtils;
-
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
-import com.alibaba.otter.canal.common.utils.JsonUtils;
-import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
-import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
-import com.alibaba.otter.canal.meta.exception.CanalMetaManagerException;
-import com.alibaba.otter.canal.protocol.ClientIdentity;
-import com.alibaba.otter.canal.protocol.position.Position;
-import com.alibaba.otter.canal.protocol.position.PositionRange;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * zk 版本的 canal manager, 存储结构:
- * 
- * <pre>
- * /otter
- *    canal
- *      destinations
- *        dest1 
- *          client1
- *            filter
- *            batch_mark
- *              1
- *              2
- *              3
- * </pre>
- * 
- * @author zebin.xuzb @ 2012-6-21
- * @author jianghang
- * @version 1.0.0
- */
-public class ZooKeeperMetaManager extends AbstractCanalLifeCycle implements CanalMetaManager {
-
-    private static final String ENCODE = "UTF-8";
-    private ZkClientx           zkClientx;
-
-    public void start() {
-        super.start();
-
-        Assert.notNull(zkClientx);
-    }
-
-    public void stop() {
-        super.stop();
-    }
-
-    public void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
-            clientIdentity.getClientId());
-
-        try {
-            zkClientx.createPersistent(path, true);
-        } catch (ZkNodeExistsException e) {
-            // ignore
-        }
-        if (clientIdentity.hasFilter()) {
-            String filterPath = ZookeeperPathUtils.getFilterPath(clientIdentity.getDestination(),
-                clientIdentity.getClientId());
-
-            byte[] bytes = null;
-            try {
-                bytes = clientIdentity.getFilter().getBytes(ENCODE);
-            } catch (UnsupportedEncodingException e) {
-                throw new CanalMetaManagerException(e);
-            }
-
-            try {
-                zkClientx.createPersistent(filterPath, bytes);
-            } catch (ZkNodeExistsException e) {
-                // ignore
-                zkClientx.writeData(filterPath, bytes);
-            }
-        }
-    }
-
-    public boolean hasSubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
-            clientIdentity.getClientId());
-        return zkClientx.exists(path);
-    }
-
-    public void unsubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
-            clientIdentity.getClientId());
-        zkClientx.deleteRecursive(path); // 递归删除所有信息
-    }
-
-    public List<ClientIdentity> listAllSubscribeInfo(String destination) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getDestinationPath(destination);
-        List<String> childs = null;
-        try {
-            childs = zkClientx.getChildren(path);
-        } catch (ZkNoNodeException e) {
-            // ignore
-        }
-
-        if (CollectionUtils.isEmpty(childs)) {
-            return new ArrayList<ClientIdentity>();
-        }
-        List<Short> clientIds = new ArrayList<Short>();
-        for (String child : childs) {
-            if (StringUtils.isNumeric(child)) {
-                clientIds.add(ZookeeperPathUtils.getClientId(child));
-            }
-        }
-
-        Collections.sort(clientIds); // 进行一个排序
-        List<ClientIdentity> clientIdentities = Lists.newArrayList();
-        for (Short clientId : clientIds) {
-            path = ZookeeperPathUtils.getFilterPath(destination, clientId);
-            byte[] bytes = zkClientx.readData(path, true);
-            String filter = null;
-            if (bytes != null) {
-                try {
-                    filter = new String(bytes, ENCODE);
-                } catch (UnsupportedEncodingException e) {
-                    throw new CanalMetaManagerException(e);
-                }
-            }
-            clientIdentities.add(new ClientIdentity(destination, clientId, filter));
-        }
-
-        return clientIdentities;
-    }
-
-    public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getCursorPath(clientIdentity.getDestination(), clientIdentity.getClientId());
-
-        byte[] data = zkClientx.readData(path, true);
-        if (data == null || data.length == 0) {
-            return null;
-        }
-
-        return JsonUtils.unmarshalFromByte(data, Position.class);
-    }
-
-    public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getCursorPath(clientIdentity.getDestination(), clientIdentity.getClientId());
-        byte[] data = JsonUtils.marshalToByte(position, SerializerFeature.WriteClassName);
-        try {
-            zkClientx.writeData(path, data);
-        } catch (ZkNoNodeException e) {
-            zkClientx.createPersistent(path, data, true);// 第一次节点不存在,则尝试重建
-        }
-    }
-
-    public Long addBatch(ClientIdentity clientIdentity, PositionRange positionRange) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(), clientIdentity.getClientId());
-        byte[] data = JsonUtils.marshalToByte(positionRange, SerializerFeature.WriteClassName);
-        String batchPath = zkClientx.createPersistentSequential(path + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR,
-            data,
-            true);
-        String batchIdString = StringUtils.substringAfterLast(batchPath, ZookeeperPathUtils.ZOOKEEPER_SEPARATOR);
-        return ZookeeperPathUtils.getBatchMarkId(batchIdString);
-    }
-
-    public void addBatch(ClientIdentity clientIdentity, PositionRange positionRange, Long batchId)
-                                                                                                  throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getBatchMarkWithIdPath(clientIdentity.getDestination(),
-            clientIdentity.getClientId(),
-            batchId);
-        byte[] data = JsonUtils.marshalToByte(positionRange, SerializerFeature.WriteClassName);
-        zkClientx.createPersistent(path, data, true);
-    }
-
-    public PositionRange removeBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException {
-        String batchsPath = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
-            clientIdentity.getClientId());
-        List<String> nodes = zkClientx.getChildren(batchsPath);
-        if (CollectionUtils.isEmpty(nodes)) {
-            // 没有batch记录
-            return null;
-        }
-
-        // 找到最小的Id
-        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
-        for (String batchIdString : nodes) {
-            batchIds.add(Long.valueOf(batchIdString));
-        }
-        Long minBatchId = Collections.min(batchIds);
-        if (!minBatchId.equals(batchId)) {
-            // 检查一下提交的ack/rollback,必须按batchId分出去的顺序提交,否则容易出现丢数据
-            throw new CanalMetaManagerException(String.format("batchId:%d is not the firstly:%d", batchId, minBatchId));
-        }
-
-        if (!batchIds.contains(batchId)) {
-            // 不存在对应的batchId
-            return null;
-        }
-        PositionRange positionRange = getBatch(clientIdentity, batchId);
-        if (positionRange != null) {
-            String path = ZookeeperPathUtils.getBatchMarkWithIdPath(clientIdentity.getDestination(),
-                clientIdentity.getClientId(),
-                batchId);
-            zkClientx.delete(path);
-        }
-
-        return positionRange;
-    }
-
-    public PositionRange getBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getBatchMarkWithIdPath(clientIdentity.getDestination(),
-            clientIdentity.getClientId(),
-            batchId);
-        byte[] data = zkClientx.readData(path, true);
-        if (data == null) {
-            return null;
-        }
-
-        PositionRange positionRange = JsonUtils.unmarshalFromByte(data, PositionRange.class);
-        return positionRange;
-    }
-
-    public void clearAllBatchs(ClientIdentity clientIdentity) throws CanalMetaManagerException {
-        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(), clientIdentity.getClientId());
-        List<String> batchChilds = zkClientx.getChildren(path);
-
-        for (String batchChild : batchChilds) {
-            String batchPath = path + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + batchChild;
-            zkClientx.delete(batchPath);
-        }
-    }
-
-    public PositionRange getLastestBatch(ClientIdentity clientIdentity) {
-        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(), clientIdentity.getClientId());
-        List<String> nodes = null;
-        try {
-            nodes = zkClientx.getChildren(path);
-        } catch (ZkNoNodeException e) {
-            // ignore
-        }
-
-        if (CollectionUtils.isEmpty(nodes)) {
-            return null;
-        }
-        // 找到最大的Id
-        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
-        for (String batchIdString : nodes) {
-            batchIds.add(Long.valueOf(batchIdString));
-        }
-        Long maxBatchId = Collections.max(batchIds);
-        PositionRange result = getBatch(clientIdentity, maxBatchId);
-        if (result == null) { // 出现为null,说明zk节点有变化,重新获取
-            return getLastestBatch(clientIdentity);
-        } else {
-            return result;
-        }
-    }
-
-    public PositionRange getFirstBatch(ClientIdentity clientIdentity) {
-        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(), clientIdentity.getClientId());
-        List<String> nodes = null;
-        try {
-            nodes = zkClientx.getChildren(path);
-        } catch (ZkNoNodeException e) {
-            // ignore
-        }
-
-        if (CollectionUtils.isEmpty(nodes)) {
-            return null;
-        }
-        // 找到最小的Id
-        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
-        for (String batchIdString : nodes) {
-            batchIds.add(Long.valueOf(batchIdString));
-        }
-        Long minBatchId = Collections.min(batchIds);
-        PositionRange result = getBatch(clientIdentity, minBatchId);
-        if (result == null) { // 出现为null,说明zk节点有变化,重新获取
-            return getFirstBatch(clientIdentity);
-        } else {
-            return result;
-        }
-    }
-
-    public Map<Long, PositionRange> listAllBatchs(ClientIdentity clientIdentity) {
-        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(), clientIdentity.getClientId());
-        List<String> nodes = null;
-        try {
-            nodes = zkClientx.getChildren(path);
-        } catch (ZkNoNodeException e) {
-            // ignore
-        }
-
-        if (CollectionUtils.isEmpty(nodes)) {
-            return Maps.newHashMap();
-        }
-        // 找到最大的Id
-        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
-        for (String batchIdString : nodes) {
-            batchIds.add(Long.valueOf(batchIdString));
-        }
-
-        Collections.sort(batchIds); // 从小到大排序
-        Map<Long, PositionRange> positionRanges = Maps.newLinkedHashMap();
-        for (Long batchId : batchIds) {
-            PositionRange result = getBatch(clientIdentity, batchId);
-            if (result == null) {// 出现为null,说明zk节点有变化,重新获取
-                return listAllBatchs(clientIdentity);
-            } else {
-                positionRanges.put(batchId, result);
-            }
-        }
-
-        return positionRanges;
-    }
-
-    // =========== setter ==========
-
-    public void setZkClientx(ZkClientx zkClientx) {
-        this.zkClientx = zkClientx;
-    }
-
-}
+package com.alibaba.otter.canal.meta;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.commons.lang.StringUtils;
+import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
+
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
+import com.alibaba.otter.canal.common.utils.JsonUtils;
+import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
+import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
+import com.alibaba.otter.canal.meta.exception.CanalMetaManagerException;
+import com.alibaba.otter.canal.protocol.ClientIdentity;
+import com.alibaba.otter.canal.protocol.position.Position;
+import com.alibaba.otter.canal.protocol.position.PositionRange;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * zk 版本的 canal manager, 存储结构:
+ *
+ * <pre>
+ * /otter
+ *    canal
+ *      destinations
+ *        dest1
+ *          client1
+ *            filter
+ *            batch_mark
+ *              1
+ *              2
+ *              3
+ * </pre>
+ *
+ * @author zebin.xuzb @ 2012-6-21
+ * @author jianghang
+ * @version 1.0.0
+ */
+public class ZooKeeperMetaManager extends AbstractCanalLifeCycle implements CanalMetaManager {
+
+    private static final String ENCODE = "UTF-8";
+    private ZkClientx           zkClientx;
+
+    public void start() {
+        super.start();
+
+        Assert.notNull(zkClientx);
+    }
+
+    public void stop() {
+        zkClientx = null;
+        super.stop();
+    }
+
+    public void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+
+        try {
+            zkClientx.createPersistent(path, true);
+        } catch (ZkNodeExistsException e) {
+            // ignore
+        }
+        if (clientIdentity.hasFilter()) {
+            String filterPath = ZookeeperPathUtils.getFilterPath(clientIdentity.getDestination(),
+                clientIdentity.getClientId());
+
+            byte[] bytes = null;
+            try {
+                bytes = clientIdentity.getFilter().getBytes(ENCODE);
+            } catch (UnsupportedEncodingException e) {
+                throw new CanalMetaManagerException(e);
+            }
+
+            try {
+                zkClientx.createPersistent(filterPath, bytes);
+            } catch (ZkNodeExistsException e) {
+                // ignore
+                zkClientx.writeData(filterPath, bytes);
+            }
+        }
+    }
+
+    public boolean hasSubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        return zkClientx.exists(path);
+    }
+
+    public void unsubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        zkClientx.deleteRecursive(path); // 递归删除所有信息
+    }
+
+    public List<ClientIdentity> listAllSubscribeInfo(String destination) throws CanalMetaManagerException {
+        if (zkClientx == null) {
+            return new ArrayList<ClientIdentity>();
+        }
+        String path = ZookeeperPathUtils.getDestinationPath(destination);
+        List<String> childs = null;
+        try {
+            childs = zkClientx.getChildren(path);
+        } catch (ZkNoNodeException e) {
+            // ignore
+        }
+
+        if (CollectionUtils.isEmpty(childs)) {
+            return new ArrayList<ClientIdentity>();
+        }
+        List<Short> clientIds = new ArrayList<Short>();
+        for (String child : childs) {
+            if (StringUtils.isNumeric(child)) {
+                clientIds.add(ZookeeperPathUtils.getClientId(child));
+            }
+        }
+
+        Collections.sort(clientIds); // 进行一个排序
+        List<ClientIdentity> clientIdentities = Lists.newArrayList();
+        for (Short clientId : clientIds) {
+            path = ZookeeperPathUtils.getFilterPath(destination, clientId);
+            byte[] bytes = zkClientx.readData(path, true);
+            String filter = null;
+            if (bytes != null) {
+                try {
+                    filter = new String(bytes, ENCODE);
+                } catch (UnsupportedEncodingException e) {
+                    throw new CanalMetaManagerException(e);
+                }
+            }
+            clientIdentities.add(new ClientIdentity(destination, clientId, filter));
+        }
+
+        return clientIdentities;
+    }
+
+    public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils.getCursorPath(clientIdentity.getDestination(), clientIdentity.getClientId());
+
+        byte[] data = zkClientx.readData(path, true);
+        if (data == null || data.length == 0) {
+            return null;
+        }
+
+        return JsonUtils.unmarshalFromByte(data, Position.class);
+    }
+
+    public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils.getCursorPath(clientIdentity.getDestination(), clientIdentity.getClientId());
+        byte[] data = JsonUtils.marshalToByte(position, SerializerFeature.WriteClassName);
+        try {
+            zkClientx.writeData(path, data);
+        } catch (ZkNoNodeException e) {
+            zkClientx.createPersistent(path, data, true);// 第一次节点不存在,则尝试重建
+        }
+    }
+
+    public Long addBatch(ClientIdentity clientIdentity, PositionRange positionRange) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        byte[] data = JsonUtils.marshalToByte(positionRange, SerializerFeature.WriteClassName);
+        String batchPath = zkClientx
+            .createPersistentSequential(path + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR, data, true);
+        String batchIdString = StringUtils.substringAfterLast(batchPath, ZookeeperPathUtils.ZOOKEEPER_SEPARATOR);
+        return ZookeeperPathUtils.getBatchMarkId(batchIdString);
+    }
+
+    public void addBatch(ClientIdentity clientIdentity, PositionRange positionRange,
+                         Long batchId) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils
+            .getBatchMarkWithIdPath(clientIdentity.getDestination(), clientIdentity.getClientId(), batchId);
+        byte[] data = JsonUtils.marshalToByte(positionRange, SerializerFeature.WriteClassName);
+        zkClientx.createPersistent(path, data, true);
+    }
+
+    public PositionRange removeBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException {
+        String batchsPath = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        List<String> nodes = zkClientx.getChildren(batchsPath);
+        if (CollectionUtils.isEmpty(nodes)) {
+            // 没有batch记录
+            return null;
+        }
+
+        // 找到最小的Id
+        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
+        for (String batchIdString : nodes) {
+            batchIds.add(Long.valueOf(batchIdString));
+        }
+        Long minBatchId = Collections.min(batchIds);
+        if (!minBatchId.equals(batchId)) {
+            // 检查一下提交的ack/rollback,必须按batchId分出去的顺序提交,否则容易出现丢数据
+            throw new CanalMetaManagerException(String.format("batchId:%d is not the firstly:%d", batchId, minBatchId));
+        }
+
+        if (!batchIds.contains(batchId)) {
+            // 不存在对应的batchId
+            return null;
+        }
+        PositionRange positionRange = getBatch(clientIdentity, batchId);
+        if (positionRange != null) {
+            String path = ZookeeperPathUtils
+                .getBatchMarkWithIdPath(clientIdentity.getDestination(), clientIdentity.getClientId(), batchId);
+            zkClientx.delete(path);
+        }
+
+        return positionRange;
+    }
+
+    public PositionRange getBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils
+            .getBatchMarkWithIdPath(clientIdentity.getDestination(), clientIdentity.getClientId(), batchId);
+        byte[] data = zkClientx.readData(path, true);
+        if (data == null) {
+            return null;
+        }
+
+        PositionRange positionRange = JsonUtils.unmarshalFromByte(data, PositionRange.class);
+        return positionRange;
+    }
+
+    public void clearAllBatchs(ClientIdentity clientIdentity) throws CanalMetaManagerException {
+        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        List<String> batchChilds = zkClientx.getChildren(path);
+
+        for (String batchChild : batchChilds) {
+            String batchPath = path + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + batchChild;
+            zkClientx.delete(batchPath);
+        }
+    }
+
+    public PositionRange getLastestBatch(ClientIdentity clientIdentity) {
+        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        List<String> nodes = null;
+        try {
+            nodes = zkClientx.getChildren(path);
+        } catch (ZkNoNodeException e) {
+            // ignore
+        }
+
+        if (CollectionUtils.isEmpty(nodes)) {
+            return null;
+        }
+        // 找到最大的Id
+        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
+        for (String batchIdString : nodes) {
+            batchIds.add(Long.valueOf(batchIdString));
+        }
+        Long maxBatchId = Collections.max(batchIds);
+        PositionRange result = getBatch(clientIdentity, maxBatchId);
+        if (result == null) { // 出现为null,说明zk节点有变化,重新获取
+            return getLastestBatch(clientIdentity);
+        } else {
+            return result;
+        }
+    }
+
+    public PositionRange getFirstBatch(ClientIdentity clientIdentity) {
+        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        List<String> nodes = null;
+        try {
+            nodes = zkClientx.getChildren(path);
+        } catch (ZkNoNodeException e) {
+            // ignore
+        }
+
+        if (CollectionUtils.isEmpty(nodes)) {
+            return null;
+        }
+        // 找到最小的Id
+        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
+        for (String batchIdString : nodes) {
+            batchIds.add(Long.valueOf(batchIdString));
+        }
+        Long minBatchId = Collections.min(batchIds);
+        PositionRange result = getBatch(clientIdentity, minBatchId);
+        if (result == null) { // 出现为null,说明zk节点有变化,重新获取
+            return getFirstBatch(clientIdentity);
+        } else {
+            return result;
+        }
+    }
+
+    public Map<Long, PositionRange> listAllBatchs(ClientIdentity clientIdentity) {
+        String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
+            clientIdentity.getClientId());
+        List<String> nodes = null;
+        try {
+            nodes = zkClientx.getChildren(path);
+        } catch (ZkNoNodeException e) {
+            // ignore
+        }
+
+        if (CollectionUtils.isEmpty(nodes)) {
+            return Maps.newHashMap();
+        }
+        // 找到最大的Id
+        ArrayList<Long> batchIds = new ArrayList<Long>(nodes.size());
+        for (String batchIdString : nodes) {
+            batchIds.add(Long.valueOf(batchIdString));
+        }
+
+        Collections.sort(batchIds); // 从小到大排序
+        Map<Long, PositionRange> positionRanges = Maps.newLinkedHashMap();
+        for (Long batchId : batchIds) {
+            PositionRange result = getBatch(clientIdentity, batchId);
+            if (result == null) {// 出现为null,说明zk节点有变化,重新获取
+                return listAllBatchs(clientIdentity);
+            } else {
+                positionRanges.put(batchId, result);
+            }
+        }
+
+        return positionRanges;
+    }
+
+    // =========== setter ==========
+
+    public void setZkClientx(ZkClientx zkClientx) {
+        this.zkClientx = zkClientx;
+    }
+
+}