Browse Source

make zookeeperx work

alex.zheng 7 years ago
parent
commit
29815c41b0

+ 30 - 92
common/src/main/java/com/alibaba/otter/canal/common/zookeeper/ZooKeeperx.java

@@ -1,31 +1,26 @@
 package com.alibaba.otter.canal.common.zookeeper;
 package com.alibaba.otter.canal.common.zookeeper;
 
 
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkException;
 import org.I0Itec.zkclient.exception.ZkException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.zookeeper.ClientCnxn;
 import org.apache.zookeeper.ClientCnxn;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.client.ConnectStringParser;
 import org.apache.zookeeper.client.ConnectStringParser;
 import org.apache.zookeeper.client.HostProvider;
 import org.apache.zookeeper.client.HostProvider;
 import org.apache.zookeeper.client.StaticHostProvider;
 import org.apache.zookeeper.client.StaticHostProvider;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.ReflectionUtils;
 import org.springframework.util.ReflectionUtils;
 
 
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 /**
 /**
  * 封装了ZooKeeper,使其支持节点的优先顺序,比如美国机房的节点会优先加载美国对应的zk集群列表,都失败后才会选择加载杭州的zk集群列表 *
  * 封装了ZooKeeper,使其支持节点的优先顺序,比如美国机房的节点会优先加载美国对应的zk集群列表,都失败后才会选择加载杭州的zk集群列表 *
  * 
  * 
@@ -38,14 +33,12 @@ public class ZooKeeperx extends ZkConnection {
     private static final Logger logger                  = LoggerFactory.getLogger(ZooKeeperx.class);
     private static final Logger logger                  = LoggerFactory.getLogger(ZooKeeperx.class);
     private static final Field  clientCnxnField         = ReflectionUtils.findField(ZooKeeper.class, "cnxn");
     private static final Field  clientCnxnField         = ReflectionUtils.findField(ZooKeeper.class, "cnxn");
     private static final Field  hostProviderField       = ReflectionUtils.findField(ClientCnxn.class, "hostProvider");
     private static final Field  hostProviderField       = ReflectionUtils.findField(ClientCnxn.class, "hostProvider");
-    private static final Field  serverAddressesField    = ReflectionUtils.findField(StaticHostProvider.class,
-                                                            "serverAddresses");
+    private static final Field  serverAddressesField    = ReflectionUtils.findField(StaticHostProvider.class, "serverAddresses");
+    private static final Field  zookeeperLockField      = ReflectionUtils.findField(ZkConnection.class, "_zookeeperLock");
+    private static final Field  zookeeperFiled          = ReflectionUtils.findField(ZkConnection.class, "_zk");
     private static final int    DEFAULT_SESSION_TIMEOUT = 90000;
     private static final int    DEFAULT_SESSION_TIMEOUT = 90000;
 
 
-    private ZooKeeper           _zk                     = null;
-    private Lock                _zookeeperLock          = new ReentrantLock();
-
-    private final List<String>  _servers;
+    private final List<String>  _serversList;
     private final int           _sessionTimeOut;
     private final int           _sessionTimeOut;
 
 
     public ZooKeeperx(String zkServers){
     public ZooKeeperx(String zkServers){
@@ -54,114 +47,58 @@ public class ZooKeeperx extends ZkConnection {
 
 
     public ZooKeeperx(String zkServers, int sessionTimeOut){
     public ZooKeeperx(String zkServers, int sessionTimeOut){
         super(zkServers, sessionTimeOut);
         super(zkServers, sessionTimeOut);
-        _servers = Arrays.asList(StringUtils.split(zkServers, SERVER_COMMA));
+        _serversList = Arrays.asList(StringUtils.split(this.getServers(), SERVER_COMMA));
         _sessionTimeOut = sessionTimeOut;
         _sessionTimeOut = sessionTimeOut;
     }
     }
 
 
     @Override
     @Override
     public void connect(Watcher watcher) {
     public void connect(Watcher watcher) {
+        ReflectionUtils.makeAccessible(zookeeperLockField);
+        ReflectionUtils.makeAccessible(zookeeperFiled);
+        Lock _zookeeperLock = (ReentrantLock) ReflectionUtils.getField(zookeeperLockField, this);
+        ZooKeeper _zk = (ZooKeeper) ReflectionUtils.getField(zookeeperFiled, this);
+
         _zookeeperLock.lock();
         _zookeeperLock.lock();
         try {
         try {
             if (_zk != null) {
             if (_zk != null) {
                 throw new IllegalStateException("zk client has already been started");
                 throw new IllegalStateException("zk client has already been started");
             }
             }
+            String zkServers = _serversList.get(0);
 
 
             try {
             try {
-                logger.debug("Creating new ZookKeeper instance to connect to " + _servers + ".");
-                _zk = new ZooKeeper(_servers.get(0), _sessionTimeOut, watcher);
+                logger.debug("Creating new ZookKeeper instance to connect to " + zkServers + ".");
+                _zk = new ZooKeeper(zkServers, _sessionTimeOut, watcher);
                 configMutliCluster(_zk);
                 configMutliCluster(_zk);
+                ReflectionUtils.setField(zookeeperFiled, this, _zk);
             } catch (IOException e) {
             } catch (IOException e) {
-                throw new ZkException("Unable to connect to " + _servers, e);
+                throw new ZkException("Unable to connect to " + zkServers, e);
             }
             }
         } finally {
         } finally {
             _zookeeperLock.unlock();
             _zookeeperLock.unlock();
         }
         }
     }
     }
 
 
-    public void close() throws InterruptedException {
-        _zookeeperLock.lock();
-        try {
-            if (_zk != null) {
-                logger.debug("Closing ZooKeeper connected to " + _servers);
-                _zk.close();
-                _zk = null;
-            }
-        } finally {
-            _zookeeperLock.unlock();
-        }
-    }
-
-    public String create(String path, byte[] data, CreateMode mode) throws KeeperException, InterruptedException {
-        return _zk.create(path, data, Ids.OPEN_ACL_UNSAFE, mode);
-    }
-
-    public void delete(String path) throws InterruptedException, KeeperException {
-        _zk.delete(path, -1);
-    }
-
-    public boolean exists(String path, boolean watch) throws KeeperException, InterruptedException {
-        return _zk.exists(path, watch) != null;
-    }
-
-    public List<String> getChildren(final String path, final boolean watch) throws KeeperException,
-                                                                           InterruptedException {
-        return _zk.getChildren(path, watch);
-    }
-
-    public byte[] readData(String path, Stat stat, boolean watch) throws KeeperException, InterruptedException {
-        return _zk.getData(path, watch, stat);
-    }
-
-    public void writeData(String path, byte[] data) throws KeeperException, InterruptedException {
-        writeData(path, data, -1);
-    }
-
-    public void writeData(String path, byte[] data, int version) throws KeeperException, InterruptedException {
-        _zk.setData(path, data, version);
-    }
-
-    public States getZookeeperState() {
-        return _zk != null ? _zk.getState() : null;
-    }
-
-    public ZooKeeper getZookeeper() {
-        return _zk;
-    }
-
-    public long getCreateTime(String path) throws KeeperException, InterruptedException {
-        Stat stat = _zk.exists(path, false);
-        if (stat != null) {
-            return stat.getCtime();
-        }
-        return -1;
-    }
-
-    public String getServers() {
-        return StringUtils.join(_servers, SERVER_COMMA);
-    }
-
     // ===============================
     // ===============================
 
 
     public void configMutliCluster(ZooKeeper zk) {
     public void configMutliCluster(ZooKeeper zk) {
-        if (_servers.size() == 1) {
+        if (_serversList.size() == 1) {
             return;
             return;
         }
         }
-        String cluster1 = _servers.get(0);
+        String cluster1 = _serversList.get(0);
         try {
         try {
-            if (_servers.size() > 1) {
+            if (_serversList.size() > 1) {
                 // 强制的声明accessible
                 // 强制的声明accessible
                 ReflectionUtils.makeAccessible(clientCnxnField);
                 ReflectionUtils.makeAccessible(clientCnxnField);
                 ReflectionUtils.makeAccessible(hostProviderField);
                 ReflectionUtils.makeAccessible(hostProviderField);
                 ReflectionUtils.makeAccessible(serverAddressesField);
                 ReflectionUtils.makeAccessible(serverAddressesField);
 
 
                 // 添加第二组集群列表
                 // 添加第二组集群列表
-                for (int i = 1; i < _servers.size(); i++) {
-                    String cluster = _servers.get(i);
+                for (int i = 1; i < _serversList.size(); i++) {
+                    String cluster = _serversList.get(i);
                     // 强制获取zk中的地址信息
                     // 强制获取zk中的地址信息
                     ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk);
                     ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk);
                     HostProvider hostProvider = (HostProvider) ReflectionUtils.getField(hostProviderField, cnxn);
                     HostProvider hostProvider = (HostProvider) ReflectionUtils.getField(hostProviderField, cnxn);
-                    List<InetSocketAddress> serverAddrs = (List<InetSocketAddress>) ReflectionUtils.getField(serverAddressesField,
-                        hostProvider);
+                    List<InetSocketAddress> serverAddrs = (List<InetSocketAddress>) ReflectionUtils.getField(serverAddressesField, hostProvider);
                     // 添加第二组集群列表
                     // 添加第二组集群列表
                     serverAddrs.addAll(new ConnectStringParser(cluster).getServerAddresses());
                     serverAddrs.addAll(new ConnectStringParser(cluster).getServerAddresses());
                 }
                 }
@@ -179,3 +116,4 @@ public class ZooKeeperx extends ZkConnection {
 
 
     }
     }
 }
 }
+