Browse Source

fixed issue #172, 精确识别主备切换,尝试基于时间戳重新定位位点,确保不丢数据

agapple 9 years ago
parent
commit
ae05a3f39f

+ 10 - 33
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -87,22 +87,12 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
     protected volatile Timer                         timer;
     protected TimerTask                              heartBeatTimerTask;
     protected Throwable                              exception                  = null;
-    
-    //update by yishun.chen
-    //特殊异常处理参数
-    protected int                                    specialExceptionCount = 0;//特殊异常计数
-    protected int                                    specialExceptionCountThreshold = 3;//特殊异常计数阀值
-    protected boolean								 isFindEndPosition = false;//重连时查找数据库最新的位点
 
     protected abstract BinlogParser buildParser();
 
     protected abstract ErosaConnection buildErosaConnection();
 
     protected abstract EntryPosition findStartPosition(ErosaConnection connection) throws IOException;
-    
-    //update by yishun.chen
-    //查找数据库最新的位点
-    protected abstract EntryPosition findEndPosition(ErosaConnection connection) throws IOException;
 
     protected void preDump(ErosaConnection connection) {
     }
@@ -170,17 +160,6 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                         erosaConnection.connect();// 链接
                         // 4. 获取最后的位置信息
                         EntryPosition position = findStartPosition(erosaConnection);
-                        //update by yishun.chen
-                        //获取当前数据库最后一个位置进行消费
-                        //当解析binlog多次失败后跳过当前失败的位点重新到数据库获取新的位点 
-                        if(isFindEndPosition){
-                        	position = findEndPosition(erosaConnection);
-                        	specialExceptionCount = 0;
-                        	isFindEndPosition = false;
-                        	logger.error("special exception count>" + specialExceptionCountThreshold + ", find end position, maybe cause data loss!");
-                        	sendAlarm(destination, "special exception count>" + specialExceptionCountThreshold + ", find end position, maybe cause data loss!");
-                        }
-                        
                         final EntryPosition startPosition = position;
                         if (startPosition == null) {
                             throw new CanalParseException("can't find start position for " + destination);
@@ -212,9 +191,9 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                                     return running;
                                 } catch (TableIdNotFoundException e) {
                                     throw e;
-                                } catch (Exception e) {
+                                } catch (Throwable e) {
                                     // 记录一下,出错的位点信息
-                                    processError(e,
+                                    processSinkError(e,
                                         this.lastPosition,
                                         startPosition.getJournalName(),
                                         startPosition.getPosition());
@@ -241,15 +220,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                         logger.error(String.format("dump address %s has an error, retrying. caused by ",
                             runningInfo.getAddress().toString()), e);
                     } catch (Throwable e) {
-                    	//update by yishun.chen
-                    	//dump数据出现IOException累计错误出现3次以上跳过当前位点
-                    	if(e instanceof IOException){
-                    		specialExceptionCount ++;
-                    		if(specialExceptionCount >= specialExceptionCountThreshold){
-                    			isFindEndPosition = true;
-                    		}
-                    	}
-                    	
+                        processDumpError(e);
                         exception = e;
                         if (!running) {
                             if (!(e instanceof java.nio.channels.ClosedByInterruptException || e.getCause() instanceof java.nio.channels.ClosedByInterruptException)) {
@@ -386,6 +357,8 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         position.setJournalName(entry.getHeader().getLogfileName());
         position.setPosition(entry.getHeader().getLogfileOffset());
         position.setTimestamp(entry.getHeader().getExecuteTime());
+        // add serverId at 2016-06-28
+        position.setServerId(entry.getHeader().getServerId());
         logPosition.setPostion(position);
 
         LogIdentity identity = new LogIdentity(runningInfo.getAddress(), -1L);
@@ -393,7 +366,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         return logPosition;
     }
 
-    protected void processError(Exception e, LogPosition lastPosition, String startBinlogFile, long startPosition) {
+    protected void processSinkError(Throwable e, LogPosition lastPosition, String startBinlogFile, long startPosition) {
         if (lastPosition != null) {
             logger.warn(String.format("ERROR ## parse this event has an error , last position : [%s]",
                 lastPosition.getPostion()),
@@ -405,6 +378,10 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         }
     }
 
+    protected void processDumpError(Throwable e) {
+        // do nothing
+    }
+
     protected void startHeartBeat(ErosaConnection connection) {
         lastEntryTime = 0L; // 初始化
         if (timer == null) {// lazy初始化一下

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -88,7 +88,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
     public void setFilterQueryDdl(boolean filterQueryDdl) {
         this.filterQueryDdl = filterQueryDdl;
     }
-    
+
     public void setFilterRows(boolean filterRows) {
         this.filterRows = filterRows;
     }

+ 0 - 8
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java

@@ -1,7 +1,5 @@
 package com.alibaba.otter.canal.parse.inbound.mysql;
 
-import java.io.IOException;
-
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.parse.CanalEventParser;
@@ -55,12 +53,6 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
         return connection;
     }
     
-    @Override
-	protected EntryPosition findEndPosition(ErosaConnection connection) throws IOException {
-		// TODO 没有实现
-		return findStartPosition(connection);
-	}
-
     @Override
     protected EntryPosition findStartPosition(ErosaConnection connection) {
         // 处理逻辑

+ 65 - 11
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -66,7 +66,9 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private BinlogFormat[]     supportBinlogFormats;                         // 支持的binlogFormat,如果设置会执行强校验
     private BinlogImage[]      supportBinlogImages;                          // 支持的binlogImage,如果设置会执行强校验
 
-    // 心跳检查
+    // update by yishun.chen,特殊异常处理参数
+    private int                dumpErrorCount                    = 0;        // binlogDump失败异常计数
+    private int                dumpErrorCountThreshold           = 3;        // binlogDump失败异常计数阀值
 
     protected ErosaConnection buildErosaConnection() {
         return buildMysqlConnection(this.runningInfo);
@@ -322,12 +324,12 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         }
         return startPosition;
     }
-    
-	protected EntryPosition findEndPosition(ErosaConnection connection) throws IOException {
-		MysqlConnection mysqlConnection = (MysqlConnection) connection;
-		 EntryPosition endPosition = findEndPosition(mysqlConnection);
-		return endPosition;
-	}
+
+    protected EntryPosition findEndPosition(ErosaConnection connection) throws IOException {
+        MysqlConnection mysqlConnection = (MysqlConnection) connection;
+        EntryPosition endPosition = findEndPosition(mysqlConnection);
+        return endPosition;
+    }
 
     protected EntryPosition findStartPositionInternal(ErosaConnection connection) {
         MysqlConnection mysqlConnection = (MysqlConnection) connection;
@@ -389,6 +391,24 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
             }
         } else {
             if (logPosition.getIdentity().getSourceAddress().equals(mysqlConnection.getConnector().getAddress())) {
+                if (dumpErrorCountThreshold >= 0 && dumpErrorCount > dumpErrorCountThreshold) {
+                    // binlog定位位点失败,可能有两个原因:
+                    // 1. binlog位点被删除
+                    // 2.vip模式的mysql,发生了主备切换,判断一下serverId是否变化,针对这种模式可以发起一次基于时间戳查找合适的binlog位点
+                    boolean case2 = standbyInfo == null && logPosition.getPostion().getServerId() != null
+                                    && !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
+                    if (case2) {
+                        long timestamp = logPosition.getPostion().getTimestamp();
+                        long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
+                        logger.warn("prepare to find start position by last position {}:{}:{}", new Object[] { "", "",
+                                logPosition.getPostion().getTimestamp() });
+                        EntryPosition findPosition = findByStartTimeStamp(mysqlConnection, newStartTimestamp);
+                        // 重新置为一下
+                        dumpErrorCount = 0;
+                        return findPosition;
+                    }
+                }
+                // 其余情况
                 logger.warn("prepare to find start position just last position");
                 return logPosition.getPostion();
             } else {
@@ -431,7 +451,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                     }
                 } catch (Exception e) {
                     // 上一次记录的poistion可能为一条update/insert/delete变更事件,直接进行dump的话,会缺少tableMap事件,导致tableId未进行解析
-                    processError(e, lastPosition, entryPosition.getJournalName(), entryPosition.getPosition());
+                    processSinkError(e, lastPosition, entryPosition.getJournalName(), entryPosition.getPosition());
                     reDump.set(true);
                     return false;
                 }
@@ -465,7 +485,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
 
                         lastPosition = buildLastPosition(entry);
                     } catch (Exception e) {
-                        processError(e, lastPosition, entryPosition.getJournalName(), entryPosition.getPosition());
+                        processSinkError(e, lastPosition, entryPosition.getJournalName(), entryPosition.getPosition());
                         return false;
                     }
 
@@ -545,6 +565,22 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         return null;
     }
 
+    /**
+     * 查询当前db的serverId信息
+     */
+    private Long findServerId(MysqlConnection mysqlConnection) {
+        try {
+            ResultSetPacket packet = mysqlConnection.query("show variables like 'server_id'");
+            List<String> fields = packet.getFieldValues();
+            if (CollectionUtils.isEmpty(fields)) {
+                throw new CanalParseException("command : show variables like 'server_id' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
+            }
+            return Long.valueOf(fields.get(1));
+        } catch (IOException e) {
+            throw new CanalParseException("command : show variables like 'server_id' has an error!", e);
+        }
+    }
+
     /**
      * 查询当前的binlog位置
      */
@@ -686,8 +722,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         }
 
                         lastPosition = buildLastPosition(entry);
-                    } catch (Exception e) {
-                        processError(e, lastPosition, searchBinlogFile, 4L);
+                    } catch (Throwable e) {
+                        processSinkError(e, lastPosition, searchBinlogFile, 4L);
                     }
 
                     return running;
@@ -705,6 +741,19 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         }
     }
 
+    @Override
+    protected void processDumpError(Throwable e) {
+        if (e instanceof IOException) {
+            String message = e.getMessage();
+            if (StringUtils.contains(message, "errno = 1236")) {
+                // 1236 errorCode代表ER_MASTER_FATAL_ERROR_READING_BINLOG
+                dumpErrorCount++;
+            }
+        }
+
+        super.processDumpError(e);
+    }
+
     public void setSupportBinlogFormats(String formatStrs) {
         String[] formats = StringUtils.split(formatStrs, ',');
         if (formats != null) {
@@ -788,4 +837,9 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     public void setHaController(CanalHAController haController) {
         this.haController = haController;
     }
+
+    public void setDumpErrorCountThreshold(int dumpErrorCountThreshold) {
+        this.dumpErrorCountThreshold = dumpErrorCountThreshold;
+    }
+
 }

+ 15 - 0
protocol/src/main/java/com/alibaba/otter/canal/protocol/position/EntryPosition.java

@@ -15,6 +15,8 @@ public class EntryPosition extends TimePosition {
     private boolean           included              = false;
     private String            journalName;
     private Long              position;
+    // add by agapple at 2016-06-28
+    private Long              serverId              = null;              // 记录一下位点对应的serverId
 
     public EntryPosition(){
         super(null);
@@ -34,6 +36,11 @@ public class EntryPosition extends TimePosition {
         this.position = position;
     }
 
+    public EntryPosition(String journalName, Long position, Long timestamp, Long serverId){
+        this(journalName, position, timestamp);
+        this.serverId = serverId;
+    }
+
     public String getJournalName() {
         return journalName;
     }
@@ -58,6 +65,14 @@ public class EntryPosition extends TimePosition {
         this.included = included;
     }
 
+    public Long getServerId() {
+        return serverId;
+    }
+
+    public void setServerId(Long serverId) {
+        this.serverId = serverId;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;

+ 2 - 0
store/src/main/java/com/alibaba/otter/canal/store/helper/CanalEventUtils.java

@@ -51,6 +51,8 @@ public class CanalEventUtils {
         position.setJournalName(event.getEntry().getHeader().getLogfileName());
         position.setPosition(event.getEntry().getHeader().getLogfileOffset());
         position.setTimestamp(event.getEntry().getHeader().getExecuteTime());
+        // add serverId at 2016-06-28
+        position.setServerId(event.getEntry().getHeader().getServerId());
 
         LogPosition logPosition = new LogPosition();
         logPosition.setPostion(position);