Browse Source

修改mq配置针对destination级别重启

mcy 6 years ago
parent
commit
7d7c82ce91

+ 2 - 22
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -226,7 +226,7 @@ public class CanalController {
                         if (!config.getLazy() && !runningMonitor.isStart()) {
                         if (!config.getLazy() && !runningMonitor.isStart()) {
                             runningMonitor.start();
                             runningMonitor.start();
                             if (canalMQStarter != null) {
                             if (canalMQStarter != null) {
-                                canalMQStarter.start(getMQProperties(properties));
+                                canalMQStarter.startDestination(destination);
                             }
                             }
                         }
                         }
                     }
                     }
@@ -237,7 +237,7 @@ public class CanalController {
                     InstanceConfig config = instanceConfigs.remove(destination);
                     InstanceConfig config = instanceConfigs.remove(destination);
                     if (config != null) {
                     if (config != null) {
                         if (canalMQStarter != null) {
                         if (canalMQStarter != null) {
-                            canalMQStarter.stop();
+                            canalMQStarter.stopDestination(destination);
                         }
                         }
                         embededCanalServer.stop(destination);
                         embededCanalServer.stop(destination);
                         ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                         ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
@@ -533,24 +533,4 @@ public class CanalController {
     public void setCanalMQStarter(CanalMQStarter canalMQStarter) {
     public void setCanalMQStarter(CanalMQStarter canalMQStarter) {
         this.canalMQStarter = canalMQStarter;
         this.canalMQStarter = canalMQStarter;
     }
     }
-
-    public static MQProperties getMQProperties(Properties properties) {
-        MQProperties mqProperties = new MQProperties();
-        mqProperties.setServers(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_SERVERS));
-        mqProperties
-            .setRetries(Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_RETRIES)));
-        mqProperties
-            .setBatchSize(Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BATCHSIZE)));
-        mqProperties
-            .setLingerMs(Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_LINGERMS)));
-        mqProperties.setBufferMemory(
-            Long.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BUFFERMEMORY)));
-        mqProperties.setCanalBatchSize(
-            Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALBATCHSIZE)));
-        mqProperties.setCanalGetTimeout(
-            Long.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALGETTIMEOUT)));
-        mqProperties.setFlatMessage(
-            Boolean.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_FLATMESSAGE)));
-        return mqProperties;
-    }
 }
 }

+ 18 - 1
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.deployer;
 import java.io.FileInputStream;
 import java.io.FileInputStream;
 import java.util.Properties;
 import java.util.Properties;
 
 
+import com.alibaba.otter.canal.common.MQProperties;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
@@ -72,7 +73,23 @@ public class CanalLauncher {
 
 
             if (canalMQProducer != null) {
             if (canalMQProducer != null) {
                 CanalMQStarter canalMQStarter = new CanalMQStarter(canalMQProducer);
                 CanalMQStarter canalMQStarter = new CanalMQStarter(canalMQProducer);
-                canalMQStarter.start(CanalController.getMQProperties(properties));
+                MQProperties mqProperties = new MQProperties();
+                mqProperties.setServers(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_SERVERS));
+                mqProperties
+                        .setRetries(Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_RETRIES)));
+                mqProperties
+                        .setBatchSize(Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BATCHSIZE)));
+                mqProperties
+                        .setLingerMs(Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_LINGERMS)));
+                mqProperties.setBufferMemory(
+                        Long.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BUFFERMEMORY)));
+                mqProperties.setCanalBatchSize(
+                        Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALBATCHSIZE)));
+                mqProperties.setCanalGetTimeout(
+                        Long.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALGETTIMEOUT)));
+                mqProperties.setFlatMessage(
+                        Boolean.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_FLATMESSAGE)));
+                canalMQStarter.start(mqProperties);
                 controller.setCanalMQStarter(canalMQStarter);
                 controller.setCanalMQStarter(canalMQStarter);
             }
             }
         } catch (Throwable e) {
         } catch (Throwable e) {

+ 71 - 37
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -1,8 +1,11 @@
 package com.alibaba.otter.canal.server;
 package com.alibaba.otter.canal.server;
 
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
@@ -18,17 +21,19 @@ import com.alibaba.otter.canal.spi.CanalMQProducer;
 
 
 public class CanalMQStarter {
 public class CanalMQStarter {
 
 
-    private static final Logger     logger  = LoggerFactory.getLogger(CanalMQStarter.class);
+    private static final Logger          logger       = LoggerFactory.getLogger(CanalMQStarter.class);
 
 
-    private volatile boolean        running = false;
+    private volatile boolean             running      = false;
 
 
-    private ExecutorService         executorService;
+    private ExecutorService              executorService;
 
 
-    private CanalMQProducer         canalMQProducer;
+    private CanalMQProducer              canalMQProducer;
 
 
-    private MQProperties            properties;
+    private MQProperties                 properties;
 
 
-    private CanalServerWithEmbedded canalServer;
+    private CanalServerWithEmbedded      canalServer;
+
+    private Map<String, CanalMQRunnable> canalMQWorks = new ConcurrentHashMap<>();
 
 
     public CanalMQStarter(CanalMQProducer canalMQProducer){
     public CanalMQStarter(CanalMQProducer canalMQProducer){
         this.canalMQProducer = canalMQProducer;
         this.canalMQProducer = canalMQProducer;
@@ -57,20 +62,9 @@ public class CanalMQStarter {
             executorService = Executors.newFixedThreadPool(canalServer.getCanalInstances().size());
             executorService = Executors.newFixedThreadPool(canalServer.getCanalInstances().size());
             logger.info("## start the MQ workers.");
             logger.info("## start the MQ workers.");
             for (final CanalInstance canalInstance : canalServer.getCanalInstances().values()) {
             for (final CanalInstance canalInstance : canalServer.getCanalInstances().values()) {
-                executorService.execute(new Runnable() {
-
-                    @Override
-                    public void run() {
-                        MQProperties.CanalDestination destination = new MQProperties.CanalDestination();
-                        destination.setCanalDestination(canalInstance.getDestination());
-                        CanalMQConfig mqConfig = canalInstance.getMqConfig();
-                        destination.setTopic(mqConfig.getTopic());
-                        destination.setPartition(mqConfig.getPartition());
-                        destination.setPartitionsNum(mqConfig.getPartitionsNum());
-                        destination.setPartitionHash(mqConfig.getPartitionHashProperties());
-                        worker(destination);
-                    }
-                });
+                CanalMQRunnable canalMQRunnable = new CanalMQRunnable(canalInstance);
+                canalMQWorks.put(canalInstance.getDestination(), canalMQRunnable);
+                executorService.execute(canalMQRunnable);
             }
             }
 
 
             running = true;
             running = true;
@@ -78,7 +72,16 @@ public class CanalMQStarter {
             Runtime.getRuntime().addShutdownHook(new Thread() {
             Runtime.getRuntime().addShutdownHook(new Thread() {
 
 
                 public void run() {
                 public void run() {
-                    stop();
+                    try {
+                        logger.info("## stop the MQ workers");
+                        running = false;
+                        executorService.shutdown();
+                        canalMQProducer.stop();
+                    } catch (Throwable e) {
+                        logger.warn("##something goes wrong when stopping MQ workers:", e);
+                    } finally {
+                        logger.info("## canal MQ is down.");
+                    }
                 }
                 }
 
 
             });
             });
@@ -89,29 +92,33 @@ public class CanalMQStarter {
         }
         }
     }
     }
 
 
-    public synchronized void stop() {
-        if (!running) {
-            return;
+    public synchronized void startDestination(String destination) {
+        CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
+        if (canalInstance != null) {
+            stopDestination(destination);
+            CanalMQRunnable canalMQRunnable = new CanalMQRunnable(canalInstance);
+            canalMQWorks.put(canalInstance.getDestination(), canalMQRunnable);
+            executorService.execute(canalMQRunnable);
+            logger.info("## Start the MQ work of destination:" + destination);
         }
         }
-        try {
-            logger.info("## stop the MQ workers");
-            running = false;
-            executorService.shutdown();
-            canalMQProducer.stop();
-        } catch (Throwable e) {
-            logger.warn("##something goes wrong when stopping MQ workers:", e);
-        } finally {
-            logger.info("## canal MQ is down.");
+    }
+
+    public synchronized void stopDestination(String destination) {
+        CanalMQRunnable canalMQRunable = canalMQWorks.get(destination);
+        if (canalMQRunable != null) {
+            canalMQRunable.stop();
+            canalMQWorks.remove(destination);
+            logger.info("## Stop the MQ work of destination:" + destination);
         }
         }
     }
     }
 
 
-    private void worker(MQProperties.CanalDestination destination) {
-        while (!running)
+    private void worker(MQProperties.CanalDestination destination, AtomicBoolean destinationRunning) {
+        while (!running || !destinationRunning.get())
             ;
             ;
         logger.info("## start the MQ producer: {}.", destination.getCanalDestination());
         logger.info("## start the MQ producer: {}.", destination.getCanalDestination());
 
 
         final ClientIdentity clientIdentity = new ClientIdentity(destination.getCanalDestination(), (short) 1001, "");
         final ClientIdentity clientIdentity = new ClientIdentity(destination.getCanalDestination(), (short) 1001, "");
-        while (running) {
+        while (running && destinationRunning.get()) {
             try {
             try {
                 if (!canalServer.getCanalInstances().containsKey(clientIdentity.getDestination())) {
                 if (!canalServer.getCanalInstances().containsKey(clientIdentity.getDestination())) {
                     try {
                     try {
@@ -126,7 +133,7 @@ public class CanalMQStarter {
 
 
                 Long getTimeout = properties.getCanalGetTimeout();
                 Long getTimeout = properties.getCanalGetTimeout();
                 int getBatchSize = properties.getCanalBatchSize();
                 int getBatchSize = properties.getCanalBatchSize();
-                while (running) {
+                while (running && destinationRunning.get()) {
                     Message message;
                     Message message;
                     if (getTimeout != null && getTimeout > 0) {
                     if (getTimeout != null && getTimeout > 0) {
                         message = canalServer
                         message = canalServer
@@ -168,4 +175,31 @@ public class CanalMQStarter {
             }
             }
         }
         }
     }
     }
+
+    private class CanalMQRunnable implements Runnable {
+
+        private CanalInstance canalInstance;
+
+        public CanalMQRunnable(CanalInstance canalInstance){
+            this.canalInstance = canalInstance;
+        }
+
+        private AtomicBoolean running = new AtomicBoolean(true);
+
+        @Override
+        public void run() {
+            MQProperties.CanalDestination destination = new MQProperties.CanalDestination();
+            destination.setCanalDestination(canalInstance.getDestination());
+            CanalMQConfig mqConfig = canalInstance.getMqConfig();
+            destination.setTopic(mqConfig.getTopic());
+            destination.setPartition(mqConfig.getPartition());
+            destination.setPartitionsNum(mqConfig.getPartitionsNum());
+            destination.setPartitionHash(mqConfig.getPartitionHashProperties());
+            worker(destination, running);
+        }
+
+        public void stop() {
+            running.set(false);
+        }
+    }
 }
 }