Browse Source

1、SessionHandler类中执行SUBSCRIPTION时,应该先尝试启动runningMonitor再调用embeddedServer的subscribe方法
2、embeddedServer的subscribe方法中应该首先进行checkStart操作,否则可能引起metaManager的脏启动
3、SessionHandler中增加日志

lubiao 8 years ago
parent
commit
8626c6ee24

+ 2 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -13,6 +13,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.CollectionUtils;
 
 
+import com.alibaba.otter.canal.common.utils.JsonUtils;
 import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.CanalHASwitchable;
 import com.alibaba.otter.canal.parse.CanalHASwitchable;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
@@ -410,7 +411,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                     }
                     }
                 }
                 }
                 // 其余情况
                 // 其余情况
-                logger.warn("prepare to find start position just last position");
+                logger.warn("prepare to find start position just last position\n {}",JsonUtils.marshalToString(logPosition));
                 return logPosition.getPostion();
                 return logPosition.getPostion();
             } else {
             } else {
                 // 针对切换的情况,考虑回退时间
                 // 针对切换的情况,考虑回退时间

+ 2 - 0
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -131,6 +131,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
      */
      */
     @Override
     @Override
     public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
     public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
+    	checkStart(clientIdentity.getDestination());
+    	
         CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
         CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
         if (!canalInstance.getMetaManager().isStart()) {
         if (!canalInstance.getMetaManager().isStart()) {
             canalInstance.getMetaManager().start();
             canalInstance.getMetaManager().start();

+ 3 - 2
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -66,8 +66,7 @@ public class SessionHandler extends SimpleChannelHandler {
                             Short.valueOf(sub.getClientId()),
                             Short.valueOf(sub.getClientId()),
                             sub.getFilter());
                             sub.getFilter());
                         MDC.put("destination", clientIdentity.getDestination());
                         MDC.put("destination", clientIdentity.getDestination());
-                        embeddedServer.subscribe(clientIdentity);
-
+                        
                         // 尝试启动,如果已经启动,忽略
                         // 尝试启动,如果已经启动,忽略
                         if (!embeddedServer.isStart(clientIdentity.getDestination())) {
                         if (!embeddedServer.isStart(clientIdentity.getDestination())) {
                             ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
                             ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
@@ -75,6 +74,8 @@ public class SessionHandler extends SimpleChannelHandler {
                                 runningMonitor.start();
                                 runningMonitor.start();
                             }
                             }
                         }
                         }
+                        
+                        embeddedServer.subscribe(clientIdentity);
 
 
                         ctx.setAttachment(clientIdentity);// 设置状态数据
                         ctx.setAttachment(clientIdentity);// 设置状态数据
                         NettyUtils.ack(ctx.getChannel(), null);
                         NettyUtils.ack(ctx.getChannel(), null);