Browse Source

fixed issue #139 , KILL CONNECTION binlogdump thread

agapple 9 years ago
parent
commit
c32a5d0882

+ 51 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -12,6 +12,7 @@ import org.slf4j.LoggerFactory;
 
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.ClientAuthenticationPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.ClientAuthenticationPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.client.QuitCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.HandshakeInitializationPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.HandshakeInitializationPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.Reply323Packet;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.Reply323Packet;
@@ -38,13 +39,15 @@ public class MysqlConnector {
     private int                 sendBufferSize    = 16 * 1024;
     private int                 sendBufferSize    = 16 * 1024;
 
 
     private SocketChannel       channel;
     private SocketChannel       channel;
+    private volatile boolean    dumping           = false;
+    // mysql connectinnId
+    private long                connectionId      = -1;
     private AtomicBoolean       connected         = new AtomicBoolean(false);
     private AtomicBoolean       connected         = new AtomicBoolean(false);
 
 
     public MysqlConnector(){
     public MysqlConnector(){
     }
     }
 
 
     public MysqlConnector(InetSocketAddress address, String username, String password){
     public MysqlConnector(InetSocketAddress address, String username, String password){
-
         this.address = address;
         this.address = address;
         this.username = username;
         this.username = username;
         this.password = password;
         this.password = password;
@@ -86,11 +89,29 @@ public class MysqlConnector {
                 if (channel != null) {
                 if (channel != null) {
                     channel.close();
                     channel.close();
                 }
                 }
-
                 logger.info("disConnect MysqlConnection to {}...", address);
                 logger.info("disConnect MysqlConnection to {}...", address);
             } catch (Exception e) {
             } catch (Exception e) {
                 throw new IOException("disconnect " + this.address + " failure:" + ExceptionUtils.getStackTrace(e));
                 throw new IOException("disconnect " + this.address + " failure:" + ExceptionUtils.getStackTrace(e));
             }
             }
+
+            // 执行一次quit
+            if (dumping && connectionId >= 0) {
+                MysqlConnector connector = null;
+                try {
+                    connector = this.fork();
+                    connector.connect();
+                    MysqlUpdateExecutor executor = new MysqlUpdateExecutor(connector);
+                    executor.update("KILL CONNECTION " + connectionId);
+                } catch (Exception e) {
+                    throw new IOException("KILL DUMP " + connectionId + " failure:" + ExceptionUtils.getStackTrace(e));
+                } finally {
+                    if (connector != null) {
+                        connector.disconnect();
+                    }
+                }
+
+                dumping = false;
+            }
         } else {
         } else {
             logger.info("the channel {} is not connected", this.address);
             logger.info("the channel {} is not connected", this.address);
         }
         }
@@ -113,6 +134,17 @@ public class MysqlConnector {
         return connector;
         return connector;
     }
     }
 
 
+    public void quit() throws IOException {
+        QuitCommandPacket quit = new QuitCommandPacket();
+        byte[] cmdBody = quit.toBytes();
+
+        HeaderPacket quitHeader = new HeaderPacket();
+        quitHeader.setPacketBodyLength(cmdBody.length);
+        quitHeader.setPacketSequenceNumber((byte) 0x00);
+        PacketManager.write(channel,
+            new ByteBuffer[] { ByteBuffer.wrap(quitHeader.toBytes()), ByteBuffer.wrap(cmdBody) });
+    }
+
     // ====================== help method ====================
     // ====================== help method ====================
 
 
     private void configChannel(SocketChannel channel) throws IOException {
     private void configChannel(SocketChannel channel) throws IOException {
@@ -140,6 +172,7 @@ public class MysqlConnector {
         }
         }
         HandshakeInitializationPacket handshakePacket = new HandshakeInitializationPacket();
         HandshakeInitializationPacket handshakePacket = new HandshakeInitializationPacket();
         handshakePacket.fromBytes(body);
         handshakePacket.fromBytes(body);
+        connectionId = handshakePacket.threadId; // 记录一下connection
 
 
         logger.info("handshake initialization packet received, prepare the client authentication packet to send");
         logger.info("handshake initialization packet received, prepare the client authentication packet to send");
 
 
@@ -291,4 +324,20 @@ public class MysqlConnector {
         this.password = password;
         this.password = password;
     }
     }
 
 
+    public long getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(long connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    public boolean isDumping() {
+        return dumping;
+    }
+
+    public void setDumping(boolean dumping) {
+        this.dumping = dumping;
+    }
+
 }
 }

+ 35 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/QuitCommandPacket.java

@@ -0,0 +1,35 @@
+package com.alibaba.otter.canal.parse.driver.mysql.packets.client;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.CommandPacket;
+
+/**
+ * quit cmd
+ * 
+ * @author agapple 2016年3月1日 下午8:33:02
+ * @since 1.0.22
+ */
+public class QuitCommandPacket extends CommandPacket {
+
+    public static final byte[] QUIT = new byte[] { 1, 0, 0, 0, 1 };
+
+    public QuitCommandPacket(){
+        setCommand((byte) 0x01);
+    }
+
+    @Override
+    public void fromBytes(byte[] data) throws IOException {
+
+    }
+
+    @Override
+    public byte[] toBytes() throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        out.write(getCommand());
+        out.write(QUIT);
+        return out.toByteArray();
+    }
+
+}

+ 2 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -231,6 +231,8 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                             sendAlarm(destination, ExceptionUtils.getFullStackTrace(e));
                             sendAlarm(destination, ExceptionUtils.getFullStackTrace(e));
                         }
                         }
                     } finally {
                     } finally {
+                        // 重新置为中断状态
+                        Thread.interrupted();
                         // 关闭一下链接
                         // 关闭一下链接
                         afterDump(erosaConnection);
                         afterDump(erosaConnection);
                         try {
                         try {

+ 2 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -141,6 +141,8 @@ public class MysqlConnection implements ErosaConnection {
         binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
         binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
         PacketManager.write(connector.getChannel(), new ByteBuffer[] { ByteBuffer.wrap(binlogDumpHeader.toBytes()),
         PacketManager.write(connector.getChannel(), new ByteBuffer[] { ByteBuffer.wrap(binlogDumpHeader.toBytes()),
                 ByteBuffer.wrap(cmdBody) });
                 ByteBuffer.wrap(cmdBody) });
+
+        connector.setDumping(true);
     }
     }
 
 
     public MysqlConnection fork() {
     public MysqlConnection fork() {

+ 0 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -28,7 +28,6 @@ import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.LogEvent;