|
@@ -6,7 +6,6 @@ import io.netty.buffer.PooledByteBufAllocator;
|
|
import io.netty.channel.AdaptiveRecvByteBufAllocator;
|
|
import io.netty.channel.AdaptiveRecvByteBufAllocator;
|
|
import io.netty.channel.Channel;
|
|
import io.netty.channel.Channel;
|
|
import io.netty.channel.ChannelFuture;
|
|
import io.netty.channel.ChannelFuture;
|
|
-import io.netty.channel.ChannelFutureListener;
|
|
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
|
import io.netty.channel.ChannelInitializer;
|
|
import io.netty.channel.ChannelInitializer;
|
|
@@ -20,8 +19,10 @@ import java.io.IOException;
|
|
import java.net.SocketAddress;
|
|
import java.net.SocketAddress;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
|
|
|
-import com.alibaba.otter.canal.common.utils.BooleanMutex;
|
|
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
/**
|
|
/**
|
|
* @author luoyaogui 实现channel的管理(监听连接、读数据、回收) 2016-12-28
|
|
* @author luoyaogui 实现channel的管理(监听连接、读数据、回收) 2016-12-28
|
|
@@ -29,9 +30,10 @@ import com.alibaba.otter.canal.common.utils.BooleanMutex;
|
|
@SuppressWarnings({ "rawtypes", "deprecation" })
|
|
@SuppressWarnings({ "rawtypes", "deprecation" })
|
|
public abstract class SocketChannelPool {
|
|
public abstract class SocketChannelPool {
|
|
|
|
|
|
- private static EventLoopGroup group = new NioEventLoopGroup(); // 非阻塞IO线程组
|
|
|
|
- private static Bootstrap boot = new Bootstrap(); // 主
|
|
|
|
|
|
+ private static EventLoopGroup group = new NioEventLoopGroup(); // 非阻塞IO线程组
|
|
|
|
+ private static Bootstrap boot = new Bootstrap(); // 主
|
|
private static Map<Channel, SocketChannel> chManager = new ConcurrentHashMap<Channel, SocketChannel>();
|
|
private static Map<Channel, SocketChannel> chManager = new ConcurrentHashMap<Channel, SocketChannel>();
|
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(SocketChannelPool.class);
|
|
|
|
|
|
static {
|
|
static {
|
|
boot.group(group)
|
|
boot.group(group)
|
|
@@ -71,8 +73,8 @@ public abstract class SocketChannelPool {
|
|
|
|
|
|
public static class BusinessHandler extends ChannelInboundHandlerAdapter {
|
|
public static class BusinessHandler extends ChannelInboundHandlerAdapter {
|
|
|
|
|
|
- private SocketChannel socket = null;
|
|
|
|
- private final CountDownLatch latch = new CountDownLatch(1);
|
|
|
|
|
|
+ private SocketChannel socket = null;
|
|
|
|
+ private final CountDownLatch latch = new CountDownLatch(1);
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
@@ -81,20 +83,20 @@ public abstract class SocketChannelPool {
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
|
|
|
- socket = new SocketChannel();
|
|
|
|
- socket.setChannel(ctx.channel());
|
|
|
|
- chManager.put(ctx.channel(), socket);
|
|
|
|
- latch.countDown();
|
|
|
|
- super.channelActive(ctx);
|
|
|
|
- }
|
|
|
|
|
|
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
|
|
|
+ socket = new SocketChannel();
|
|
|
|
+ socket.setChannel(ctx.channel());
|
|
|
|
+ chManager.put(ctx.channel(), socket);
|
|
|
|
+ latch.countDown();
|
|
|
|
+ super.channelActive(ctx);
|
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
if (socket != null) {
|
|
if (socket != null) {
|
|
socket.writeCache((ByteBuf) msg);
|
|
socket.writeCache((ByteBuf) msg);
|
|
} else {
|
|
} else {
|
|
- //TODO: need graceful error handler.
|
|
|
|
|
|
+ // TODO: need graceful error handler.
|
|
logger.error("no socket available.");
|
|
logger.error("no socket available.");
|
|
}
|
|
}
|
|
ReferenceCountUtil.release(msg);// 添加防止内存泄漏的
|
|
ReferenceCountUtil.release(msg);// 添加防止内存泄漏的
|