Procházet zdrojové kódy

Update SocketChannel.java

luoyaogui před 8 roky
rodič
revize
48c108806a

+ 17 - 23
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java

@@ -7,7 +7,6 @@ import io.netty.channel.Channel;
 
 import java.io.IOException;
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 
 /**
  * @author luoyaogui
@@ -17,18 +16,13 @@ import java.nio.ByteBuffer;
 public class SocketChannel {
 	private Channel channel = null;
 	private Object lock = new Object();
-	private ByteBuf cache = PooledByteBufAllocator.DEFAULT.directBuffer(1024*1024*5);//缓存大小
+	private ByteBuf cache = PooledByteBufAllocator.DEFAULT.directBuffer(1024*1024);//缓存大小
 
 	public Channel getChannel() {
 		return channel;
 	}
-	public void setChannel(Channel channel,boolean notify) {
+	public void setChannel(Channel channel) {
 		this.channel = channel;
-		if(notify){//是否需要通知,主要时channel不可用时
-			synchronized(this){
-				notifyAll();
-			}
-		}
 	}
 
 	public void writeCache(ByteBuf buf){
@@ -36,9 +30,6 @@ public class SocketChannel {
 			cache.discardReadBytes();//回收内存
 			cache.writeBytes(buf);
 		}
-		synchronized(this){
-			notifyAll();
-		}
 	}
 	public void writeChannel(byte[]... buf) throws IOException {
 		if(channel != null && channel.isWritable())
@@ -46,19 +37,22 @@ public class SocketChannel {
 		else
 			throw new IOException("write  failed  !  please checking !");
 	}
-	public int read(ByteBuffer buffer) throws IOException {
-		if(null == channel)
-			throw new IOException("socket has Interrupted !");
-		if(cache.readableBytes() <  buffer.remaining()){
-			synchronized(this){
-				try {wait();} catch (InterruptedException e) {}
+	public byte[] read(int readSize) throws IOException {
+		do{
+			if(readSize > cache.readableBytes()){
+				if(null == channel)
+					throw new IOException("socket has Interrupted !");
+				synchronized (this) {
+					try { wait(100); } catch (InterruptedException e) {}
+				}
+			} else {
+				byte[] back = new byte[readSize];
+				synchronized (lock) {
+					cache.readBytes(back);
+				}
+				return back;
 			}
-		}else{
-			synchronized (lock){
-				cache.readBytes(buffer);
-			}
-		}
-		return 0;
+		}while(true);
 	}
 	public boolean isConnected() {
 		return channel!=null?true:false;