Răsfoiți Sursa

Merge pull request #2 from alibaba/master

fixed client idleTimeout 1 hour
rewerma 7 ani în urmă
părinte
comite
025d36604b

+ 6 - 3
client/src/main/java/com/alibaba/otter/canal/client/CanalConnectors.java

@@ -29,7 +29,8 @@ public class CanalConnectors {
     public static CanalConnector newSingleConnector(SocketAddress address, String destination, String username,
                                                     String password) {
         SimpleCanalConnector canalConnector = new SimpleCanalConnector(address, username, password, destination);
-        canalConnector.setSoTimeout(30 * 1000);
+        canalConnector.setSoTimeout(60 * 1000);
+        canalConnector.setIdleTimeout(60 * 60 * 1000);
         return canalConnector;
     }
 
@@ -48,7 +49,8 @@ public class CanalConnectors {
             password,
             destination,
             new SimpleNodeAccessStrategy(addresses));
-        canalConnector.setSoTimeout(30 * 1000);
+        canalConnector.setSoTimeout(60 * 1000);
+        canalConnector.setIdleTimeout(60 * 60 * 1000);
         return canalConnector;
     }
 
@@ -67,7 +69,8 @@ public class CanalConnectors {
             password,
             destination,
             new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkServers)));
-        canalConnector.setSoTimeout(30 * 1000);
+        canalConnector.setSoTimeout(60 * 1000);
+        canalConnector.setIdleTimeout(60 * 60 * 1000);
         return canalConnector;
     }
 }

+ 19 - 8
client/src/main/java/com/alibaba/otter/canal/client/impl/ClusterCanalConnector.java

@@ -22,7 +22,8 @@ public class ClusterCanalConnector implements CanalConnector {
     private final Logger            logger        = LoggerFactory.getLogger(this.getClass());
     private String                  username;
     private String                  password;
-    private int                     soTimeout     = 10000;
+    private int                     soTimeout     = 60000;
+    private int                     idleTimeout   = 60 * 60 * 1000;
     private int                     retryTimes    = 3;                                       // 设置-1时可以subscribe阻塞等待时优雅停机
     private int                     retryInterval = 5000;                                    // 重试的时间间隔,默认5秒
     private CanalNodeAccessStrategy accessStrategy;
@@ -52,6 +53,7 @@ public class ClusterCanalConnector implements CanalConnector {
 
                     };
                     currentConnector.setSoTimeout(soTimeout);
+                    currentConnector.setIdleTimeout(idleTimeout);
                     if (filter != null) {
                         currentConnector.setFilter(filter);
                     }
@@ -110,10 +112,8 @@ public class ClusterCanalConnector implements CanalConnector {
                     logger.info("block waiting interrupted by other thread.");
                     return;
                 } else {
-                    logger.warn(String.format(
-                            "something goes wrong when subscribing from server: %s",
-                            currentConnector != null ? currentConnector.getAddress() : "null"),
-                            t);
+                    logger.warn(String.format("something goes wrong when subscribing from server: %s",
+                        currentConnector != null ? currentConnector.getAddress() : "null"), t);
                     times++;
                     restart();
                     logger.info("restart the connector for next round retry.");
@@ -218,7 +218,8 @@ public class ClusterCanalConnector implements CanalConnector {
                 return;
             } catch (Throwable t) {
                 logger.warn(String.format("something goes wrong when rollbacking data from server:%s",
-                    currentConnector.getAddress()), t);
+                    currentConnector.getAddress()),
+                    t);
                 times++;
                 restart();
                 logger.info("restart the connector for next round retry.");
@@ -235,7 +236,8 @@ public class ClusterCanalConnector implements CanalConnector {
                 return;
             } catch (Throwable t) {
                 logger.warn(String.format("something goes wrong when rollbacking data from server:%s",
-                    currentConnector.getAddress()), t);
+                    currentConnector.getAddress()),
+                    t);
                 times++;
                 restart();
                 logger.info("restart the connector for next round retry.");
@@ -253,7 +255,8 @@ public class ClusterCanalConnector implements CanalConnector {
                 return;
             } catch (Throwable t) {
                 logger.warn(String.format("something goes wrong when acking data from server:%s",
-                    currentConnector.getAddress()), t);
+                    currentConnector.getAddress()),
+                    t);
                 times++;
                 restart();
                 logger.info("restart the connector for next round retry.");
@@ -300,6 +303,14 @@ public class ClusterCanalConnector implements CanalConnector {
         this.soTimeout = soTimeout;
     }
 
+    public int getIdleTimeout() {
+        return idleTimeout;
+    }
+
+    public void setIdleTimeout(int idleTimeout) {
+        this.idleTimeout = idleTimeout;
+    }
+
     public int getRetryTimes() {
         return retryTimes;
     }

+ 20 - 5
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -55,6 +55,7 @@ public class SimpleCanalConnector implements CanalConnector {
     private String               username;
     private String               password;
     private int                  soTimeout             = 60000;                                              // milliseconds
+    private int                  idleTimeout           = 60 * 60 * 1000;                                     // client和server之间的空闲链接超时的时间,默认为1小时
     private String               filter;                                                                     // 记录上一次的filter提交值,便于自动重试时提交
 
     private final ByteBuffer     readHeader            = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
@@ -78,15 +79,21 @@ public class SimpleCanalConnector implements CanalConnector {
     private boolean              running               = false;
 
     public SimpleCanalConnector(SocketAddress address, String username, String password, String destination){
-        this(address, username, password, destination, 60000);
+        this(address, username, password, destination, 60000, 60 * 60 * 1000);
     }
 
     public SimpleCanalConnector(SocketAddress address, String username, String password, String destination,
                                 int soTimeout){
+        this(address, username, password, destination, soTimeout, 60 * 60 * 1000);
+    }
+
+    public SimpleCanalConnector(SocketAddress address, String username, String password, String destination,
+                                int soTimeout, int idleTimeout){
         this.address = address;
         this.username = username;
         this.password = password;
         this.soTimeout = soTimeout;
+        this.idleTimeout = idleTimeout;
         this.clientIdentity = new ClientIdentity(destination, (short) 1001);
     }
 
@@ -157,8 +164,8 @@ public class SimpleCanalConnector implements CanalConnector {
             ClientAuth ca = ClientAuth.newBuilder()
                 .setUsername(username != null ? username : "")
                 .setPassword(ByteString.copyFromUtf8(password != null ? password : ""))
-                .setNetReadTimeout(soTimeout)
-                .setNetWriteTimeout(soTimeout)
+                .setNetReadTimeout(idleTimeout)
+                .setNetWriteTimeout(idleTimeout)
                 .build();
             writeWithHeader(Packet.newBuilder()
                 .setType(PacketType.CLIENTAUTHENTICATION)
@@ -500,6 +507,14 @@ public class SimpleCanalConnector implements CanalConnector {
         this.soTimeout = soTimeout;
     }
 
+    public int getIdleTimeout() {
+        return idleTimeout;
+    }
+
+    public void setIdleTimeout(int idleTimeout) {
+        this.idleTimeout = idleTimeout;
+    }
+
     public void setZkClientx(ZkClientx zkClientx) {
         this.zkClientx = zkClientx;
         initClientRunningMonitor(this.clientIdentity);
@@ -519,9 +534,9 @@ public class SimpleCanalConnector implements CanalConnector {
 
     public void stopRunning() {
         if (running) {
-            running = false;  //设置为非running状态
+            running = false; // 设置为非running状态
             if (!mutex.state()) {
-                mutex.set(true);  //中断阻塞
+                mutex.set(true); // 中断阻塞
             }
         }
     }

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/ClientAuthenticationHandler.java

@@ -34,7 +34,7 @@ public class ClientAuthenticationHandler extends SimpleChannelHandler {
 
     private static final Logger     logger                                  = LoggerFactory.getLogger(ClientAuthenticationHandler.class);
     private final int               SUPPORTED_VERSION                       = 3;
-    private final int               defaultSubscriptorDisconnectIdleTimeout = 5 * 60 * 1000;
+    private final int               defaultSubscriptorDisconnectIdleTimeout = 60 * 60 * 1000;
     private CanalServerWithEmbedded embeddedServer;
 
     public ClientAuthenticationHandler(){