Jelajahi Sumber

Update PacketManager.java

luoyaogui 8 tahun lalu
induk
melakukan
b6090da0e5

+ 8 - 31
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/PacketManager.java

@@ -1,7 +1,6 @@
 package com.alibaba.otter.canal.parse.driver.mysql.utils;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
@@ -10,48 +9,26 @@ public abstract class PacketManager {
 
     public static HeaderPacket readHeader(SocketChannel ch, int len) throws IOException {
         HeaderPacket header = new HeaderPacket();
-        header.fromBytes(readBytesAsBuffer(ch, len).array());
+        header.fromBytes(ch.read(len));
         return header;
     }
 
-    public static ByteBuffer readBytesAsBuffer(SocketChannel ch, int len) throws IOException {
-        ByteBuffer buffer = ByteBuffer.allocate(len);
-        while (buffer.hasRemaining()) {
-            int readNum = ch.read(buffer);
-            if (readNum == -1) {
-                throw new IOException("Unexpected End Stream");
-            }
-        }
-        return buffer;
-    }
-
     public static byte[] readBytes(SocketChannel ch, int len) throws IOException {
-        return readBytesAsBuffer(ch, len).array();
+        return ch.read(len);
     }
 
-    /**
-     * Since We r using blocking IO, so we will just write once and assert the
-     * length to simplify the read operation.<br>
-     * If the block write doesn't work as we expected, we will change this
-     * implementation as per the result.
-     * 
-     * @param ch
-     * @param len
-     * @return
-     * @throws IOException
-     */
-    public static void write(SocketChannel ch, byte[]... srcs) throws IOException {
+    public static void writePkg(SocketChannel ch, byte[]... srcs) throws IOException {
         ch.writeChannel(srcs);
     }
-
-    public static void write(SocketChannel ch, byte[] body) throws IOException {
-        write(ch, body, (byte) 0);
+    
+    public static void writeBody(SocketChannel ch, byte[] body) throws IOException {
+    	writeBody0(ch, body, (byte) 0);
     }
 
-    public static void write(SocketChannel ch, byte[] body, byte packetSeqNumber) throws IOException {
+    public static void writeBody0(SocketChannel ch, byte[] body, byte packetSeqNumber) throws IOException {
         HeaderPacket header = new HeaderPacket();
         header.setPacketBodyLength(body.length);
         header.setPacketSequenceNumber(packetSeqNumber);
-        write(ch, header.toBytes(),body);
+        ch.writeChannel(header.toBytes(),body);
     }
 }