Browse Source

Merge pull request #26 from alibaba/master

merge
rewerma 6 years ago
parent
commit
ac738dae05

+ 13 - 0
.github/issue_template.md

@@ -0,0 +1,13 @@
+### environment
+
+* canal version
+* mysql version
+
+### Issue Description
+
+
+### Steps to reproduce
+
+### Expected behaviour
+
+### Actual behaviour

+ 3 - 2
client/src/main/java/com/alibaba/otter/canal/client/impl/ClusterNodeAccessStrategy.java

@@ -15,7 +15,6 @@ import com.alibaba.otter.canal.common.utils.JsonUtils;
 import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
 import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
 import com.alibaba.otter.canal.common.zookeeper.running.ServerRunningData;
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 
 /**
  * 集群模式的调度策略
@@ -25,6 +24,7 @@ import com.alibaba.otter.canal.protocol.exception.CanalClientException;
  */
 public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy {
 
+    private String                           destination;
     private IZkChildListener                 childListener;                                      // 监听所有的服务器列表
     private IZkDataListener                  dataListener;                                       // 监听当前的工作节点
     private ZkClientx                        zkClient;
@@ -32,6 +32,7 @@ public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy {
     private volatile InetSocketAddress       runningAddress = null;
 
     public ClusterNodeAccessStrategy(String destination, ZkClientx zkClient){
+        this.destination = destination;
         this.zkClient = zkClient;
         childListener = new IZkChildListener() {
 
@@ -73,7 +74,7 @@ public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy {
         } else if (!currentAddress.isEmpty()) { // 如果不存在已经启动的服务,可能服务是一种lazy启动,随机选择一台触发服务器进行启动
             return currentAddress.get(0);// 默认返回第一个节点,之前已经做过shuffle
         } else {
-            throw new CanalClientException("no alive canal server");
+            throw new ServerNotFoundException("no alive canal server for " + destination);
         }
     }
 

+ 29 - 0
client/src/main/java/com/alibaba/otter/canal/client/impl/ServerNotFoundException.java

@@ -0,0 +1,29 @@
+package com.alibaba.otter.canal.client.impl;
+
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+
+public class ServerNotFoundException extends CanalClientException {
+
+    private static final long serialVersionUID = -3471518241911601774L;
+
+    public ServerNotFoundException(String errorCode, String errorDesc, Throwable cause){
+        super(errorCode, errorDesc, cause);
+    }
+
+    public ServerNotFoundException(String errorCode, String errorDesc){
+        super(errorCode, errorDesc);
+    }
+
+    public ServerNotFoundException(String errorCode, Throwable cause){
+        super(errorCode, cause);
+    }
+
+    public ServerNotFoundException(String errorCode){
+        super(errorCode);
+    }
+
+    public ServerNotFoundException(Throwable cause){
+        super(cause);
+    }
+
+}

+ 10 - 0
client/src/main/java/com/alibaba/otter/canal/client/impl/running/ClientRunningMonitor.java

@@ -16,6 +16,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
+import com.alibaba.otter.canal.client.impl.ServerNotFoundException;
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.common.utils.BooleanMutex;
 import com.alibaba.otter.canal.common.utils.JsonUtils;
@@ -141,6 +142,15 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
             logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].",
                 destination),
                 t);
+
+            // fixed issue 1220, 针对server节点不工作避免死循环
+            if (t instanceof ServerNotFoundException) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                }
+            }
+
             // 出现任何异常尝试release
             releaseRunning();
             throw new CanalClientException("something goes wrong in initRunning method. ", t);

+ 4 - 0
filter/src/main/java/com/alibaba/otter/canal/filter/aviater/AviaterRegexFilter.java

@@ -125,4 +125,8 @@ public class AviaterRegexFilter implements CanalEventFilter<String> {
         return result;
     }
 
+    @Override
+    public String toString() {
+        return pattern;
+    }
 }

+ 24 - 12
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -11,6 +11,7 @@ import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
 import com.alibaba.otter.canal.parse.inbound.MultiStageCoprocessor;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DefaultTableMetaTSDBFactory;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBFactory;
@@ -64,8 +65,29 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         super.setEventFilter(eventFilter);
 
         // 触发一下filter变更
-        if (eventFilter != null && eventFilter instanceof AviaterRegexFilter && binlogParser instanceof LogEventConvert) {
-            ((LogEventConvert) binlogParser).setNameFilter((AviaterRegexFilter) eventFilter);
+        if (eventFilter != null && eventFilter instanceof AviaterRegexFilter) {
+            if (binlogParser instanceof LogEventConvert) {
+                ((LogEventConvert) binlogParser).setNameFilter((AviaterRegexFilter) eventFilter);
+            }
+
+            if (tableMetaTSDB != null && tableMetaTSDB instanceof DatabaseTableMeta) {
+                ((DatabaseTableMeta) tableMetaTSDB).setFilter(eventFilter);
+            }
+        }
+    }
+
+    public void setEventBlackFilter(CanalEventFilter eventBlackFilter) {
+        super.setEventBlackFilter(eventBlackFilter);
+
+        // 触发一下filter变更
+        if (eventBlackFilter != null && eventBlackFilter instanceof AviaterRegexFilter) {
+            if (binlogParser instanceof LogEventConvert) {
+                ((LogEventConvert) binlogParser).setNameBlackFilter((AviaterRegexFilter) eventBlackFilter);
+            }
+
+            if (tableMetaTSDB != null && tableMetaTSDB instanceof DatabaseTableMeta) {
+                ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
+            }
         }
     }
 
@@ -115,16 +137,6 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         super.stop();
     }
 
-    public void setEventBlackFilter(CanalEventFilter eventBlackFilter) {
-        super.setEventBlackFilter(eventBlackFilter);
-
-        // 触发一下filter变更
-        if (eventBlackFilter != null && eventBlackFilter instanceof AviaterRegexFilter
-            && binlogParser instanceof LogEventConvert) {
-            ((LogEventConvert) binlogParser).setNameBlackFilter((AviaterRegexFilter) eventBlackFilter);
-        }
-    }
-
     protected MultiStageCoprocessor buildMultiStageCoprocessor() {
         MysqlMultiStageCoprocessor mysqlMultiStageCoprocessor = new MysqlMultiStageCoprocessor(parallelBufferSize,
             parallelThreadSize,

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -506,8 +506,8 @@ public class MysqlConnection implements ErosaConnection {
         ResultSetPacket rs = null;
         try {
             rs = query("select @@global.binlog_checksum");
-        } catch (IOException e) {
-            throw new CanalParseException(e);
+        } catch (Throwable e) {
+            // ignore
         }
 
         List<String> columnValues = rs.getFieldValues();

+ 2 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -974,10 +974,12 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
     public void setNameFilter(AviaterRegexFilter nameFilter) {
         this.nameFilter = nameFilter;
+        logger.warn("--> init table filter : " + nameFilter.toString());
     }
 
     public void setNameBlackFilter(AviaterRegexFilter nameBlackFilter) {
         this.nameBlackFilter = nameBlackFilter;
+        logger.warn("--> init table black filter : " + nameBlackFilter.toString());
     }
 
     public void setTableMetaCache(TableMetaCache tableMetaCache) {

+ 2 - 0
pom.xml

@@ -381,6 +381,7 @@
                 </configuration>
             </plugin>
             <!-- javadoc -->
+            <!--
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-javadoc-plugin</artifactId>
@@ -399,6 +400,7 @@
                   <additionalparam>-Xdoclint:none</additionalparam>
                 </configuration>
             </plugin>
+            -->
         </plugins>
         <sourceDirectory>src/main/java</sourceDirectory>
         <testSourceDirectory>src/test/java</testSourceDirectory>