|
@@ -5,7 +5,11 @@ import java.net.InetSocketAddress;
|
|
|
import java.net.SocketAddress;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.ByteOrder;
|
|
|
-import java.nio.channels.*;
|
|
|
+import java.nio.channels.Channel;
|
|
|
+import java.nio.channels.Channels;
|
|
|
+import java.nio.channels.ReadableByteChannel;
|
|
|
+import java.nio.channels.SocketChannel;
|
|
|
+import java.nio.channels.WritableByteChannel;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -146,17 +150,16 @@ public class SimpleCanalConnector implements CanalConnector {
|
|
|
supportedCompressions.addAll(handshake.getSupportedCompressionsList());
|
|
|
//
|
|
|
ClientAuth ca = ClientAuth.newBuilder()
|
|
|
- .setUsername(username != null ? username : "")
|
|
|
- .setPassword(ByteString.copyFromUtf8(password != null ? password : ""))
|
|
|
- .setNetReadTimeout(soTimeout)
|
|
|
- .setNetWriteTimeout(soTimeout)
|
|
|
- .build();
|
|
|
- writeWithHeader(
|
|
|
- Packet.newBuilder()
|
|
|
- .setType(PacketType.CLIENTAUTHENTICATION)
|
|
|
- .setBody(ca.toByteString())
|
|
|
- .build()
|
|
|
- .toByteArray());
|
|
|
+ .setUsername(username != null ? username : "")
|
|
|
+ .setPassword(ByteString.copyFromUtf8(password != null ? password : ""))
|
|
|
+ .setNetReadTimeout(soTimeout)
|
|
|
+ .setNetWriteTimeout(soTimeout)
|
|
|
+ .build();
|
|
|
+ writeWithHeader(Packet.newBuilder()
|
|
|
+ .setType(PacketType.CLIENTAUTHENTICATION)
|
|
|
+ .setBody(ca.toByteString())
|
|
|
+ .build()
|
|
|
+ .toByteArray());
|
|
|
//
|
|
|
Packet ack = Packet.parseFrom(readNextPacket());
|
|
|
if (ack.getType() != PacketType.ACK) {
|
|
@@ -206,17 +209,16 @@ public class SimpleCanalConnector implements CanalConnector {
|
|
|
public void subscribe(String filter) throws CanalClientException {
|
|
|
waitClientRunning();
|
|
|
try {
|
|
|
- writeWithHeader(
|
|
|
- Packet.newBuilder()
|
|
|
- .setType(PacketType.SUBSCRIPTION)
|
|
|
- .setBody(Sub.newBuilder()
|
|
|
- .setDestination(clientIdentity.getDestination())
|
|
|
- .setClientId(String.valueOf(clientIdentity.getClientId()))
|
|
|
- .setFilter(filter != null ? filter : "")
|
|
|
- .build()
|
|
|
- .toByteString())
|
|
|
- .build()
|
|
|
- .toByteArray());
|
|
|
+ writeWithHeader(Packet.newBuilder()
|
|
|
+ .setType(PacketType.SUBSCRIPTION)
|
|
|
+ .setBody(Sub.newBuilder()
|
|
|
+ .setDestination(clientIdentity.getDestination())
|
|
|
+ .setClientId(String.valueOf(clientIdentity.getClientId()))
|
|
|
+ .setFilter(filter != null ? filter : "")
|
|
|
+ .build()
|
|
|
+ .toByteString())
|
|
|
+ .build()
|
|
|
+ .toByteArray());
|
|
|
//
|
|
|
Packet p = Packet.parseFrom(readNextPacket());
|
|
|
Ack ack = Ack.parseFrom(p.getBody());
|
|
@@ -233,16 +235,15 @@ public class SimpleCanalConnector implements CanalConnector {
|
|
|
public void unsubscribe() throws CanalClientException {
|
|
|
waitClientRunning();
|
|
|
try {
|
|
|
- writeWithHeader(
|
|
|
- Packet.newBuilder()
|
|
|
- .setType(PacketType.UNSUBSCRIPTION)
|
|
|
- .setBody(Unsub.newBuilder()
|
|
|
- .setDestination(clientIdentity.getDestination())
|
|
|
- .setClientId(String.valueOf(clientIdentity.getClientId()))
|
|
|
- .build()
|
|
|
- .toByteString())
|
|
|
- .build()
|
|
|
- .toByteArray());
|
|
|
+ writeWithHeader(Packet.newBuilder()
|
|
|
+ .setType(PacketType.UNSUBSCRIPTION)
|
|
|
+ .setBody(Unsub.newBuilder()
|
|
|
+ .setDestination(clientIdentity.getDestination())
|
|
|
+ .setClientId(String.valueOf(clientIdentity.getClientId()))
|
|
|
+ .build()
|
|
|
+ .toByteString())
|
|
|
+ .build()
|
|
|
+ .toByteArray());
|
|
|
//
|
|
|
Packet p = Packet.parseFrom(readNextPacket());
|
|
|
Ack ack = Ack.parseFrom(p.getBody());
|
|
@@ -277,20 +278,19 @@ public class SimpleCanalConnector implements CanalConnector {
|
|
|
unit = TimeUnit.MILLISECONDS;
|
|
|
}
|
|
|
|
|
|
- writeWithHeader(
|
|
|
- Packet.newBuilder()
|
|
|
- .setType(PacketType.GET)
|
|
|
- .setBody(Get.newBuilder()
|
|
|
- .setAutoAck(false)
|
|
|
- .setDestination(clientIdentity.getDestination())
|
|
|
- .setClientId(String.valueOf(clientIdentity.getClientId()))
|
|
|
- .setFetchSize(size)
|
|
|
- .setTimeout(time)
|
|
|
- .setUnit(unit.ordinal())
|
|
|
- .build()
|
|
|
- .toByteString())
|
|
|
- .build()
|
|
|
- .toByteArray());
|
|
|
+ writeWithHeader(Packet.newBuilder()
|
|
|
+ .setType(PacketType.GET)
|
|
|
+ .setBody(Get.newBuilder()
|
|
|
+ .setAutoAck(false)
|
|
|
+ .setDestination(clientIdentity.getDestination())
|
|
|
+ .setClientId(String.valueOf(clientIdentity.getClientId()))
|
|
|
+ .setFetchSize(size)
|
|
|
+ .setTimeout(time)
|
|
|
+ .setUnit(unit.ordinal())
|
|
|
+ .build()
|
|
|
+ .toByteString())
|
|
|
+ .build()
|
|
|
+ .toByteArray());
|
|
|
return receiveMessages();
|
|
|
} catch (IOException e) {
|
|
|
throw new CanalClientException(e);
|
|
@@ -325,16 +325,16 @@ public class SimpleCanalConnector implements CanalConnector {
|
|
|
public void ack(long batchId) throws CanalClientException {
|
|
|
waitClientRunning();
|
|
|
ClientAck ca = ClientAck.newBuilder()
|
|
|
- .setDestination(clientIdentity.getDestination())
|
|
|
- .setClientId(String.valueOf(clientIdentity.getClientId()))
|
|
|
- .setBatchId(batchId)
|
|
|
- .build();
|
|
|
+ .setDestination(clientIdentity.getDestination())
|
|
|
+ .setClientId(String.valueOf(clientIdentity.getClientId()))
|
|
|
+ .setBatchId(batchId)
|
|
|
+ .build();
|
|
|
try {
|
|
|
writeWithHeader(Packet.newBuilder()
|
|
|
- .setType(PacketType.CLIENTACK)
|
|
|
- .setBody(ca.toByteString())
|
|
|
- .build()
|
|
|
- .toByteArray());
|
|
|
+ .setType(PacketType.CLIENTACK)
|
|
|
+ .setBody(ca.toByteString())
|
|
|
+ .build()
|
|
|
+ .toByteArray());
|
|
|
} catch (IOException e) {
|
|
|
throw new CanalClientException(e);
|
|
|
}
|
|
@@ -343,16 +343,16 @@ public class SimpleCanalConnector implements CanalConnector {
|
|
|
public void rollback(long batchId) throws CanalClientException {
|
|
|
waitClientRunning();
|
|
|
ClientRollback ca = ClientRollback.newBuilder()
|
|
|
- .setDestination(clientIdentity.getDestination())
|
|
|
- .setClientId(String.valueOf(clientIdentity.getClientId()))
|
|
|
- .setBatchId(batchId)
|
|
|
- .build();
|
|
|
+ .setDestination(clientIdentity.getDestination())
|
|
|
+ .setClientId(String.valueOf(clientIdentity.getClientId()))
|
|
|
+ .setBatchId(batchId)
|
|
|
+ .build();
|
|
|
try {
|
|
|
writeWithHeader(Packet.newBuilder()
|
|
|
- .setType(PacketType.CLIENTROLLBACK)
|
|
|
- .setBody(ca.toByteString())
|
|
|
- .build()
|
|
|
- .toByteArray());
|
|
|
+ .setType(PacketType.CLIENTROLLBACK)
|
|
|
+ .setBody(ca.toByteString())
|
|
|
+ .build()
|
|
|
+ .toByteArray());
|
|
|
} catch (IOException e) {
|
|
|
throw new CanalClientException(e);
|
|
|
}
|