소스 검색

Add listener to write.

Chuanyi Li L 6 년 전
부모
커밋
680faea2bb

+ 3 - 3
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/CanalInstanceExports.java

@@ -21,9 +21,9 @@ import static com.alibaba.otter.canal.server.netty.CanalServerWithNettyProfiler.
 public class CanalInstanceExports {
 
     private static final Logger      logger          = LoggerFactory.getLogger(CanalInstanceExports.class);
-    public static final String       DESTINATION     = "destination";
-    public static final String[]     DEST_LABELS     = {DESTINATION};
-    public static final List<String> DEST_LABEL_LIST = Collections.singletonList(DESTINATION);
+    public static final String       DEST            = "destination";
+    public static final String[]     DEST_LABELS     = {DEST};
+    public static final List<String> DEST_LABEL_LIST = Collections.singletonList(DEST);
     private final String             destination;
     private Collector                storeCollector;
     private Collector                delayCollector;

+ 2 - 5
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusService.java

@@ -22,11 +22,8 @@ import static com.alibaba.otter.canal.server.netty.CanalServerWithNettyProfiler.
 public class PrometheusService implements CanalMetricsService {
 
     private static final Logger                           logger  = LoggerFactory.getLogger(PrometheusService.class);
-
     private final Map<String, CanalInstanceExports>       exports = new ConcurrentHashMap<String, CanalInstanceExports>();
-
     private volatile boolean                              running = false;
-
     private HTTPServer                                    server;
 
     private PrometheusService() {
@@ -69,8 +66,8 @@ public class PrometheusService implements CanalMetricsService {
         // Normally, service should be terminated at canal shutdown.
         // No need to unregister instance exports explicitly.
         // But for the sake of safety, unregister them.
-        for (CanalInstanceExports ie : exports.values()) {
-            ie.unregister();
+        for (CanalInstanceExports cie : exports.values()) {
+            cie.unregister();
         }
         profiler().setInstanceProfilerFactory(DISABLED);
         if (server != null) {

+ 2 - 2
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/MysqlParserCollector.java

@@ -12,7 +12,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DESTINATION;
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST;
 import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABEL_LIST;
 
 /**
@@ -45,7 +45,7 @@ public class MysqlParserCollector extends Collector {
         this.receivedBinlogBytesHelp = "Received binlog bytes of instance" + destination;
         this.isParallel = ((MysqlEventParser)parser).isParallel();
         this.modeHelp = "Parser mode of instance" + destination;
-        this.modeLabels = Arrays.asList(DESTINATION, MODE_LABEL);
+        this.modeLabels = Arrays.asList(DEST, MODE_LABEL);
         this.modeLabelValues = Arrays.asList(destination, isParallel.toString());
     }
 

+ 3 - 3
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusClientInstanceProfiler.java

@@ -7,7 +7,7 @@ import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Histogram;
 
-import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DESTINATION;
+import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST;
 import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS;
 
 /**
@@ -38,7 +38,7 @@ public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler
                 .help("Send bytes to client of instance " + destination)
                 .create();
         this.packetsCounter = Counter.build()
-                .labelNames(new String[]{DESTINATION, "packetType"})
+                .labelNames(new String[]{DEST, "packetType"})
                 .name(PACKET_TYPE)
                 .help("Send packets to client of instance " + destination)
                 .create();
@@ -48,7 +48,7 @@ public class PrometheusClientInstanceProfiler implements ClientInstanceProfiler
                 .help("Send empty batches to client of instance " + destination)
                 .create();
         this.errorsCounter = Counter.build()
-                .labelNames(new String[]{DESTINATION, "errorCode"})
+                .labelNames(new String[]{DEST, "errorCode"})
                 .name(ERRORS)
                 .help("Client request errors of instance " + destination)
                 .create();

+ 6 - 6
server/src/main/java/com/alibaba/otter/canal/server/netty/CanalServerWithNettyProfiler.java

@@ -11,7 +11,7 @@ import java.util.concurrent.ConcurrentMap;
 public class CanalServerWithNettyProfiler {
 
     public static final ClientInstanceProfilerFactory           DISABLED = new DefaultClientInstanceProfilerFactory();
-    private volatile ClientInstanceProfilerFactory              factory;
+    private volatile ClientInstanceProfilerFactory              clientInstanceProfilerFactory;
     private final ConcurrentMap<String, ClientInstanceProfiler> cliPfs;
     private final CanalServerWithEmbedded                       server;
 
@@ -20,7 +20,7 @@ public class CanalServerWithNettyProfiler {
     }
 
     private CanalServerWithNettyProfiler() {
-        this.factory = DISABLED;
+        this.clientInstanceProfilerFactory = DISABLED;
         this.cliPfs = new ConcurrentHashMap<String, ClientInstanceProfiler>();
         this.server = CanalServerWithEmbedded.instance();
     }
@@ -46,7 +46,7 @@ public class CanalServerWithNettyProfiler {
         if (server.isStart(destination)) {
             throw new IllegalStateException("Instance profiler should not be start while running.");
         }
-        ClientInstanceProfiler profiler = factory.create(destination);
+        ClientInstanceProfiler profiler = clientInstanceProfilerFactory.create(destination);
         profiler.start();
         cliPfs.put(destination, profiler);
     }
@@ -70,11 +70,11 @@ public class CanalServerWithNettyProfiler {
     }
 
     public void setInstanceProfilerFactory(ClientInstanceProfilerFactory factory) {
-        this.factory = factory;
+        this.clientInstanceProfilerFactory = factory;
     }
 
     private boolean isDisabled() {
-        return factory == DISABLED || factory == null;
+        return clientInstanceProfilerFactory == DISABLED || clientInstanceProfilerFactory == null;
     }
 
     private ClientInstanceProfiler tryGet(String destination) {
@@ -84,7 +84,7 @@ public class CanalServerWithNettyProfiler {
             synchronized (cliPfs) {
                 if (server.isStart(destination)) {
                     // avoid overwriting
-                    cliPfs.putIfAbsent(destination, factory.create(destination));
+                    cliPfs.putIfAbsent(destination, clientInstanceProfilerFactory.create(destination));
                     profiler = cliPfs.get(destination);
                     if (!profiler.isStart()) {
                         profiler.start();

+ 16 - 0
server/src/main/java/com/alibaba/otter/canal/server/netty/NettyUtils.java

@@ -74,4 +74,20 @@ public class NettyUtils {
                 .toByteArray(),
             channelFutureListener);
     }
+
+    public static byte[] ackPacket() {
+        return Packet.newBuilder()
+                .setType(CanalPacket.PacketType.ACK)
+                .setBody(Ack.newBuilder().build().toByteString())
+                .build()
+                .toByteArray();
+    }
+
+    public static byte[] errorPacket(int errorCode, String errorMessage) {
+        return Packet.newBuilder()
+                .setType(CanalPacket.PacketType.ACK)
+                .setBody(Ack.newBuilder().setErrorCode(errorCode).setErrorMessage(errorMessage).build().toByteString())
+                .build()
+                .toByteArray();
+    }
 }

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/ClientAuthenticationHandler.java

@@ -73,7 +73,7 @@ public class ClientAuthenticationHandler extends SimpleChannelHandler {
                         MDC.remove("destination");
                     }
                 }
-
+                // 鉴权一次性,暂不统计
                 NettyUtils.ack(ctx.getChannel(), new ChannelFutureListener() {
 
                     public void operationComplete(ChannelFuture future) throws Exception {

+ 37 - 38
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -52,7 +52,7 @@ public class SessionHandler extends SimpleChannelHandler {
 
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
         logger.info("message receives in session handler...");
-        long start = System.currentTimeMillis();
+        long start = System.nanoTime();
         ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
         Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
         ClientIdentity clientIdentity = null;
@@ -76,12 +76,13 @@ public class SessionHandler extends SimpleChannelHandler {
 
                         embeddedServer.subscribe(clientIdentity);
                         // ctx.setAttachment(clientIdentity);// 设置状态数据
-                        NettyUtils.ack(ctx.getChannel(), null);
+                        byte[] ackBytes = NettyUtils.ackPacket();
+                        NettyUtils.write(ctx.getChannel(), ackBytes, new ChannelFutureAggregator(sub.getDestination(),
+                                sub, packet.getType(), ackBytes.length, System.nanoTime() - start));
                     } else {
-                        NettyUtils.error(401,
-                            MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage(),
-                            ctx.getChannel(),
-                            null);
+                        byte[] errorBytes = NettyUtils.errorPacket(401, MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage());
+                        NettyUtils.write(ctx.getChannel(), errorBytes ,new ChannelFutureAggregator(sub.getDestination(),
+                                sub, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 401));
                     }
                     break;
                 case UNSUBSCRIPTION:
@@ -93,12 +94,13 @@ public class SessionHandler extends SimpleChannelHandler {
                         MDC.put("destination", clientIdentity.getDestination());
                         embeddedServer.unsubscribe(clientIdentity);
                         stopCanalInstanceIfNecessary(clientIdentity);// 尝试关闭
-                        NettyUtils.ack(ctx.getChannel(), null);
+                        byte[] ackBytes = NettyUtils.ackPacket();
+                        NettyUtils.write(ctx.getChannel(), ackBytes, new ChannelFutureAggregator(unsub.getDestination(),
+                                unsub, packet.getType(), ackBytes.length, System.nanoTime() - start));
                     } else {
-                        NettyUtils.error(401,
-                            MessageFormatter.format("destination or clientId is null", unsub.toString()).getMessage(),
-                            ctx.getChannel(),
-                            null);
+                        byte[] errorBytes = NettyUtils.errorPacket(401, MessageFormatter.format("destination or clientId is null", unsub.toString()).getMessage());
+                        NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(unsub.getDestination(),
+                                unsub, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 401));
                     }
                     break;
                 case GET:
@@ -173,7 +175,8 @@ public class SessionHandler extends SimpleChannelHandler {
                                 output.writeBytes(2, rowEntries.get(i));
                             }
                             output.checkNoSpaceLeft();
-                            NettyUtils.write(ctx.getChannel(), body, null);
+                            NettyUtils.write(ctx.getChannel(), body, new ChannelFutureAggregator(get.getDestination(),
+                                    get, packet.getType(), body.length, System.nanoTime() - start, message.getId() == -1));
                             
                             // output.flush();
                             // byteBuffer.flip();
@@ -194,14 +197,14 @@ public class SessionHandler extends SimpleChannelHandler {
                                     }
                                 }
                             }
-                            packetBuilder.setBody(messageBuilder.build().toByteString());
-                            NettyUtils.write(ctx.getChannel(), packetBuilder.build().toByteArray(), null);// 输出数据
+                            byte[] body = packetBuilder.setBody(messageBuilder.build().toByteString()).build().toByteArray();
+                            NettyUtils.write(ctx.getChannel(), body, new ChannelFutureAggregator(get.getDestination(),
+                                    get, packet.getType(), body.length, System.nanoTime() - start, message.getId() == -1));// 输出数据
                         }
                     } else {
-                        NettyUtils.error(401,
-                            MessageFormatter.format("destination or clientId is null", get.toString()).getMessage(),
-                            ctx.getChannel(),
-                            null);
+                        byte[] errorBytes = NettyUtils.errorPacket(401, MessageFormatter.format("destination or clientId is null", get.toString()).getMessage());
+                        NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(get.getDestination(),
+                                get, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 401));
                     }
                     break;
                 case CLIENTACK:
@@ -209,10 +212,9 @@ public class SessionHandler extends SimpleChannelHandler {
                     MDC.put("destination", ack.getDestination());
                     if (StringUtils.isNotEmpty(ack.getDestination()) && StringUtils.isNotEmpty(ack.getClientId())) {
                         if (ack.getBatchId() == 0L) {
-                            NettyUtils.error(402,
-                                MessageFormatter.format("batchId should assign value", ack.toString()).getMessage(),
-                                ctx.getChannel(),
-                                null);
+                            byte[] errorBytes = NettyUtils.errorPacket(402, MessageFormatter.format("batchId should assign value", ack.toString()).getMessage());
+                            NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(ack.getDestination(),
+                                    ack, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 402));
                         } else if (ack.getBatchId() == -1L) { // -1代表上一次get没有数据,直接忽略之
                             // donothing
                         } else {
@@ -220,10 +222,9 @@ public class SessionHandler extends SimpleChannelHandler {
                             embeddedServer.ack(clientIdentity, ack.getBatchId());
                         }
                     } else {
-                        NettyUtils.error(401,
-                            MessageFormatter.format("destination or clientId is null", ack.toString()).getMessage(),
-                            ctx.getChannel(),
-                            null);
+                        byte[] errorBytes = NettyUtils.errorPacket(401, MessageFormatter.format("destination or clientId is null", ack.toString()).getMessage());
+                        NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(ack.getDestination(),
+                                ack, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 401));
                     }
                     break;
                 case CLIENTROLLBACK:
@@ -239,25 +240,23 @@ public class SessionHandler extends SimpleChannelHandler {
                             embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次
                         }
                     } else {
-                        NettyUtils.error(401,
-                            MessageFormatter.format("destination or clientId is null", rollback.toString())
-                                .getMessage(),
-                            ctx.getChannel(),
-                            null);
+                        byte[] errorBytes = NettyUtils.errorPacket(401, MessageFormatter.format("destination or clientId is null", rollback.toString()).getMessage());
+                        NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(rollback.getDestination(),
+                                rollback, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 401));
                     }
                     break;
                 default:
-                    NettyUtils.error(400, MessageFormatter.format("packet type={} is NOT supported!", packet.getType())
-                        .getMessage(), ctx.getChannel(), null);
+                    byte[] errorBytes = NettyUtils.errorPacket(400, MessageFormatter.format("packet type={} is NOT supported!", packet.getType()).getMessage());
+                    NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(ctx.getChannel().getRemoteAddress().toString(),
+                            null, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 400));
                     break;
             }
         } catch (Throwable exception) {
-            NettyUtils.error(400,
-                MessageFormatter.format("something goes wrong with channel:{}, exception={}",
+            byte[] errorBytes = NettyUtils.errorPacket(400, MessageFormatter.format("something goes wrong with channel:{}, exception={}",
                     ctx.getChannel(),
-                    ExceptionUtils.getStackTrace(exception)).getMessage(),
-                ctx.getChannel(),
-                null);
+                    ExceptionUtils.getStackTrace(exception)).getMessage());
+            NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(ctx.getChannel().getRemoteAddress().toString(),
+                    null, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 400));
         } finally {
             MDC.remove("destination");
         }

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/listener/ChannelFutureAggregator.java

@@ -67,7 +67,7 @@ public class ChannelFutureAggregator implements ChannelFutureListener {
         private ClientRequestResult(Builder builder) {
             this.destination = Preconditions.checkNotNull(builder.destination);
             this.type = Preconditions.checkNotNull(builder.type);
-            this.request = Preconditions.checkNotNull(builder.request);
+            this.request = builder.request;
             this.amount = builder.amount;
             this.latency = builder.latency;
             this.errorCode = builder.errorCode;