|
@@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory;
|
|
import org.slf4j.MDC;
|
|
import org.slf4j.MDC;
|
|
|
|
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
+import java.util.Objects;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Executors;
|
|
@@ -106,7 +107,10 @@ public class CanalMQStarter {
|
|
|
|
|
|
public synchronized void startDestination(String destination) {
|
|
public synchronized void startDestination(String destination) {
|
|
CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
|
|
CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
|
|
- if (canalInstance != null) {
|
|
|
|
|
|
+ CanalMQRunnable canalMQRunable = canalMQWorks.get(destination);
|
|
|
|
+ //当zk出现session time out可能会导致instance出现多个消费者,从而出现乱序
|
|
|
|
+ //因此当抢占成功的实例已经启动CanalMQRunnable时就不需要stop然后start了
|
|
|
|
+ if (canalInstance != null && Objects.isNull(canalMQRunable)) {
|
|
stopDestination(destination);
|
|
stopDestination(destination);
|
|
CanalMQRunnable canalMQRunnable = new CanalMQRunnable(destination);
|
|
CanalMQRunnable canalMQRunnable = new CanalMQRunnable(destination);
|
|
canalMQWorks.put(canalInstance.getDestination(), canalMQRunnable);
|
|
canalMQWorks.put(canalInstance.getDestination(), canalMQRunnable);
|