Sfoglia il codice sorgente

Merge pull request #528 from lcybo/timeout

Use mysql master heartbeat to detect phycial tcp connection failure.
agapple 7 anni fa
parent
commit
826a45c105

+ 10 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -4,6 +4,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -28,6 +29,8 @@ import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 
+import static com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.MASTER_HEARTBEAT_PERIOD_SECONDS;
+
 public class MysqlConnection implements ErosaConnection {
 
     private static final Logger logger      = LoggerFactory.getLogger(MysqlConnection.class);
@@ -293,6 +296,13 @@ public class MysqlConnection implements ErosaConnection {
         } catch (Exception e) {
             logger.warn("update mariadb_slave_capability failed", e);
         }
+
+        long periodNano = TimeUnit.SECONDS.toNanos(MASTER_HEARTBEAT_PERIOD_SECONDS);
+        try {
+            update("SET @master_heartbeat_period=" + periodNano);
+        } catch (Exception e) {
+            logger.warn("update master_heartbeat_period failed", e);
+        }
     }
 
     /**

+ 6 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java

@@ -21,6 +21,11 @@ public class DirectLogFetcher extends LogFetcher {
 
     protected static final Logger logger            = LoggerFactory.getLogger(DirectLogFetcher.class);
 
+    // Master heartbeat interval
+    public static final int MASTER_HEARTBEAT_PERIOD_SECONDS = 15;
+    // +1s 确保 timeout > heartbeat interval
+    private static final int READ_TIMEOUT_MILLISECONDS = (MASTER_HEARTBEAT_PERIOD_SECONDS + 1) * 1000;
+
     /** Command to dump binlog */
     public static final byte      COM_BINLOG_DUMP   = 18;
 
@@ -166,7 +171,7 @@ public class DirectLogFetcher extends LogFetcher {
     private final boolean fetch0(final int off, final int len) throws IOException {
         ensureCapacity(off + len);
 
-        byte[] read = channel.read(len);
+        byte[] read = channel.read(len, READ_TIMEOUT_MILLISECONDS);
         System.arraycopy(read, 0, this.buffer, off, len);
 
         if (limit < off + len) limit = off + len;