소스 검색

Fix: 主备切换后没有往Kafka发送数据 (#1810)

发生主备切换时,调用canalMQStarter启停对应的destination。
soso 6 년 전
부모
커밋
8df6d6fed8
1개의 변경된 파일6개의 추가작업 그리고 6개의 파일을 삭제
  1. 6 6
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

+ 6 - 6
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -152,6 +152,9 @@ public class CanalController {
                             try {
                                 MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
                                 embededCanalServer.start(destination);
+                                if (canalMQStarter != null) {
+                                    canalMQStarter.startDestination(destination);
+                                }
                             } finally {
                                 MDC.remove(CanalConstants.MDC_DESTINATION);
                             }
@@ -160,6 +163,9 @@ public class CanalController {
                         public void processActiveExit() {
                             try {
                                 MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                                if (canalMQStarter != null) {
+                                    canalMQStarter.stopDestination(destination);
+                                }
                                 embededCanalServer.stop(destination);
                             } finally {
                                 MDC.remove(CanalConstants.MDC_DESTINATION);
@@ -234,9 +240,6 @@ public class CanalController {
                         ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                         if (!config.getLazy() && !runningMonitor.isStart()) {
                             runningMonitor.start();
-                            if (canalMQStarter != null) {
-                                canalMQStarter.startDestination(destination);
-                            }
                         }
                     }
                 }
@@ -245,9 +248,6 @@ public class CanalController {
                     // 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息
                     InstanceConfig config = instanceConfigs.remove(destination);
                     if (config != null) {
-                        if (canalMQStarter != null) {
-                            canalMQStarter.stopDestination(destination);
-                        }
                         embededCanalServer.stop(destination);
                         ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                         if (runningMonitor.isStart()) {