|
@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.server.netty.handler;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import com.alibaba.otter.canal.server.netty.listener.ChannelFutureAggregator;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.apache.commons.lang.exception.ExceptionUtils;
|
|
|
import org.jboss.netty.buffer.ChannelBuffer;
|
|
@@ -51,6 +52,7 @@ public class SessionHandler extends SimpleChannelHandler {
|
|
|
|
|
|
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
|
|
logger.info("message receives in session handler...");
|
|
|
+ long start = System.nanoTime();
|
|
|
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
|
|
|
Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
|
|
|
ClientIdentity clientIdentity = null;
|
|
@@ -74,12 +76,13 @@ public class SessionHandler extends SimpleChannelHandler {
|
|
|
|
|
|
embeddedServer.subscribe(clientIdentity);
|
|
|
// ctx.setAttachment(clientIdentity);// 设置状态数据
|
|
|
- NettyUtils.ack(ctx.getChannel(), null);
|
|
|
+ byte[] ackBytes = NettyUtils.ackPacket();
|
|
|
+ NettyUtils.write(ctx.getChannel(), ackBytes, new ChannelFutureAggregator(sub.getDestination(),
|
|
|
+ sub, packet.getType(), ackBytes.length, System.nanoTime() - start));
|
|
|
} else {
|
|
|
- NettyUtils.error(401,
|
|
|
- MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage(),
|
|
|
- ctx.getChannel(),
|
|
|
- null);
|
|
|
+ byte[] errorBytes = NettyUtils.errorPacket(401, MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage());
|
|
|
+ NettyUtils.write(ctx.getChannel(), errorBytes ,new ChannelFutureAggregator(sub.getDestination(),
|
|
|
+ sub, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 401));
|
|
|
}
|
|
|
break;
|
|
|
case UNSUBSCRIPTION:
|
|
@@ -91,12 +94,13 @@ public class SessionHandler extends SimpleChannelHandler {
|
|
|
MDC.put("destination", clientIdentity.getDestination());
|
|
|
embeddedServer.unsubscribe(clientIdentity);
|
|
|
stopCanalInstanceIfNecessary(clientIdentity);// 尝试关闭
|
|
|
- NettyUtils.ack(ctx.getChannel(), null);
|
|
|
+ byte[] ackBytes = NettyUtils.ackPacket();
|
|
|
+ NettyUtils.write(ctx.getChannel(), ackBytes, new ChannelFutureAggregator(unsub.getDestination(),
|
|
|
+ unsub, packet.getType(), ackBytes.length, System.nanoTime() - start));
|
|
|
} else {
|
|
|
- NettyUtils.error(401,
|
|
|
- MessageFormatter.format("destination or clientId is null", unsub.toString()).getMessage(),
|
|
|
- ctx.getChannel(),
|
|
|
- null);
|
|
|
+ byte[] errorBytes = NettyUtils.errorPacket(401, MessageFormatter.format("destination or clientId is null", unsub.toString()).getMessage());
|
|
|
+ NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(unsub.getDestination(),
|
|
|
+ unsub, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 401));
|
|
|
}
|
|
|
break;
|
|
|
case GET:
|
|
@@ -171,7 +175,8 @@ public class SessionHandler extends SimpleChannelHandler {
|
|
|
output.writeBytes(2, rowEntries.get(i));
|
|
|
}
|
|
|
output.checkNoSpaceLeft();
|
|
|
- NettyUtils.write(ctx.getChannel(), body, null);
|
|
|
+ NettyUtils.write(ctx.getChannel(), body, new ChannelFutureAggregator(get.getDestination(),
|
|
|
+ get, packet.getType(), body.length, System.nanoTime() - start, message.getId() == -1));
|
|
|
|
|
|
// output.flush();
|
|
|
// byteBuffer.flip();
|
|
@@ -192,14 +197,14 @@ public class SessionHandler extends SimpleChannelHandler {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- packetBuilder.setBody(messageBuilder.build().toByteString());
|
|
|
- NettyUtils.write(ctx.getChannel(), packetBuilder.build().toByteArray(), null);// 输出数据
|
|
|
+ byte[] body = packetBuilder.setBody(messageBuilder.build().toByteString()).build().toByteArray();
|
|
|
+ NettyUtils.write(ctx.getChannel(), body, new ChannelFutureAggregator(get.getDestination(),
|
|
|
+ get, packet.getType(), body.length, System.nanoTime() - start, message.getId() == -1));// 输出数据
|
|
|
}
|
|
|
} else {
|
|
|
- NettyUtils.error(401,
|
|
|
- MessageFormatter.format("destination or clientId is null", get.toString()).getMessage(),
|
|
|
- ctx.getChannel(),
|
|
|
- null);
|
|
|
+ byte[] errorBytes = NettyUtils.errorPacket(401, MessageFormatter.format("destination or clientId is null", get.toString()).getMessage());
|
|
|
+ NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(get.getDestination(),
|
|
|
+ get, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 401));
|
|
|
}
|
|
|
break;
|
|
|
case CLIENTACK:
|
|
@@ -207,10 +212,9 @@ public class SessionHandler extends SimpleChannelHandler {
|
|
|
MDC.put("destination", ack.getDestination());
|
|
|
if (StringUtils.isNotEmpty(ack.getDestination()) && StringUtils.isNotEmpty(ack.getClientId())) {
|
|
|
if (ack.getBatchId() == 0L) {
|
|
|
- NettyUtils.error(402,
|
|
|
- MessageFormatter.format("batchId should assign value", ack.toString()).getMessage(),
|
|
|
- ctx.getChannel(),
|
|
|
- null);
|
|
|
+ byte[] errorBytes = NettyUtils.errorPacket(402, MessageFormatter.format("batchId should assign value", ack.toString()).getMessage());
|
|
|
+ NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(ack.getDestination(),
|
|
|
+ ack, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 402));
|
|
|
} else if (ack.getBatchId() == -1L) { // -1代表上一次get没有数据,直接忽略之
|
|
|
// donothing
|
|
|
} else {
|
|
@@ -218,10 +222,9 @@ public class SessionHandler extends SimpleChannelHandler {
|
|
|
embeddedServer.ack(clientIdentity, ack.getBatchId());
|
|
|
}
|
|
|
} else {
|
|
|
- NettyUtils.error(401,
|
|
|
- MessageFormatter.format("destination or clientId is null", ack.toString()).getMessage(),
|
|
|
- ctx.getChannel(),
|
|
|
- null);
|
|
|
+ byte[] errorBytes = NettyUtils.errorPacket(401, MessageFormatter.format("destination or clientId is null", ack.toString()).getMessage());
|
|
|
+ NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(ack.getDestination(),
|
|
|
+ ack, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 401));
|
|
|
}
|
|
|
break;
|
|
|
case CLIENTROLLBACK:
|
|
@@ -237,25 +240,23 @@ public class SessionHandler extends SimpleChannelHandler {
|
|
|
embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次
|
|
|
}
|
|
|
} else {
|
|
|
- NettyUtils.error(401,
|
|
|
- MessageFormatter.format("destination or clientId is null", rollback.toString())
|
|
|
- .getMessage(),
|
|
|
- ctx.getChannel(),
|
|
|
- null);
|
|
|
+ byte[] errorBytes = NettyUtils.errorPacket(401, MessageFormatter.format("destination or clientId is null", rollback.toString()).getMessage());
|
|
|
+ NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(rollback.getDestination(),
|
|
|
+ rollback, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 401));
|
|
|
}
|
|
|
break;
|
|
|
default:
|
|
|
- NettyUtils.error(400, MessageFormatter.format("packet type={} is NOT supported!", packet.getType())
|
|
|
- .getMessage(), ctx.getChannel(), null);
|
|
|
+ byte[] errorBytes = NettyUtils.errorPacket(400, MessageFormatter.format("packet type={} is NOT supported!", packet.getType()).getMessage());
|
|
|
+ NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(ctx.getChannel().getRemoteAddress().toString(),
|
|
|
+ null, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 400));
|
|
|
break;
|
|
|
}
|
|
|
} catch (Throwable exception) {
|
|
|
- NettyUtils.error(400,
|
|
|
- MessageFormatter.format("something goes wrong with channel:{}, exception={}",
|
|
|
+ byte[] errorBytes = NettyUtils.errorPacket(400, MessageFormatter.format("something goes wrong with channel:{}, exception={}",
|
|
|
ctx.getChannel(),
|
|
|
- ExceptionUtils.getStackTrace(exception)).getMessage(),
|
|
|
- ctx.getChannel(),
|
|
|
- null);
|
|
|
+ ExceptionUtils.getStackTrace(exception)).getMessage());
|
|
|
+ NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(ctx.getChannel().getRemoteAddress().toString(),
|
|
|
+ null, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 400));
|
|
|
} finally {
|
|
|
MDC.remove("destination");
|
|
|
}
|