Browse Source

IOException限制retry次数,超过次数后跳过binlog解析

Yishun.Chen 9 years ago
parent
commit
b599846470

+ 32 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -87,12 +87,22 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
     protected volatile Timer                         timer;
     protected TimerTask                              heartBeatTimerTask;
     protected Throwable                              exception                  = null;
+    
+    //update by yishun.chen
+    //特殊异常处理参数
+    protected int                                    specialExceptionCount = 0;//特殊异常计数
+    protected int                                    specialExceptionCountThreshold = 3;//特殊异常计数阀值
+    protected boolean								 isFindEndPosition = false;//重连时查找数据库最新的位点
 
     protected abstract BinlogParser buildParser();
 
     protected abstract ErosaConnection buildErosaConnection();
 
     protected abstract EntryPosition findStartPosition(ErosaConnection connection) throws IOException;
+    
+    //update by yishun.chen
+    //查找数据库最新的位点
+    protected abstract EntryPosition findEndPosition(ErosaConnection connection) throws IOException;
 
     protected void preDump(ErosaConnection connection) {
     }
@@ -159,7 +169,19 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
 
                         erosaConnection.connect();// 链接
                         // 4. 获取最后的位置信息
-                        final EntryPosition startPosition = findStartPosition(erosaConnection);
+                        EntryPosition position = findStartPosition(erosaConnection);
+                        //update by yishun.chen
+                        //获取当前数据库最后一个位置进行消费
+                        //当解析binlog多次失败后跳过当前失败的位点重新到数据库获取新的位点 
+                        if(isFindEndPosition){
+                        	position = findEndPosition(erosaConnection);
+                        	specialExceptionCount = 0;
+                        	isFindEndPosition = false;
+                        	logger.error("special exception count>" + specialExceptionCountThreshold + ", find end position, maybe cause data loss!");
+                        	sendAlarm(destination, "special exception count>" + specialExceptionCountThreshold + ", find end position, maybe cause data loss!");
+                        }
+                        
+                        final EntryPosition startPosition = position;
                         if (startPosition == null) {
                             throw new CanalParseException("can't find start position for " + destination);
                         }
@@ -219,6 +241,15 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                         logger.error(String.format("dump address %s has an error, retrying. caused by ",
                             runningInfo.getAddress().toString()), e);
                     } catch (Throwable e) {
+                    	//update by yishun.chen
+                    	//dump数据出现IOException累计错误出现3次以上跳过当前位点
+                    	if(e instanceof IOException){
+                    		specialExceptionCount ++;
+                    		if(specialExceptionCount >= specialExceptionCountThreshold){
+                    			isFindEndPosition = true;
+                    		}
+                    	}
+                    	
                         exception = e;
                         if (!running) {
                             if (!(e instanceof java.nio.channels.ClosedByInterruptException || e.getCause() instanceof java.nio.channels.ClosedByInterruptException)) {