瀏覽代碼

AbstractCanalInstance的subscribeChange方法,增加对AbstractEventParser的类型判断;MysqlEventParser对Gtid的处理需要考虑兼容性 (#1824)

* AbstractCanalInstance的subscribeChange方法,增加对AbstractEventParser的类型判断,以支持用户的自定义CanalEventParser

* MysqlEventParser对Gtid模式的处理需要考虑兼容性,支持系统平滑的从非gtid模式升级为gtid模式
lulu2panpan 6 年之前
父節點
當前提交
e808c6ec70

+ 6 - 2
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java

@@ -53,10 +53,14 @@ public class AbstractCanalInstance extends AbstractCanalLifeCycle implements Can
                 // 处理group的模式
                 List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
                 for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
-                    ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
+                    if(singleEventParser instanceof AbstractEventParser) {
+                        ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
+                    }
                 }
             } else {
-                ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
+                if(eventParser instanceof AbstractEventParser) {
+                    ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
+                }
             }
 
         }

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -242,7 +242,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                         if (parallel) {
                             // build stage processor
                             multiStageCoprocessor = buildMultiStageCoprocessor();
-                            if (isGTIDMode()) {
+                            if (isGTIDMode() && StringUtils.isNotEmpty(startPosition.getGtid())) {
                                 // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
                                 GTIDSet gtidSet = MysqlGTIDSet.parse(startPosition.getGtid());
                                 ((MysqlMultiStageCoprocessor) multiStageCoprocessor).setGtidSet(gtidSet);
@@ -260,7 +260,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                                 }
                             }
                         } else {
-                            if (isGTIDMode()) {
+                            if (isGTIDMode() && StringUtils.isNotEmpty(startPosition.getGtid())) {
                                 // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
                                 erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler);
                             } else {

+ 8 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -355,11 +355,14 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
             // GTID模式下,CanalLogPositionManager里取最后的gtid,没有则取instanc配置中的
             LogPosition logPosition = getLogPositionManager().getLatestIndexBy(destination);
             if (logPosition != null) {
-                return logPosition.getPostion();
-            }
-
-            if (masterPosition != null && StringUtils.isNotEmpty(masterPosition.getGtid())) {
-                return masterPosition;
+                // 如果以前是非GTID模式,后来调整为了GTID模式,那么为了保持兼容,需要判断gtid是否为空
+                if (StringUtils.isNotEmpty(logPosition.getPostion().getGtid())) {
+                    return logPosition.getPostion();
+                }
+            }else {
+                if (masterPosition != null && StringUtils.isNotEmpty(masterPosition.getGtid())) {
+                    return masterPosition;
+                }
             }
         }