Browse Source

revert pull request https://github.com/alibaba/canal/pull/256

agapple 8 years ago
parent
commit
d78b389737

+ 21 - 6
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -2,6 +2,8 @@ package com.alibaba.otter.canal.parse.driver.mysql;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
@@ -14,8 +16,6 @@ import com.alibaba.otter.canal.parse.driver.mysql.packets.client.QuitCommandPack
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.HandshakeInitializationPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.Reply323Packet;
-import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
-import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannelPool;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.MySQLPasswordEncrypter;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
 
@@ -64,8 +64,10 @@ public class MysqlConnector {
     public void connect() throws IOException {
         if (connected.compareAndSet(false, true)) {
             try {
-                channel = SocketChannelPool.open(address);
+                channel = SocketChannel.open();
+                configChannel(channel);
                 logger.info("connect MysqlConnection to {}...", address);
+                channel.connect(address);
                 negotiate(channel);
             } catch (Exception e) {
                 disconnect();
@@ -140,7 +142,19 @@ public class MysqlConnector {
         HeaderPacket quitHeader = new HeaderPacket();
         quitHeader.setPacketBodyLength(cmdBody.length);
         quitHeader.setPacketSequenceNumber((byte) 0x00);
-        PacketManager.write(channel, quitHeader.toBytes(), cmdBody);
+        PacketManager.write(channel,
+            new ByteBuffer[] { ByteBuffer.wrap(quitHeader.toBytes()), ByteBuffer.wrap(cmdBody) });
+    }
+
+    // ====================== help method ====================
+
+    private void configChannel(SocketChannel channel) throws IOException {
+        channel.socket().setKeepAlive(true);
+        channel.socket().setReuseAddress(true);
+        channel.socket().setSoTimeout(soTimeout);
+        channel.socket().setTcpNoDelay(true);
+        channel.socket().setReceiveBufferSize(receiveBufferSize);
+        channel.socket().setSendBufferSize(sendBufferSize);
     }
 
     private void negotiate(SocketChannel channel) throws IOException {
@@ -177,7 +191,8 @@ public class MysqlConnector {
         h.setPacketBodyLength(clientAuthPkgBody.length);
         h.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));
 
-        PacketManager.write(channel, h.toBytes(), clientAuthPkgBody);
+        PacketManager.write(channel,
+            new ByteBuffer[] { ByteBuffer.wrap(h.toBytes()), ByteBuffer.wrap(clientAuthPkgBody) });
         logger.info("client authentication packet is sent out.");
 
         // check auth result
@@ -213,7 +228,7 @@ public class MysqlConnector {
         h323.setPacketBodyLength(b323Body.length);
         h323.setPacketSequenceNumber((byte) (packetSequenceNumber + 1));
 
-        PacketManager.write(channel, h323.toBytes(), b323Body);
+        PacketManager.write(channel, new ByteBuffer[] { ByteBuffer.wrap(h323.toBytes()), ByteBuffer.wrap(b323Body) });
         logger.info("client 323 authentication packet is sent out.");
         // check auth result
         HeaderPacket header = PacketManager.readHeader(channel, 4);

+ 2 - 4
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlQueryExecutor.java

@@ -1,6 +1,7 @@
 package com.alibaba.otter.canal.parse.driver.mysql;
 
 import java.io.IOException;
+import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -11,7 +12,6 @@ import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetHeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.RowDataPacket;
-import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
 
 /**
@@ -82,15 +82,13 @@ public class MysqlQueryExecutor {
             rowDataPacket.fromBytes(body);
             rowData.add(rowDataPacket);
         }
-        // 未知,不知道是否需要锁定
-        // channel.lock();//锁定读
 
         ResultSetPacket resultSet = new ResultSetPacket();
         resultSet.getFieldDescriptors().addAll(fields);
         for (RowDataPacket r : rowData) {
             resultSet.getFieldValues().addAll(r.getColumns());
         }
-        resultSet.setSourceAddress(channel.getRemoteSocketAddress());
+        resultSet.setSourceAddress(channel.socket().getRemoteSocketAddress());
 
         return resultSet;
     }

+ 1 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlUpdateExecutor.java

@@ -1,6 +1,7 @@
 package com.alibaba.otter.canal.parse.driver.mysql;
 
 import java.io.IOException;
+import java.nio.channels.SocketChannel;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -8,7 +9,6 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.QueryCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.OKPacket;
-import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
 
 /**
@@ -48,7 +48,6 @@ public class MysqlUpdateExecutor {
             packet.fromBytes(body);
             throw new IOException(packet + "\n with command: " + updateString);
         }
-        // channel.lock();//锁定读
 
         OKPacket packet = new OKPacket();
         packet.fromBytes(body);

+ 16 - 4
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/PacketManager.java

@@ -2,9 +2,9 @@ package com.alibaba.otter.canal.parse.driver.mysql.utils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
-import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 
 public abstract class PacketManager {
 
@@ -40,8 +40,20 @@ public abstract class PacketManager {
      * @return
      * @throws IOException
      */
-    public static void write(SocketChannel ch, byte[]... srcs) throws IOException {
-        ch.writeChannel(srcs);
+    public static void write(SocketChannel ch, ByteBuffer[] srcs) throws IOException {
+        @SuppressWarnings("unused")
+        long total = 0;
+        for (ByteBuffer buffer : srcs) {
+            total += buffer.remaining();
+        }
+
+        ch.write(srcs);
+        // https://github.com/alibaba/canal/issues/24
+        // 部分windows用户会出现size != total的情况,jdk为java7/openjdk,估计和java版本有关,暂时不做检查
+        // long size = ch.write(srcs);
+        // if (size != total) {
+        // throw new IOException("unexpected blocking io behavior");
+        // }
     }
 
     public static void write(SocketChannel ch, byte[] body) throws IOException {
@@ -52,6 +64,6 @@ public abstract class PacketManager {
         HeaderPacket header = new HeaderPacket();
         header.setPacketBodyLength(body.length);
         header.setPacketSequenceNumber(packetSeqNumber);
-        write(ch, header.toBytes(), body);
+        write(ch, new ByteBuffer[] { ByteBuffer.wrap(header.toBytes()), ByteBuffer.wrap(body) });
     }
 }

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java

@@ -5,11 +5,11 @@ import java.io.InterruptedIOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 import com.taobao.tddl.dbsync.binlog.LogFetcher;
 
 /**

+ 3 - 1
parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.parse;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -93,6 +94,7 @@ public class DirectLogFetcherTest {
         HeaderPacket binlogDumpHeader = new HeaderPacket();
         binlogDumpHeader.setPacketBodyLength(cmdBody.length);
         binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
-        PacketManager.write(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody);
+        PacketManager.write(connector.getChannel(), new ByteBuffer[] { ByteBuffer.wrap(binlogDumpHeader.toBytes()),
+                ByteBuffer.wrap(cmdBody) });
     }
 }