Prechádzať zdrojové kódy

Netty server/client profiler skeleton.

Chuanyi Li L 6 rokov pred
rodič
commit
c7fc149e7e

+ 1 - 1
pom.xml

@@ -247,7 +247,7 @@
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_520</version>
+                <version>2.0.0_preview_186</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>

+ 92 - 0
server/src/main/java/com/alibaba/otter/canal/server/netty/CanalServerWithNettyProfiler.java

@@ -0,0 +1,92 @@
+package com.alibaba.otter.canal.server.netty;
+
+import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
+import com.alibaba.otter.canal.server.netty.listener.ChannelFutureAggregator.ClientRequestResult;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * @author Chuanyi Li
+ */
+public class CanalServerWithNettyProfiler {
+
+    private static final ClientInstanceProfilerFactory          DISABLED     = new DefaultClientInstanceProfilerFactory();
+
+    private volatile ClientInstanceProfilerFactory              factory;
+
+    private final ConcurrentMap<String, ClientInstanceProfiler> cliPfs;
+
+    private final CanalServerWithEmbedded                       server;
+
+    private static class SingletonHolder {
+        private static CanalServerWithNettyProfiler SINGLETON = new CanalServerWithNettyProfiler();
+    }
+
+    private CanalServerWithNettyProfiler() {
+        this.factory = DISABLED;
+        this.cliPfs = new ConcurrentHashMap<String, ClientInstanceProfiler>();
+        this.server = CanalServerWithEmbedded.instance();
+    }
+
+    public static CanalServerWithNettyProfiler profiler() {
+        return SingletonHolder.SINGLETON;
+    }
+
+    public void profiling(String dest, ClientRequestResult result) {
+        if (isDisabled()) {
+            return;
+        }
+        ClientInstanceProfiler profiler = tryGet(dest);
+        if (profiler != null) {
+            profiler.profiling(result);
+        }
+    }
+
+    /**
+     * Remove instance profiler for specified instance.
+     * Only accepted while instance is not running.
+     * @param dest canal instance destination
+     */
+    public void remove(String dest) throws IllegalStateException {
+        if (isDisabled()) {
+            return;
+        }
+        synchronized (cliPfs) {
+            if (server.isStart(dest)) {
+                throw new IllegalStateException("Instance profiler should not be removed while running.");
+            }
+            cliPfs.remove(dest);
+        }
+    }
+
+    public void setFactory(ClientInstanceProfilerFactory factory) {
+        this.factory = factory;
+    }
+
+    private boolean isDisabled() {
+        return factory == DISABLED || factory == null;
+    }
+
+    private ClientInstanceProfiler tryGet(String dest) {
+        //try fast get
+        ClientInstanceProfiler profiler = cliPfs.get(dest);
+        if (profiler == null) {
+            synchronized (cliPfs) {
+                if (server.isStart(dest)) {
+                    // avoid overwriting
+                    cliPfs.putIfAbsent(dest, factory.create());
+                    profiler = cliPfs.get(dest);
+                }
+            }
+        }
+        return profiler;
+    }
+
+    private static class DefaultClientInstanceProfilerFactory implements ClientInstanceProfilerFactory {
+        @Override
+        public ClientInstanceProfiler create() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+}

+ 14 - 0
server/src/main/java/com/alibaba/otter/canal/server/netty/ClientInstanceProfiler.java

@@ -0,0 +1,14 @@
+package com.alibaba.otter.canal.server.netty;
+
+import com.alibaba.otter.canal.server.netty.listener.ChannelFutureAggregator;
+
+/**
+ * @author Chuanyi Li
+ */
+public interface ClientInstanceProfiler {
+
+    String getDestination();
+
+    void profiling(ChannelFutureAggregator.ClientRequestResult result);
+
+}

+ 10 - 0
server/src/main/java/com/alibaba/otter/canal/server/netty/ClientInstanceProfilerFactory.java

@@ -0,0 +1,10 @@
+package com.alibaba.otter.canal.server.netty;
+
+/**
+ * @author Chuanyi Li
+ */
+public interface ClientInstanceProfilerFactory {
+
+    ClientInstanceProfiler create();
+
+}

+ 1 - 0
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -51,6 +51,7 @@ public class SessionHandler extends SimpleChannelHandler {
 
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
         logger.info("message receives in session handler...");
+        long start = System.currentTimeMillis();
         ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
         Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
         ClientIdentity clientIdentity = null;

+ 89 - 35
server/src/main/java/com/alibaba/otter/canal/server/netty/listener/ChannelFutureAggregator.java

@@ -1,103 +1,157 @@
 package com.alibaba.otter.canal.server.netty.listener;
 
 import com.alibaba.otter.canal.protocol.CanalPacket;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.GeneratedMessage;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
 
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
+import static com.alibaba.otter.canal.server.netty.CanalServerWithNettyProfiler.profiler;
 
 /**
  * @author Chuanyi Li
  */
 public class ChannelFutureAggregator implements ChannelFutureListener {
 
-    private String                 dest;
+    private ClientRequestResult result;
 
-    private CanalPacket.PacketType type;
-
-    private int                    amount;
-
-    private int                    errorCode;
-
-    public ChannelFutureAggregator(String dest, CanalPacket.PacketType type, int amount) {
-        this(dest, type, amount, 0);
+    public ChannelFutureAggregator(String dest, GeneratedMessage request, CanalPacket.PacketType type, int amount, long latency) {
+        this(dest, request, type, amount, latency, 0);
     }
 
-    public ChannelFutureAggregator(String dest, CanalPacket.PacketType type, int amount, int errorCode) {
-        this.dest = dest;
-        this.type = type;
-        this.amount = amount;
-        this.errorCode = errorCode;
+    private ChannelFutureAggregator(String dest, GeneratedMessage request, CanalPacket.PacketType type, int amount, long latency, int errorCode) {
+        this.result = new ClientRequestResult.Builder()
+                .dest(dest)
+                .type(type)
+                .request(request)
+                .amount(amount)
+                .latency(latency)
+                .errorCode(errorCode)
+                .build();
     }
 
     @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-        if (future.isSuccess()) {
-            ConcurrentHashMap map = new ConcurrentHashMap();
-            map.putIfAbsent("", "");
-        } else {
-            Throwable t = future.getCause();
+    public void operationComplete(ChannelFuture future) {
+        // profiling after I/O operation
+        if (future.getCause() != null) {
+            result.channelError = future.getCause();
         }
+        profiler().profiling(result.dest, result);
     }
 
+    /**
+     * Client request result pojo
+     */
     public static class ClientRequestResult {
 
         private String                 dest;
-
         private CanalPacket.PacketType type;
-
-        private int                    amount;
-
-        private int                    errorCode;
+        private GeneratedMessage       request;
+        private int       amount;
+        private long      latency;
+        private int       errorCode;
+        private Throwable channelError;
 
         private ClientRequestResult() {}
 
         private ClientRequestResult(Builder builder) {
-            this.dest = builder.dest;
-            this.type = builder.type;
+            this.dest = Preconditions.checkNotNull(builder.dest);
+            this.type = Preconditions.checkNotNull(builder.type);
+            this.request = Preconditions.checkNotNull(builder.request);
             this.amount = builder.amount;
+            this.latency = builder.latency;
             this.errorCode = builder.errorCode;
+            this.channelError = builder.channelError;
         }
 
-        private static class Builder {
+        // auto-generated
+        public static class Builder {
 
             private String                 dest;
             private CanalPacket.PacketType type;
+            private GeneratedMessage       request;
             private int                    amount;
+            private long                   latency;
             private int                    errorCode;
+            private Throwable              channelError;
 
-            public Builder dest(String dest) {
+            Builder dest(String dest) {
                 this.dest = dest;
                 return this;
             }
 
-            public Builder type(CanalPacket.PacketType type) {
+            Builder type(CanalPacket.PacketType type) {
                 this.type = type;
                 return this;
             }
 
-            public Builder amount(int amount) {
+            Builder request(GeneratedMessage request) {
+                this.request = request;
+                return this;
+            }
+
+            Builder amount(int amount) {
                 this.amount = amount;
                 return this;
             }
 
-            public Builder errorCode(int errorCode) {
+            Builder latency(long latency) {
+                this.latency = latency;
+                return this;
+            }
+
+            Builder errorCode(int errorCode) {
                 this.errorCode = errorCode;
                 return this;
             }
 
+            public Builder channelError(Throwable channelError) {
+                this.channelError = channelError;
+                return this;
+            }
+
             public Builder fromPrototype(ClientRequestResult prototype) {
                 dest = prototype.dest;
                 type = prototype.type;
+                request = prototype.request;
                 amount = prototype.amount;
+                latency = prototype.latency;
                 errorCode = prototype.errorCode;
+                channelError = prototype.channelError;
                 return this;
             }
 
-            public ClientRequestResult build() {
+            ClientRequestResult build() {
                 return new ClientRequestResult(this);
             }
         }
+        // getters
+        public String getDest() {
+            return dest;
+        }
+
+        public CanalPacket.PacketType getType() {
+            return type;
+        }
+
+        public GeneratedMessage getRequest() {
+            return request;
+        }
+
+        public int getAmount() {
+            return amount;
+        }
+
+        public long getLatency() {
+            return latency;
+        }
+
+        public int getErrorCode() {
+            return errorCode;
+        }
+
+        public Throwable getChannelError() {
+            return channelError;
+        }
     }
 }