Bladeren bron

Merge pull request #319 from day6/fixbug-clientTimeout

fixbug: soTimeout doesn't work.
agapple 7 jaren geleden
bovenliggende
commit
b265bf4669
1 gewijzigde bestanden met toevoegingen van 103 en 80 verwijderingen
  1. 103 80
      client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

+ 103 - 80
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -5,7 +5,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.nio.channels.SocketChannel;
+import java.nio.channels.*;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -37,7 +37,6 @@ import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * 基于{@linkplain CanalServerWithNetty}定义的网络协议接口,对于canal数据进行get/rollback/ack等操作
@@ -57,6 +56,8 @@ public class SimpleCanalConnector implements CanalConnector {
     private final ByteBuffer     readHeader            = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
     private final ByteBuffer     writeHeader           = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
     private SocketChannel        channel;
+    private ReadableByteChannel  readableChannel;
+    private WritableByteChannel  writableChannel;
     private List<Compression>    supportedCompressions = new ArrayList<Compression>();
     private ClientIdentity       clientIdentity;
     private ClientRunningMonitor runningMonitor;                                                             // 运行控制
@@ -130,7 +131,9 @@ public class SimpleCanalConnector implements CanalConnector {
                 address = getNextAddress();
             }
             channel.connect(address);
-            Packet p = Packet.parseFrom(readNextPacket(channel));
+            readableChannel = Channels.newChannel(channel.socket().getInputStream());
+            writableChannel = Channels.newChannel(channel.socket().getOutputStream());
+            Packet p = Packet.parseFrom(readNextPacket());
             if (p.getVersion() != 1) {
                 throw new CanalClientException("unsupported version at this client.");
             }
@@ -143,18 +146,19 @@ public class SimpleCanalConnector implements CanalConnector {
             supportedCompressions.addAll(handshake.getSupportedCompressionsList());
             //
             ClientAuth ca = ClientAuth.newBuilder()
-                .setUsername(username != null ? username : "")
-                .setNetReadTimeout(soTimeout)
-                .setNetWriteTimeout(soTimeout)
-                .build();
-            writeWithHeader(channel,
-                Packet.newBuilder()
-                    .setType(PacketType.CLIENTAUTHENTICATION)
-                    .setBody(ca.toByteString())
-                    .build()
-                    .toByteArray());
+                    .setUsername(username != null ? username : "")
+                    .setPassword(ByteString.copyFromUtf8(password != null ? password : ""))
+                    .setNetReadTimeout(soTimeout)
+                    .setNetWriteTimeout(soTimeout)
+                    .build();
+            writeWithHeader(
+                    Packet.newBuilder()
+                            .setType(PacketType.CLIENTAUTHENTICATION)
+                            .setBody(ca.toByteString())
+                            .build()
+                            .toByteArray());
             //
-            Packet ack = Packet.parseFrom(readNextPacket(channel));
+            Packet ack = Packet.parseFrom(readNextPacket());
             if (ack.getType() != PacketType.ACK) {
                 throw new CanalClientException("unexpected packet type when ack is expected");
             }
@@ -173,16 +177,28 @@ public class SimpleCanalConnector implements CanalConnector {
     }
 
     private void doDisconnnect() throws CanalClientException {
+        if (readableChannel != null) {
+            quietlyClose(readableChannel);
+            readableChannel = null;
+        }
+        if (writableChannel != null) {
+            quietlyClose(writableChannel);
+            writableChannel = null;
+        }
         if (channel != null) {
-            try {
-                channel.close();
-            } catch (IOException e) {
-                logger.warn("exception on closing channel:{} \n {}", channel, e);
-            }
+            quietlyClose(channel);
             channel = null;
         }
     }
 
+    private void quietlyClose(Channel channel) {
+        try {
+            channel.close();
+        } catch (IOException e) {
+            logger.warn("exception on closing channel:{} \n {}", channel, e);
+        }
+    }
+
     public void subscribe() throws CanalClientException {
         subscribe(""); // 传递空字符即可
     }
@@ -190,19 +206,19 @@ public class SimpleCanalConnector implements CanalConnector {
     public void subscribe(String filter) throws CanalClientException {
         waitClientRunning();
         try {
-            writeWithHeader(channel,
-                Packet.newBuilder()
-                    .setType(PacketType.SUBSCRIPTION)
-                    .setBody(Sub.newBuilder()
-                        .setDestination(clientIdentity.getDestination())
-                        .setClientId(String.valueOf(clientIdentity.getClientId()))
-                        .setFilter(filter != null ? filter : "")
-                        .build()
-                        .toByteString())
-                    .build()
-                    .toByteArray());
+            writeWithHeader(
+                    Packet.newBuilder()
+                            .setType(PacketType.SUBSCRIPTION)
+                            .setBody(Sub.newBuilder()
+                                    .setDestination(clientIdentity.getDestination())
+                                    .setClientId(String.valueOf(clientIdentity.getClientId()))
+                                    .setFilter(filter != null ? filter : "")
+                                    .build()
+                                    .toByteString())
+                            .build()
+                            .toByteArray());
             //
-            Packet p = Packet.parseFrom(readNextPacket(channel));
+            Packet p = Packet.parseFrom(readNextPacket());
             Ack ack = Ack.parseFrom(p.getBody());
             if (ack.getErrorCode() > 0) {
                 throw new CanalClientException("failed to subscribe with reason: " + ack.getErrorMessage());
@@ -217,18 +233,18 @@ public class SimpleCanalConnector implements CanalConnector {
     public void unsubscribe() throws CanalClientException {
         waitClientRunning();
         try {
-            writeWithHeader(channel,
-                Packet.newBuilder()
-                    .setType(PacketType.UNSUBSCRIPTION)
-                    .setBody(Unsub.newBuilder()
-                        .setDestination(clientIdentity.getDestination())
-                        .setClientId(String.valueOf(clientIdentity.getClientId()))
-                        .build()
-                        .toByteString())
-                    .build()
-                    .toByteArray());
+            writeWithHeader(
+                    Packet.newBuilder()
+                            .setType(PacketType.UNSUBSCRIPTION)
+                            .setBody(Unsub.newBuilder()
+                                    .setDestination(clientIdentity.getDestination())
+                                    .setClientId(String.valueOf(clientIdentity.getClientId()))
+                                    .build()
+                                    .toByteString())
+                            .build()
+                            .toByteArray());
             //
-            Packet p = Packet.parseFrom(readNextPacket(channel));
+            Packet p = Packet.parseFrom(readNextPacket());
             Ack ack = Ack.parseFrom(p.getBody());
             if (ack.getErrorCode() > 0) {
                 throw new CanalClientException("failed to unSubscribe with reason: " + ack.getErrorMessage());
@@ -261,29 +277,28 @@ public class SimpleCanalConnector implements CanalConnector {
                 unit = TimeUnit.MILLISECONDS;
             }
 
-            writeWithHeader(channel,
-                Packet.newBuilder()
-                    .setType(PacketType.GET)
-                    .setBody(Get.newBuilder()
-                        .setAutoAck(false)
-                        .setDestination(clientIdentity.getDestination())
-                        .setClientId(String.valueOf(clientIdentity.getClientId()))
-                        .setFetchSize(size)
-                        .setTimeout(time)
-                        .setUnit(unit.ordinal())
-                        .build()
-                        .toByteString())
-                    .build()
-                    .toByteArray());
-
+            writeWithHeader(
+                    Packet.newBuilder()
+                            .setType(PacketType.GET)
+                            .setBody(Get.newBuilder()
+                                    .setAutoAck(false)
+                                    .setDestination(clientIdentity.getDestination())
+                                    .setClientId(String.valueOf(clientIdentity.getClientId()))
+                                    .setFetchSize(size)
+                                    .setTimeout(time)
+                                    .setUnit(unit.ordinal())
+                                    .build()
+                                    .toByteString())
+                            .build()
+                            .toByteArray());
             return receiveMessages();
         } catch (IOException e) {
             throw new CanalClientException(e);
         }
     }
 
-    private Message receiveMessages() throws InvalidProtocolBufferException, IOException {
-        Packet p = Packet.parseFrom(readNextPacket(channel));
+    private Message receiveMessages() throws IOException {
+        Packet p = Packet.parseFrom(readNextPacket());
         switch (p.getType()) {
             case MESSAGES: {
                 if (!p.getCompression().equals(Compression.NONE)) {
@@ -310,16 +325,16 @@ public class SimpleCanalConnector implements CanalConnector {
     public void ack(long batchId) throws CanalClientException {
         waitClientRunning();
         ClientAck ca = ClientAck.newBuilder()
-            .setDestination(clientIdentity.getDestination())
-            .setClientId(String.valueOf(clientIdentity.getClientId()))
-            .setBatchId(batchId)
-            .build();
+                .setDestination(clientIdentity.getDestination())
+                .setClientId(String.valueOf(clientIdentity.getClientId()))
+                .setBatchId(batchId)
+                .build();
         try {
-            writeWithHeader(channel, Packet.newBuilder()
-                .setType(PacketType.CLIENTACK)
-                .setBody(ca.toByteString())
-                .build()
-                .toByteArray());
+            writeWithHeader(Packet.newBuilder()
+                    .setType(PacketType.CLIENTACK)
+                    .setBody(ca.toByteString())
+                    .build()
+                    .toByteArray());
         } catch (IOException e) {
             throw new CanalClientException(e);
         }
@@ -328,16 +343,16 @@ public class SimpleCanalConnector implements CanalConnector {
     public void rollback(long batchId) throws CanalClientException {
         waitClientRunning();
         ClientRollback ca = ClientRollback.newBuilder()
-            .setDestination(clientIdentity.getDestination())
-            .setClientId(String.valueOf(clientIdentity.getClientId()))
-            .setBatchId(batchId)
-            .build();
+                .setDestination(clientIdentity.getDestination())
+                .setClientId(String.valueOf(clientIdentity.getClientId()))
+                .setBatchId(batchId)
+                .build();
         try {
-            writeWithHeader(channel, Packet.newBuilder()
-                .setType(PacketType.CLIENTROLLBACK)
-                .setBody(ca.toByteString())
-                .build()
-                .toByteArray());
+            writeWithHeader(Packet.newBuilder()
+                    .setType(PacketType.CLIENTROLLBACK)
+                    .setBody(ca.toByteString())
+                    .build()
+                    .toByteArray());
         } catch (IOException e) {
             throw new CanalClientException(e);
         }
@@ -350,7 +365,15 @@ public class SimpleCanalConnector implements CanalConnector {
 
     // ==================== helper method ====================
 
-    private void writeWithHeader(SocketChannel channel, byte[] body) throws IOException {
+    private void writeWithHeader(byte[] body) throws IOException {
+        writeWithHeader(writableChannel, body);
+    }
+
+    private byte[] readNextPacket() throws IOException {
+        return readNextPacket(readableChannel);
+    }
+
+    private void writeWithHeader(WritableByteChannel channel, byte[] body) throws IOException {
         synchronized (writeDataLock) {
             writeHeader.clear();
             writeHeader.putInt(body.length);
@@ -360,7 +383,7 @@ public class SimpleCanalConnector implements CanalConnector {
         }
     }
 
-    private byte[] readNextPacket(SocketChannel channel) throws IOException {
+    private byte[] readNextPacket(ReadableByteChannel channel) throws IOException {
         synchronized (readDataLock) {
             readHeader.clear();
             read(channel, readHeader);
@@ -371,7 +394,7 @@ public class SimpleCanalConnector implements CanalConnector {
         }
     }
 
-    private void read(SocketChannel channel, ByteBuffer buffer) throws IOException {
+    private void read(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
         while (buffer.hasRemaining()) {
             int r = channel.read(buffer);
             if (r == -1) {