Browse Source

Merge pull request #138 from MarkLinHz/master

make canal server become a single instance
agapple 9 năm trước cách đây
mục cha
commit
37478f3409

+ 2 - 2
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -88,9 +88,9 @@ public class CanalController {
         cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
         ip = getProperty(properties, CanalConstants.CANAL_IP);
         port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
-        embededCanalServer = new CanalServerWithEmbedded();
+        embededCanalServer = CanalServerWithEmbedded.instance();
         embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
-        canalServer = new CanalServerWithNetty(embededCanalServer);
+        canalServer = CanalServerWithNetty.instance();
         canalServer.setIp(ip);
         canalServer.setPort(port);
 

+ 247 - 0
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java

@@ -0,0 +1,247 @@
+package com.alibaba.otter.canal.instance.core;
+
+import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
+import com.alibaba.otter.canal.common.alarm.CanalAlarmHandler;
+import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
+import com.alibaba.otter.canal.meta.CanalMetaManager;
+import com.alibaba.otter.canal.parse.CanalEventParser;
+import com.alibaba.otter.canal.parse.ha.CanalHAController;
+import com.alibaba.otter.canal.parse.ha.HeartBeatHAController;
+import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
+import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
+import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.ClientIdentity;
+import com.alibaba.otter.canal.sink.CanalEventSink;
+import com.alibaba.otter.canal.store.CanalEventStore;
+import com.alibaba.otter.canal.store.model.Event;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Created with Intellig IDEA.
+ * Author: yinxiu
+ * Date: 2016-01-07
+ * Time: 22:26
+ */
+public class AbstractCanalInstance extends AbstractCanalLifeCycle implements CanalInstance {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractCanalInstance.class);
+
+    protected Long                                   canalId;                               // 和manager交互唯一标示
+    protected String                                 destination;                           // 队列名字
+    protected CanalEventStore<Event>                 eventStore;                            // 有序队列
+
+    protected CanalEventParser                       eventParser;                           // 解析对应的数据信息
+    protected CanalEventSink<List<CanalEntry.Entry>> eventSink;                             // 链接parse和store的桥接器
+    protected CanalMetaManager                       metaManager;                           // 消费信息管理器
+    protected CanalAlarmHandler                      alarmHandler;                          // alarm报警机制
+
+    @Override
+    public boolean subscribeChange(ClientIdentity identity) {
+        if (StringUtils.isNotEmpty(identity.getFilter())) {
+            logger.info("subscribe filter change to " + identity.getFilter());
+            AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());
+
+            boolean isGroup = (eventParser instanceof GroupEventParser);
+            if (isGroup) {
+                // 处理group的模式
+                List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
+                for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
+                    ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
+                }
+            } else {
+                ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
+            }
+
+        }
+
+        // filter的处理规则
+        // a. parser处理数据过滤处理
+        // b. sink处理数据的路由&分发,一份parse数据经过sink后可以分发为多份,每份的数据可以根据自己的过滤规则不同而有不同的数据
+        // 后续内存版的一对多分发,可以考虑
+        return true;
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        if (!metaManager.isStart()) {
+            metaManager.start();
+        }
+
+        if (!alarmHandler.isStart()) {
+            alarmHandler.start();
+        }
+
+        if (!eventStore.isStart()) {
+            eventStore.start();
+        }
+
+        if (!eventSink.isStart()) {
+            eventSink.start();
+        }
+
+        if (!eventParser.isStart()) {
+            beforeStartEventParser(eventParser);
+            eventParser.start();
+            afterStartEventParser(eventParser);
+        }
+        logger.info("start successful....");
+    }
+
+    @Override
+    public void stop() {
+        super.stop();
+        logger.info("stop CannalInstance for {}-{} ", new Object[] { canalId, destination });
+
+        if (eventParser.isStart()) {
+            beforeStopEventParser(eventParser);
+            eventParser.stop();
+            afterStopEventParser(eventParser);
+        }
+
+        if (eventSink.isStart()) {
+            eventSink.stop();
+        }
+
+        if (eventStore.isStart()) {
+            eventStore.stop();
+        }
+
+        if (metaManager.isStart()) {
+            metaManager.stop();
+        }
+
+        if (alarmHandler.isStart()) {
+            alarmHandler.stop();
+        }
+
+
+        logger.info("stop successful....");
+    }
+
+    protected void beforeStartEventParser(CanalEventParser eventParser) {
+
+        boolean isGroup = (eventParser instanceof GroupEventParser);
+        if (isGroup) {
+            // 处理group的模式
+            List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
+            for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
+                startEventParserInternal(singleEventParser, true);
+            }
+        } else {
+            startEventParserInternal(eventParser, false);
+        }
+    }
+
+    // around event parser, default impl
+    protected void afterStartEventParser(CanalEventParser eventParser) {
+        // 读取一下历史订阅的filter信息
+        List<ClientIdentity> clientIdentitys = metaManager.listAllSubscribeInfo(destination);
+        for (ClientIdentity clientIdentity : clientIdentitys) {
+            subscribeChange(clientIdentity);
+        }
+    }
+
+    // around event parser
+    protected void beforeStopEventParser(CanalEventParser eventParser) {
+        // noop
+    }
+
+    protected void afterStopEventParser(CanalEventParser eventParser) {
+
+        boolean isGroup = (eventParser instanceof GroupEventParser);
+        if (isGroup) {
+            // 处理group的模式
+            List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
+            for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
+                stopEventParserInternal(singleEventParser);
+            }
+        } else {
+            stopEventParserInternal(eventParser);
+        }
+    }
+
+    /**
+     * 初始化单个eventParser,不需要考虑group
+     */
+    protected void startEventParserInternal(CanalEventParser eventParser, boolean isGroup) {
+        if (eventParser instanceof AbstractEventParser) {
+            AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;
+            // 首先启动log position管理器
+            CanalLogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();
+            if (!logPositionManager.isStart()) {
+                logPositionManager.start();
+            }
+        }
+
+        if (eventParser instanceof MysqlEventParser) {
+            MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;
+            CanalHAController haController = mysqlEventParser.getHaController();
+
+            if (haController instanceof HeartBeatHAController) {
+                ((HeartBeatHAController) haController).setCanalHASwitchable(mysqlEventParser);
+            }
+
+            if (!haController.isStart()) {
+                haController.start();
+            }
+
+        }
+    }
+
+    protected void stopEventParserInternal(CanalEventParser eventParser) {
+        if (eventParser instanceof AbstractEventParser) {
+            AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;
+            // 首先启动log position管理器
+            CanalLogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();
+            if (logPositionManager.isStart()) {
+                logPositionManager.stop();
+            }
+        }
+
+        if (eventParser instanceof MysqlEventParser) {
+            MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;
+            CanalHAController haController = mysqlEventParser.getHaController();
+            if (haController.isStart()) {
+                haController.stop();
+            }
+        }
+    }
+
+    // ==================getter==================================
+    @Override
+    public String getDestination() {
+        return destination;
+    }
+
+    @Override
+    public CanalEventParser getEventParser() {
+        return eventParser;
+    }
+
+    @Override
+    public CanalEventSink getEventSink() {
+        return eventSink;
+    }
+
+    @Override
+    public CanalEventStore getEventStore() {
+        return eventStore;
+    }
+
+    @Override
+    public CanalMetaManager getMetaManager() {
+        return metaManager;
+    }
+
+    @Override
+    public CanalAlarmHandler getAlarmHandler() {
+        return alarmHandler;
+    }
+}

+ 0 - 105
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalInstanceSupport.java

@@ -1,105 +0,0 @@
-package com.alibaba.otter.canal.instance.core;
-
-import java.util.List;
-
-import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
-import com.alibaba.otter.canal.parse.CanalEventParser;
-import com.alibaba.otter.canal.parse.ha.CanalHAController;
-import com.alibaba.otter.canal.parse.ha.HeartBeatHAController;
-import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
-import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
-import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
-import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
-
-/**
- * @author zebin.xuzb 2012-10-17 下午3:12:34
- * @version 1.0.0
- */
-public abstract class CanalInstanceSupport extends AbstractCanalLifeCycle {
-
-    protected void beforeStartEventParser(CanalEventParser eventParser) {
-
-        boolean isGroup = (eventParser instanceof GroupEventParser);
-        if (isGroup) {
-            // 处理group的模式
-            List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
-            for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
-                startEventParserInternal(singleEventParser, true);
-            }
-        } else {
-            startEventParserInternal(eventParser, false);
-        }
-    }
-
-    // around event parser
-    protected void afterStartEventParser(CanalEventParser eventParser) {
-        // noop
-    }
-
-    // around event parser
-    protected void beforeStopEventParser(CanalEventParser eventParser) {
-        // noop
-    }
-
-    protected void afterStopEventParser(CanalEventParser eventParser) {
-
-        boolean isGroup = (eventParser instanceof GroupEventParser);
-        if (isGroup) {
-            // 处理group的模式
-            List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
-            for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
-                stopEventParserInternal(singleEventParser);
-            }
-        } else {
-            stopEventParserInternal(eventParser);
-        }
-    }
-
-    /**
-     * 初始化单个eventParser,不需要考虑group
-     */
-    protected void startEventParserInternal(CanalEventParser eventParser, boolean isGroup) {
-        if (eventParser instanceof AbstractEventParser) {
-            AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;
-            // 首先启动log position管理器
-            CanalLogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();
-            if (!logPositionManager.isStart()) {
-                logPositionManager.start();
-            }
-        }
-
-        if (eventParser instanceof MysqlEventParser) {
-            MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;
-            CanalHAController haController = mysqlEventParser.getHaController();
-
-            if (haController instanceof HeartBeatHAController) {
-                ((HeartBeatHAController) haController).setCanalHASwitchable(mysqlEventParser);
-            }
-
-            if (!haController.isStart()) {
-                haController.start();
-            }
-
-        }
-    }
-
-    protected void stopEventParserInternal(CanalEventParser eventParser) {
-        if (eventParser instanceof AbstractEventParser) {
-            AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;
-            // 首先启动log position管理器
-            CanalLogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();
-            if (logPositionManager.isStart()) {
-                logPositionManager.stop();
-            }
-        }
-
-        if (eventParser instanceof MysqlEventParser) {
-            MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;
-            CanalHAController haController = mysqlEventParser.getHaController();
-            if (haController.isStart()) {
-                haController.stop();
-            }
-        }
-    }
-
-}

+ 5 - 128
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -6,6 +6,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import com.alibaba.otter.canal.instance.core.AbstractCanalInstance;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -17,8 +18,6 @@ import com.alibaba.otter.canal.common.alarm.LogAlarmHandler;
 import com.alibaba.otter.canal.common.utils.JsonUtils;
 import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
 import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
-import com.alibaba.otter.canal.instance.core.CanalInstance;
-import com.alibaba.otter.canal.instance.core.CanalInstanceSupport;
 import com.alibaba.otter.canal.instance.manager.model.Canal;
 import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
 import com.alibaba.otter.canal.instance.manager.model.CanalParameter.DataSourcing;
@@ -47,7 +46,6 @@ import com.alibaba.otter.canal.parse.index.PeriodMixedLogPositionManager;
 import com.alibaba.otter.canal.parse.index.ZooKeeperLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
-import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.sink.CanalEventSink;
 import com.alibaba.otter.canal.sink.entry.EntryEventSink;
@@ -64,10 +62,9 @@ import com.alibaba.otter.canal.store.model.Event;
  * @author jianghang 2012-7-11 下午09:26:51
  * @version 1.0.0
  */
-public class CanalInstanceWithManager extends CanalInstanceSupport implements CanalInstance {
+public class CanalInstanceWithManager extends AbstractCanalInstance {
 
     private static final Logger           logger = LoggerFactory.getLogger(CanalInstanceWithManager.class);
-    protected Long                        canalId;                                                         // 和manager交互唯一标示
     protected String                      destination;                                                     // 队列名字
     protected String                      filter;                                                          // 过滤表达式
     protected CanalParameter              parameters;                                                      // 对应参数
@@ -77,11 +74,6 @@ public class CanalInstanceWithManager extends CanalInstanceSupport implements Ca
     protected CanalEventParser            eventParser;                                                     // 解析对应的数据信息
     protected CanalEventSink<List<Entry>> eventSink;                                                       // 链接parse和store的桥接器
     protected CanalAlarmHandler           alarmHandler;                                                    // alarm报警机制
-    protected ZkClientx                   zkClientx;
-
-    public CanalInstanceWithManager(Canal canal){
-        this(canal, null);
-    }
 
     public CanalInstanceWithManager(Canal canal, String filter){
         this.parameters = canal.getCanalParameter();
@@ -89,7 +81,7 @@ public class CanalInstanceWithManager extends CanalInstanceSupport implements Ca
         this.destination = canal.getName();
         this.filter = filter;
 
-        logger.info("init CannalInstance for {}-{} with parameters:{}", canalId, destination, parameters);
+        logger.info("init CanalInstance for {}-{} with parameters:{}", canalId, destination, parameters);
         // 初始化报警机制
         initAlarmHandler();
         // 初始化metaManager
@@ -101,7 +93,7 @@ public class CanalInstanceWithManager extends CanalInstanceSupport implements Ca
         // 初始化eventParser;
         initEventParser();
 
-        // 基础工具,需要提前start,会有先订阅再根据filter条件启动paser的需求
+        // 基础工具,需要提前start,会有先订阅再根据filter条件启动parse的需求
         if (!alarmHandler.isStart()) {
             alarmHandler.start();
         }
@@ -113,98 +105,9 @@ public class CanalInstanceWithManager extends CanalInstanceSupport implements Ca
     }
 
     public void start() {
-        super.start();
         // 初始化metaManager
         logger.info("start CannalInstance for {}-{} with parameters:{}", canalId, destination, parameters);
-
-        if (!metaManager.isStart()) {
-            metaManager.start();
-        }
-
-        if (!alarmHandler.isStart()) {
-            alarmHandler.start();
-        }
-
-        if (!eventStore.isStart()) {
-            eventStore.start();
-        }
-
-        if (!eventSink.isStart()) {
-            eventSink.start();
-        }
-
-        if (!eventParser.isStart()) {
-            beforeStartEventParser(eventParser);
-            eventParser.start();
-        }
-
-        logger.info("start successful....");
-    }
-
-    public void stop() {
-        logger.info("stop CannalInstance for {}-{} ", new Object[] { canalId, destination });
-
-        if (eventParser.isStart()) {
-            eventParser.stop();
-            afterStopEventParser(eventParser);
-        }
-
-        if (eventSink.isStart()) {
-            eventSink.stop();
-        }
-
-        if (eventStore.isStart()) {
-            eventStore.stop();
-        }
-
-        if (metaManager.isStart()) {
-            metaManager.stop();
-        }
-
-        if (alarmHandler.isStart()) {
-            alarmHandler.stop();
-        }
-
-        // if (zkClientx != null) {
-        // zkClientx.close();
-        // }
-
-        super.stop();
-        logger.info("stop successful....");
-    }
-
-    public boolean subscribeChange(ClientIdentity identity) {
-        if (StringUtils.isNotEmpty(identity.getFilter())) {
-            AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());
-
-            boolean isGroup = (eventParser instanceof GroupEventParser);
-            if (isGroup) {
-                // 处理group的模式
-                List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
-                for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
-                    ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
-                }
-            } else {
-                ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
-            }
-
-        }
-
-        // filter的处理规则
-        // a. parser处理数据过滤处理
-        // b. sink处理数据的路由&分发,一份parse数据经过sink后可以分发为多份,每份的数据可以根据自己的过滤规则不同而有不同的数据
-        // 后续内存版的一对多分发,可以考虑
-        return true;
-    }
-
-    protected void afterStartEventParser(CanalEventParser eventParser) {
-        super.afterStartEventParser(eventParser);
-
-        // 读取一下历史订阅的filter信息
-        List<ClientIdentity> clientIdentitys = metaManager.listAllSubscribeInfo(destination);
-        for (ClientIdentity clientIdentity : clientIdentitys) {
-            subscribeChange(clientIdentity);
-        }
+        super.start();
     }
 
     protected void initAlarmHandler() {
@@ -515,32 +418,6 @@ public class CanalInstanceWithManager extends CanalInstanceSupport implements Ca
         return ZkClientx.getZkClient(StringUtils.join(zkClusters, ";"));
     }
 
-    // =====================================
-
-    public String getDestination() {
-        return destination;
-    }
-
-    public CanalMetaManager getMetaManager() {
-        return metaManager;
-    }
-
-    public CanalEventStore<Event> getEventStore() {
-        return eventStore;
-    }
-
-    public CanalEventParser getEventParser() {
-        return eventParser;
-    }
-
-    public CanalEventSink<List<Entry>> getEventSink() {
-        return eventSink;
-    }
-
-    public CanalAlarmHandler getAlarmHandler() {
-        return alarmHandler;
-    }
-
     public void setAlarmHandler(CanalAlarmHandler alarmHandler) {
         this.alarmHandler = alarmHandler;
     }

+ 3 - 126
instance/spring/src/main/java/com/alibaba/otter/canal/instance/spring/CanalInstanceWithSpring.java

@@ -2,20 +2,14 @@ package com.alibaba.otter.canal.instance.spring;
 
 import java.util.List;
 
-import org.apache.commons.lang.StringUtils;
+import com.alibaba.otter.canal.instance.core.AbstractCanalInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.common.alarm.CanalAlarmHandler;
-import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
-import com.alibaba.otter.canal.instance.core.CanalInstance;
-import com.alibaba.otter.canal.instance.core.CanalInstanceSupport;
 import com.alibaba.otter.canal.meta.CanalMetaManager;
 import com.alibaba.otter.canal.parse.CanalEventParser;
-import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
-import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
 import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.sink.CanalEventSink;
 import com.alibaba.otter.canal.store.CanalEventStore;
 import com.alibaba.otter.canal.store.model.Event;
@@ -27,130 +21,13 @@ import com.alibaba.otter.canal.store.model.Event;
  * @author zebin.xuzb
  * @version 1.0.0
  */
-public class CanalInstanceWithSpring extends CanalInstanceSupport implements CanalInstance {
+public class CanalInstanceWithSpring extends AbstractCanalInstance {
 
     private static final Logger                    logger = LoggerFactory.getLogger(CanalInstanceWithSpring.class);
-    private String                                 destination;
-    private CanalEventParser                       eventParser;
-    private CanalEventSink<List<CanalEntry.Entry>> eventSink;
-    private CanalEventStore<Event>                 eventStore;
-    private CanalMetaManager                       metaManager;
-    private CanalAlarmHandler                      alarmHandler;
-
-    public String getDestination() {
-        return this.destination;
-    }
-
-    public CanalEventParser getEventParser() {
-        return this.eventParser;
-    }
-
-    public CanalEventSink<List<CanalEntry.Entry>> getEventSink() {
-        return this.eventSink;
-    }
-
-    public CanalEventStore<Event> getEventStore() {
-        return this.eventStore;
-    }
-
-    public CanalMetaManager getMetaManager() {
-        return this.metaManager;
-    }
-
-    public CanalAlarmHandler getAlarmHandler() {
-        return alarmHandler;
-    }
-
-    public boolean subscribeChange(ClientIdentity identity) {
-        if (StringUtils.isNotEmpty(identity.getFilter())) {
-            logger.info("subscribe filter change to " + identity.getFilter());
-            AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());
-
-            boolean isGroup = (eventParser instanceof GroupEventParser);
-            if (isGroup) {
-                // 处理group的模式
-                List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
-                for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
-                    ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
-                }
-            } else {
-                ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
-            }
-
-        }
-
-        // filter的处理规则
-        // a. parser处理数据过滤处理
-        // b. sink处理数据的路由&分发,一份parse数据经过sink后可以分发为多份,每份的数据可以根据自己的过滤规则不同而有不同的数据
-        // 后续内存版的一对多分发,可以考虑
-        return true;
-    }
-
-    protected void afterStartEventParser(CanalEventParser eventParser) {
-        super.afterStartEventParser(eventParser);
-
-        // 读取一下历史订阅的filter信息
-        List<ClientIdentity> clientIdentitys = metaManager.listAllSubscribeInfo(destination);
-        for (ClientIdentity clientIdentity : clientIdentitys) {
-            subscribeChange(clientIdentity);
-        }
-    }
 
     public void start() {
-        super.start();
-
         logger.info("start CannalInstance for {}-{} ", new Object[] { 1, destination });
-        if (!metaManager.isStart()) {
-            metaManager.start();
-        }
-
-        if (!eventStore.isStart()) {
-            eventStore.start();
-        }
-
-        if (!eventSink.isStart()) {
-            eventSink.start();
-        }
-
-        if (!eventParser.isStart()) {
-            beforeStartEventParser(eventParser);
-            eventParser.start();
-            afterStartEventParser(eventParser);
-        }
-
-        logger.info("start successful....");
-    }
-
-    public void stop() {
-        logger.info("stop CannalInstance for {}-{} ", new Object[] { 1, destination });
-        if (eventParser.isStart()) {
-            beforeStopEventParser(eventParser);
-            eventParser.stop();
-            afterStopEventParser(eventParser);
-        }
-
-        if (eventSink.isStart()) {
-            eventSink.stop();
-        }
-
-        if (eventStore.isStart()) {
-            eventStore.stop();
-        }
-
-        if (metaManager.isStart()) {
-            metaManager.stop();
-        }
-
-        if (alarmHandler.isStart()) {
-            alarmHandler.stop();
-        }
-
-        // if (zkClientx != null) {
-        // zkClientx.close();
-        // }
-
-        super.stop();
-        logger.info("stop successful....");
+        super.start();
     }
 
     // ======== setter ========

+ 3 - 12
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -120,7 +120,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                     throw new CanalParseException("consume failed!");
                 }
 
-                LogPosition position = buildLastTranasctionPosition(transaction);
+                LogPosition position = buildLastTransactionPosition(transaction);
                 if (position != null) { // 可能position为空
                     logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
                 }
@@ -336,20 +336,11 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         return profilingEnabled.get();
     }
 
-    protected LogPosition buildLastTranasctionPosition(List<CanalEntry.Entry> entries) { // 初始化一下
+    protected LogPosition buildLastTransactionPosition(List<CanalEntry.Entry> entries) { // 初始化一下
         for (int i = entries.size() - 1; i > 0; i--) {
             CanalEntry.Entry entry = entries.get(i);
             if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {// 尽量记录一个事务做为position
-                LogPosition logPosition = new LogPosition();
-                EntryPosition position = new EntryPosition();
-                position.setJournalName(entry.getHeader().getLogfileName());
-                position.setPosition(entry.getHeader().getLogfileOffset());
-                position.setTimestamp(entry.getHeader().getExecuteTime());
-                logPosition.setPostion(position);
-
-                LogIdentity identity = new LogIdentity(runningInfo.getAddress(), -1L);
-                logPosition.setIdentity(identity);
-                return logPosition;
+                return buildLastPosition(entry);
             }
         }
 

+ 16 - 16
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -700,22 +700,22 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         }
     }
 
-    protected Entry parseAndProfilingIfNecessary(LogEvent bod) throws Exception {
-        long startTs = -1;
-        boolean enabled = getProfilingEnabled();
-        if (enabled) {
-            startTs = System.currentTimeMillis();
-        }
-        CanalEntry.Entry event = binlogParser.parse(bod);
-        if (enabled) {
-            this.parsingInterval = System.currentTimeMillis() - startTs;
-        }
-
-        if (parsedEventCount.incrementAndGet() < 0) {
-            parsedEventCount.set(0);
-        }
-        return event;
-    }
+//    protected Entry parseAndProfilingIfNecessary(LogEvent bod) throws Exception {
+//        long startTs = -1;
+//        boolean enabled = getProfilingEnabled();
+//        if (enabled) {
+//            startTs = System.currentTimeMillis();
+//        }
+//        CanalEntry.Entry event = binlogParser.parse(bod);
+//        if (enabled) {
+//            this.parsingInterval = System.currentTimeMillis() - startTs;
+//        }
+//
+//        if (parsedEventCount.incrementAndGet() < 0) {
+//            parsedEventCount.set(0);
+//        }
+//        return event;
+//    }
 
     public void setSupportBinlogFormats(String formatStrs) {
         String[] formats = StringUtils.split(formatStrs, ',');

+ 24 - 8
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -6,6 +6,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import com.alibaba.otter.canal.server.CanalService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -37,24 +38,39 @@ import com.google.common.collect.MigrateMap;
  * @author zebin.xuzb
  * @version 1.0.0
  */
-public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, com.alibaba.otter.canal.server.CanalService {
+public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {
 
     private static final Logger        logger = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
     private Map<String, CanalInstance> canalInstances;
     // private Map<ClientIdentity, Position> lastRollbackPostions;
     private CanalInstanceGenerator     canalInstanceGenerator;
 
+
+    private static class SingletonHolder {
+        private static final CanalServerWithEmbedded CANAL_SERVER_WITH_EMBEDDED = new CanalServerWithEmbedded();
+    }
+
+    private CanalServerWithEmbedded() {
+
+    }
+
+    public static CanalServerWithEmbedded instance() {
+        return SingletonHolder.CANAL_SERVER_WITH_EMBEDDED;
+    }
+
     public void start() {
-        super.start();
+        if (!isStart()) {
+            super.start();
 
-        canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {
+            canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {
 
-            public CanalInstance apply(String destination) {
-                return canalInstanceGenerator.generate(destination);
-            }
-        });
+                public CanalInstance apply(String destination) {
+                    return canalInstanceGenerator.generate(destination);
+                }
+            });
 
-        // lastRollbackPostions = new MapMaker().makeMap();
+            // lastRollbackPostions = new MapMaker().makeMap();
+        }
     }
 
     public void stop() {

+ 9 - 7
server/src/main/java/com/alibaba/otter/canal/server/netty/CanalServerWithNetty.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.server.netty;
 import java.net.InetSocketAddress;
 import java.util.concurrent.Executors;
 
+import com.alibaba.otter.canal.instance.core.CanalInstance;
 import org.apache.commons.lang.StringUtils;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
@@ -33,11 +34,16 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana
     private Channel                 serverChannel = null;
     private ServerBootstrap         bootstrap     = null;
 
-    public CanalServerWithNetty(){
+    private static class SingletonHolder {
+        private static final CanalServerWithNetty CANAL_SERVER_WITH_NETTY = new CanalServerWithNetty();
     }
 
-    public CanalServerWithNetty(CanalServerWithEmbedded embeddedServer){
-        this.embeddedServer = embeddedServer;
+    private CanalServerWithNetty(){
+        this.embeddedServer = CanalServerWithEmbedded.instance();
+    }
+
+    public static CanalServerWithNetty instance() {
+        return SingletonHolder.CANAL_SERVER_WITH_NETTY;
     }
 
     public void start() {
@@ -98,8 +104,4 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana
         this.port = port;
     }
 
-    public void setEmbeddedServer(CanalServerWithEmbedded embeddedServer) {
-        this.embeddedServer = embeddedServer;
-    }
-
 }

+ 1 - 1
server/src/test/java/com/alibaba/otter/canal/server/BaseCanalServerWithEmbededTest.java

@@ -31,7 +31,7 @@ public abstract class BaseCanalServerWithEmbededTest {
 
     @Before
     public void setUp() {
-        server = new CanalServerWithEmbedded();
+        server = CanalServerWithEmbedded.instance();
         server.setCanalInstanceGenerator(new CanalInstanceGenerator() {
 
             public CanalInstance generate(String destination) {

+ 1 - 1
server/src/test/java/com/alibaba/otter/canal/server/CanalServerWithNettyTest.java

@@ -49,7 +49,7 @@ public class CanalServerWithNettyTest {
 
     @Before
     public void setUp() {
-        CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();
+        CanalServerWithEmbedded embeddedServer = CanalServerWithEmbedded.instance();
         embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {
 
             public CanalInstance generate(String destination) {