Browse Source

fixed issue #483 , support show slave hosts

agapple 7 years ago
parent
commit
91ac0e30aa

+ 5 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -336,4 +336,9 @@ public class MysqlConnector {
     public void setConnTimeout(int connTimeout) {
         this.connTimeout = connTimeout;
     }
+
+    public String getPassword() {
+        return password;
+    }
+
 }

+ 57 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/RegisterSlaveCommandPacket.java

@@ -0,0 +1,57 @@
+package com.alibaba.otter.canal.parse.driver.mysql.packets.client;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.CommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
+
+/**
+ * COM_REGISTER_SLAVE
+ * 
+ * @author zhibinliu
+ * @since 1.0.24
+ */
+public class RegisterSlaveCommandPacket extends CommandPacket {
+
+    public String reportHost;
+    public int    reportPort;
+    public String reportUser;
+    public String reportPasswd;
+    public long   serverId;
+
+    public RegisterSlaveCommandPacket(){
+        setCommand((byte) 0x15);
+    }
+
+    public void fromBytes(byte[] data) {
+        // bypass
+    }
+
+    public static byte[] toLH(int n) {
+        byte[] b = new byte[4];
+        b[0] = (byte) (n & 0xff);
+        b[1] = (byte) (n >> 8 & 0xff);
+        b[2] = (byte) (n >> 16 & 0xff);
+        b[3] = (byte) (n >> 24 & 0xff);
+        return b;
+    }
+
+    public byte[] toBytes() throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        out.write(getCommand());
+        ByteHelper.writeUnsignedIntLittleEndian(serverId, out);
+        out.write((byte) reportHost.getBytes().length);
+        ByteHelper.writeFixedLengthBytesFromStart(reportHost.getBytes(), reportHost.getBytes().length, out);
+        out.write((byte) reportUser.getBytes().length);
+        ByteHelper.writeFixedLengthBytesFromStart(reportUser.getBytes(), reportUser.getBytes().length, out);
+        out.write((byte) reportPasswd.getBytes().length);
+        ByteHelper.writeFixedLengthBytesFromStart(reportPasswd.getBytes(), reportPasswd.getBytes().length, out);
+        ByteHelper.writeUnsignedShortLittleEndian(reportPort, out);
+        ByteHelper.writeUnsignedIntLittleEndian(0, out);// Fake
+                                                        // rpl_recovery_rank
+        ByteHelper.writeUnsignedIntLittleEndian(0, out);// master id
+        return out.toByteArray();
+    }
+
+}

+ 33 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -14,7 +14,9 @@ import com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.client.RegisterSlaveCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.SemiAckCommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
@@ -130,6 +132,7 @@ public class MysqlConnection implements ErosaConnection {
 
     public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
         updateSettings();
+        sendRegisterSlave();
         sendBinlogDump(binlogfilename, binlogPosition);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         fetcher.start(connector.getChannel());
@@ -157,11 +160,40 @@ public class MysqlConnection implements ErosaConnection {
         throw new NullPointerException("Not implement yet");
     }
 
+    private void sendRegisterSlave() throws IOException {
+        RegisterSlaveCommandPacket cmd = new RegisterSlaveCommandPacket();
+        cmd.reportHost = authInfo.getAddress().getAddress().getHostAddress();
+        cmd.reportPasswd = authInfo.getPassword();
+        cmd.reportUser = authInfo.getUsername();
+        cmd.reportPort = authInfo.getAddress().getPort(); // 暂时先用master节点的port
+        cmd.serverId = this.slaveId;
+        byte[] cmdBody = cmd.toBytes();
+
+        logger.info("Register slave {}", cmd);
+
+        HeaderPacket header = new HeaderPacket();
+        header.setPacketBodyLength(cmdBody.length);
+        header.setPacketSequenceNumber((byte) 0x00);
+        PacketManager.writePkg(connector.getChannel(), header.toBytes(), cmdBody);
+
+        header = PacketManager.readHeader(connector.getChannel(), 4);
+        byte[] body = PacketManager.readBytes(connector.getChannel(), header.getPacketBodyLength());
+        assert body != null;
+        if (body[0] < 0) {
+            if (body[0] == -1) {
+                ErrorPacket err = new ErrorPacket();
+                err.fromBytes(body);
+                throw new IOException("Error When doing Register slave:" + err.toString());
+            } else {
+                throw new IOException("unpexpected packet with field_count=" + body[0]);
+            }
+        }
+    }
+
     private void sendBinlogDump(String binlogfilename, Long binlogPosition) throws IOException {
         BinlogDumpCommandPacket binlogDumpCmd = new BinlogDumpCommandPacket();
         binlogDumpCmd.binlogFileName = binlogfilename;
         binlogDumpCmd.binlogPosition = binlogPosition;
-        // binlogDumpCmd.slaveServerId = this.slaveId;
         binlogDumpCmd.slaveServerId = this.slaveId;
         byte[] cmdBody = binlogDumpCmd.toBytes();
 
@@ -185,7 +217,6 @@ public class MysqlConnection implements ErosaConnection {
         semiAckHeader.setPacketBodyLength(cmdBody.length);
         semiAckHeader.setPacketSequenceNumber((byte) 0x00);
         PacketManager.writePkg(connector.getChannel(), semiAckHeader.toBytes(), cmdBody);
-
     }
 
     public MysqlConnection fork() {

+ 100 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java

@@ -3,27 +3,38 @@ package com.alibaba.otter.canal.parse;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import org.apache.commons.lang.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector;
+import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.client.RegisterSlaveCommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
 
 public class DirectLogFetcherTest {
 
+    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+
     @Test
     public void testSimple() {
         DirectLogFetcher fetcher = new DirectLogFetcher();
         try {
-            MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "xxxxx", "xxxxx");
+            MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "xxxx", "xxxx");
             connector.connect();
-            sendBinlogDump(connector, "mysql-bin.001016", 4L, 3);
+            updateSettings(connector);
+            sendRegisterSlave(connector, 3);
+            sendBinlogDump(connector, "mysql-bin.000001", 4L, 3);
 
             fetcher.start(connector.getChannel());
 
@@ -42,6 +53,7 @@ public class DirectLogFetcherTest {
                     case LogEvent.ROTATE_EVENT:
                         // binlogFileName = ((RotateLogEvent)
                         // event).getFilename();
+                        System.out.println(((RotateLogEvent) event).getFilename());
                         break;
                     case LogEvent.WRITE_ROWS_EVENT_V1:
                     case LogEvent.WRITE_ROWS_EVENT:
@@ -82,6 +94,33 @@ public class DirectLogFetcherTest {
 
     }
 
+    private void sendRegisterSlave(MysqlConnector connector, int slaveId) throws IOException {
+        RegisterSlaveCommandPacket cmd = new RegisterSlaveCommandPacket();
+        cmd.reportHost = connector.getAddress().getAddress().getHostAddress();
+        cmd.reportPasswd = connector.getPassword();
+        cmd.reportUser = connector.getUsername();
+        cmd.serverId = slaveId;
+        byte[] cmdBody = cmd.toBytes();
+
+        HeaderPacket header = new HeaderPacket();
+        header.setPacketBodyLength(cmdBody.length);
+        header.setPacketSequenceNumber((byte) 0x00);
+        PacketManager.writePkg(connector.getChannel(), header.toBytes(), cmdBody);
+
+        header = PacketManager.readHeader(connector.getChannel(), 4);
+        byte[] body = PacketManager.readBytes(connector.getChannel(), header.getPacketBodyLength());
+        assert body != null;
+        if (body[0] < 0) {
+            if (body[0] == -1) {
+                ErrorPacket err = new ErrorPacket();
+                err.fromBytes(body);
+                throw new IOException("Error When doing Register slave:" + err.toString());
+            } else {
+                throw new IOException("unpexpected packet with field_count=" + body[0]);
+            }
+        }
+    }
+
     private void sendBinlogDump(MysqlConnector connector, String binlogfilename, Long binlogPosition, int slaveId)
                                                                                                                   throws IOException {
         BinlogDumpCommandPacket binlogDumpCmd = new BinlogDumpCommandPacket();
@@ -95,4 +134,63 @@ public class DirectLogFetcherTest {
         binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
         PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody);
     }
+
+    private void updateSettings(MysqlConnector connector) throws IOException {
+        try {
+            update("set wait_timeout=9999999", connector);
+        } catch (Exception e) {
+            logger.warn("update wait_timeout failed", e);
+        }
+        try {
+            update("set net_write_timeout=1800", connector);
+        } catch (Exception e) {
+            logger.warn("update net_write_timeout failed", e);
+        }
+
+        try {
+            update("set net_read_timeout=1800", connector);
+        } catch (Exception e) {
+            logger.warn("update net_read_timeout failed", e);
+        }
+
+        try {
+            // 设置服务端返回结果时不做编码转化,直接按照数据库的二进制编码进行发送,由客户端自己根据需求进行编码转化
+            update("set names 'binary'", connector);
+        } catch (Exception e) {
+            logger.warn("update names failed", e);
+        }
+
+        try {
+            // mysql5.6针对checksum支持需要设置session变量
+            // 如果不设置会出现错误: Slave can not handle replication events with the
+            // checksum that master is configured to log
+            // 但也不能乱设置,需要和mysql server的checksum配置一致,不然RotateLogEvent会出现乱码
+            update("set @master_binlog_checksum= '@@global.binlog_checksum'", connector);
+        } catch (Exception e) {
+            logger.warn("update master_binlog_checksum failed", e);
+        }
+
+        try {
+            // 参考:https://github.com/alibaba/canal/issues/284
+            // mysql5.6需要设置slave_uuid避免被server kill链接
+            update("set @slave_uuid=uuid()", connector);
+        } catch (Exception e) {
+            if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
+                logger.warn("update slave_uuid failed", e);
+            }
+        }
+
+        try {
+            // mariadb针对特殊的类型,需要设置session变量
+            update("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'", connector);
+        } catch (Exception e) {
+            logger.warn("update mariadb_slave_capability failed", e);
+        }
+    }
+
+    public void update(String cmd, MysqlConnector connector) throws IOException {
+        MysqlUpdateExecutor exector = new MysqlUpdateExecutor(connector);
+        exector.update(cmd);
+    }
+
 }