浏览代码

polish the code thus make it more stable (#3855)

Signed-off-by: 元组 <zhaojunwang.zjw@alibaba-inc.com>
tuple 3 年之前
父节点
当前提交
b54bea5e33

+ 13 - 0
client/src/main/java/com/alibaba/otter/canal/client/impl/ClusterCanalConnector.java

@@ -39,6 +39,7 @@ public class ClusterCanalConnector implements CanalConnector {
         this.accessStrategy = accessStrategy;
     }
 
+    @Override
     public void connect() throws CanalClientException {
         while (currentConnector == null) {
             int times = 0;
@@ -85,10 +86,12 @@ public class ClusterCanalConnector implements CanalConnector {
         }
     }
 
+    @Override
     public boolean checkValid() {
         return currentConnector != null && currentConnector.checkValid();
     }
 
+    @Override
     public void disconnect() throws CanalClientException {
         if (currentConnector != null) {
             currentConnector.disconnect();
@@ -96,10 +99,12 @@ public class ClusterCanalConnector implements CanalConnector {
         }
     }
 
+    @Override
     public void subscribe() throws CanalClientException {
         subscribe(""); // 传递空字符即可
     }
 
+    @Override
     public void subscribe(String filter) throws CanalClientException {
         int times = 0;
         while (times < retryTimes) {
@@ -125,6 +130,7 @@ public class ClusterCanalConnector implements CanalConnector {
         throw new CanalClientException("failed to subscribe after " + times + " times retry.");
     }
 
+    @Override
     public void unsubscribe() throws CanalClientException {
         int times = 0;
         while (times < retryTimes) {
@@ -142,6 +148,7 @@ public class ClusterCanalConnector implements CanalConnector {
         throw new CanalClientException("failed to unsubscribe after " + times + " times retry.");
     }
 
+    @Override
     public Message get(int batchSize) throws CanalClientException {
         int times = 0;
         while (times < retryTimes) {
@@ -159,6 +166,7 @@ public class ClusterCanalConnector implements CanalConnector {
         throw new CanalClientException("failed to fetch the data after " + times + " times retry");
     }
 
+    @Override
     public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
         int times = 0;
         while (times < retryTimes) {
@@ -176,6 +184,7 @@ public class ClusterCanalConnector implements CanalConnector {
         throw new CanalClientException("failed to fetch the data after " + times + " times retry");
     }
 
+    @Override
     public Message getWithoutAck(int batchSize) throws CanalClientException {
         int times = 0;
         while (times < retryTimes) {
@@ -193,6 +202,7 @@ public class ClusterCanalConnector implements CanalConnector {
         throw new CanalClientException("failed to fetch the data after " + times + " times retry");
     }
 
+    @Override
     public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
         int times = 0;
         while (times < retryTimes) {
@@ -210,6 +220,7 @@ public class ClusterCanalConnector implements CanalConnector {
         throw new CanalClientException("failed to fetch the data after " + times + " times retry");
     }
 
+    @Override
     public void rollback(long batchId) throws CanalClientException {
         int times = 0;
         while (times < retryTimes) {
@@ -227,6 +238,7 @@ public class ClusterCanalConnector implements CanalConnector {
         throw new CanalClientException("failed to rollback after " + times + " times retry");
     }
 
+    @Override
     public void rollback() throws CanalClientException {
         int times = 0;
         while (times < retryTimes) {
@@ -245,6 +257,7 @@ public class ClusterCanalConnector implements CanalConnector {
         throw new CanalClientException("failed to rollback after " + times + " times retry");
     }
 
+    @Override
     public void ack(long batchId) throws CanalClientException {
         int times = 0;
         while (times < retryTimes) {

+ 3 - 2
client/src/main/java/com/alibaba/otter/canal/client/impl/ClusterNodeAccessStrategy.java

@@ -63,6 +63,7 @@ public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy {
         return nextNode();
     }
 
+    @Override
     public SocketAddress nextNode() {
         if (runningAddress != null) {// 如果服务已经启动,直接选择当前正在工作的节点
             return runningAddress;
@@ -81,7 +82,7 @@ public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy {
             for (String address : currentChilds) {
                 String[] strs = StringUtils.split(address, ":");
                 if (strs != null && strs.length == 2) {
-                    addresses.add(new InetSocketAddress(strs[0], Integer.valueOf(strs[1])));
+                    addresses.add(new InetSocketAddress(strs[0], Integer.parseInt(strs[1])));
                 }
             }
 
@@ -98,7 +99,7 @@ public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy {
         ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
         String[] strs = StringUtils.split(runningData.getAddress(), ':');
         if (strs.length == 2) {
-            runningAddress = new InetSocketAddress(strs[0], Integer.valueOf(strs[1]));
+            runningAddress = new InetSocketAddress(strs[0], Integer.parseInt(strs[1]));
         }
     }
 

+ 13 - 1
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -98,6 +98,7 @@ public class SimpleCanalConnector implements CanalConnector {
         this.clientIdentity = new ClientIdentity(destination, (short) 1001);
     }
 
+    @Override
     public void connect() throws CanalClientException {
         if (connected) {
             return;
@@ -124,6 +125,7 @@ public class SimpleCanalConnector implements CanalConnector {
         connected = true;
     }
 
+    @Override
     public void disconnect() throws CanalClientException {
         if (rollbackOnDisConnect && channel.isConnected()) {
             rollback();
@@ -222,10 +224,12 @@ public class SimpleCanalConnector implements CanalConnector {
         }
     }
 
+    @Override
     public void subscribe() throws CanalClientException {
         subscribe(""); // 传递空字符即可
     }
 
+    @Override
     public void subscribe(String filter) throws CanalClientException {
         waitClientRunning();
         if (!running) {
@@ -255,6 +259,7 @@ public class SimpleCanalConnector implements CanalConnector {
         }
     }
 
+    @Override
     public void unsubscribe() throws CanalClientException {
         waitClientRunning();
         if (!running) {
@@ -281,20 +286,24 @@ public class SimpleCanalConnector implements CanalConnector {
         }
     }
 
+    @Override
     public Message get(int batchSize) throws CanalClientException {
         return get(batchSize, null, null);
     }
 
+    @Override
     public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
         Message message = getWithoutAck(batchSize, timeout, unit);
         ack(message.getId());
         return message;
     }
 
+    @Override
     public Message getWithoutAck(int batchSize) throws CanalClientException {
         return getWithoutAck(batchSize, null, null);
     }
 
+    @Override
     public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
         waitClientRunning();
         if (!running) {
@@ -331,6 +340,7 @@ public class SimpleCanalConnector implements CanalConnector {
         return CanalMessageDeserializer.deserializer(data, lazyParseEntry);
     }
 
+    @Override
     public void ack(long batchId) throws CanalClientException {
         waitClientRunning();
         if (!running) {
@@ -352,6 +362,7 @@ public class SimpleCanalConnector implements CanalConnector {
         }
     }
 
+    @Override
     public void rollback(long batchId) throws CanalClientException {
         waitClientRunning();
         ClientRollback ca = ClientRollback.newBuilder()
@@ -370,9 +381,10 @@ public class SimpleCanalConnector implements CanalConnector {
         }
     }
 
+    @Override
     public void rollback() throws CanalClientException {
         waitClientRunning();
-        rollback(0);// 0代笔未设置
+        rollback(0); // 0 代表未设置
     }
 
     // ==================== helper method ====================

+ 1 - 0
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleNodeAccessStrategy.java

@@ -24,6 +24,7 @@ public class SimpleNodeAccessStrategy implements CanalNodeAccessStrategy {
         this.nodes.addAll(nodes);
     }
 
+    @Override
     public SocketAddress nextNode() {
         try {
             return nodes.get(index);