Prechádzať zdrojové kódy

Merge pull request #1232 from rewerma/master

fixed #1229
agapple 6 rokov pred
rodič
commit
19e164b599

+ 2 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -50,6 +50,8 @@ public class CanalLauncher {
             if (canalMQProducer != null) {
                 // disable netty
                 System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
+                System.setProperty(CanalConstants.CANAL_DESTINATIONS,
+                    properties.getProperty(CanalConstants.CANAL_DESTINATIONS));
             }
 
             logger.info("## start the canal server.");

+ 26 - 20
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -7,6 +7,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,9 +61,12 @@ public class CanalMQStarter {
             // 对应每个instance启动一个worker线程
             executorService = Executors.newCachedThreadPool();
             logger.info("## start the MQ workers.");
-            for (final CanalInstance canalInstance : canalServer.getCanalInstances().values()) {
-                CanalMQRunnable canalMQRunnable = new CanalMQRunnable(canalInstance);
-                canalMQWorks.put(canalInstance.getDestination(), canalMQRunnable);
+
+            String[] destinations = StringUtils.split(System.getProperty("canal.destinations"), ",");
+            for (String destination : destinations) {
+                destination = destination.trim();
+                CanalMQRunnable canalMQRunnable = new CanalMQRunnable(destination);
+                canalMQWorks.put(destination, canalMQRunnable);
                 executorService.execute(canalMQRunnable);
             }
 
@@ -95,7 +99,7 @@ public class CanalMQStarter {
         CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
         if (canalInstance != null) {
             stopDestination(destination);
-            CanalMQRunnable canalMQRunnable = new CanalMQRunnable(canalInstance);
+            CanalMQRunnable canalMQRunnable = new CanalMQRunnable(destination);
             canalMQWorks.put(canalInstance.getDestination(), canalMQRunnable);
             executorService.execute(canalMQRunnable);
             logger.info("## Start the MQ work of destination:" + destination);
@@ -111,15 +115,16 @@ public class CanalMQStarter {
         }
     }
 
-    private void worker(MQProperties.CanalDestination destination, AtomicBoolean destinationRunning) {
+    private void worker(String destination, AtomicBoolean destinationRunning) {
         while (!running || !destinationRunning.get())
             ;
-        logger.info("## start the MQ producer: {}.", destination.getCanalDestination());
+        logger.info("## start the MQ producer: {}.", destination);
 
-        final ClientIdentity clientIdentity = new ClientIdentity(destination.getCanalDestination(), (short) 1001, "");
+        final ClientIdentity clientIdentity = new ClientIdentity(destination, (short) 1001, "");
         while (running && destinationRunning.get()) {
             try {
-                if (!canalServer.getCanalInstances().containsKey(clientIdentity.getDestination())) {
+                CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
+                if (canalInstance == null) {
                     try {
                         Thread.sleep(3000);
                     } catch (InterruptedException e) {
@@ -127,8 +132,16 @@ public class CanalMQStarter {
                     }
                     continue;
                 }
+                MQProperties.CanalDestination canalDestination = new MQProperties.CanalDestination();
+                canalDestination.setCanalDestination(destination);
+                CanalMQConfig mqConfig = canalInstance.getMqConfig();
+                canalDestination.setTopic(mqConfig.getTopic());
+                canalDestination.setPartition(mqConfig.getPartition());
+                canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
+                canalDestination.setPartitionHash(mqConfig.getPartitionHashProperties());
+
                 canalServer.subscribe(clientIdentity);
-                logger.info("## the MQ producer: {} is running now ......", destination.getCanalDestination());
+                logger.info("## the MQ producer: {} is running now ......", destination);
 
                 Long getTimeout = properties.getCanalGetTimeout();
                 int getBatchSize = properties.getCanalBatchSize();
@@ -145,7 +158,7 @@ public class CanalMQStarter {
                     try {
                         int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                         if (batchId != -1 && size != 0) {
-                            canalMQProducer.send(destination, message, new CanalMQProducer.Callback() {
+                            canalMQProducer.send(canalDestination, message, new CanalMQProducer.Callback() {
 
                                 @Override
                                 public void commit() {
@@ -177,23 +190,16 @@ public class CanalMQStarter {
 
     private class CanalMQRunnable implements Runnable {
 
-        private CanalInstance canalInstance;
+        private String destination;
 
-        public CanalMQRunnable(CanalInstance canalInstance){
-            this.canalInstance = canalInstance;
+        CanalMQRunnable(String destination){
+            this.destination = destination;
         }
 
         private AtomicBoolean running = new AtomicBoolean(true);
 
         @Override
         public void run() {
-            MQProperties.CanalDestination destination = new MQProperties.CanalDestination();
-            destination.setCanalDestination(canalInstance.getDestination());
-            CanalMQConfig mqConfig = canalInstance.getMqConfig();
-            destination.setTopic(mqConfig.getTopic());
-            destination.setPartition(mqConfig.getPartition());
-            destination.setPartitionsNum(mqConfig.getPartitionsNum());
-            destination.setPartitionHash(mqConfig.getPartitionHashProperties());
             worker(destination, running);
         }