Просмотр исходного кода

canal server端 MQ 配置修改
MQ配置可动态加载

mcy 6 лет назад
Родитель
Сommit
c5bd42bbf1

+ 9 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -37,6 +37,15 @@ public class CanalConstants {
 
     public static final String CANAL_SOCKETCHANNEL               = ROOT + "." + "socketChannel";
 
+    public static final String CANAL_MQ_SERVERS                  = ROOT + "." + "mq.servers";
+    public static final String CANAL_MQ_RETRIES                  = ROOT + "." + "mq.retries";
+    public static final String CANAL_MQ_BATCHSIZE                = ROOT + "." + "mq.batchSize";
+    public static final String CANAL_MQ_LINGERMS                 = ROOT + "." + "mq.lingerMs";
+    public static final String CANAL_MQ_BUFFERMEMORY             = ROOT + "." + "mq.bufferMemory";
+    public static final String CANAL_MQ_CANALBATCHSIZE           = ROOT + "." + "mq.canalBatchSize";
+    public static final String CANAL_MQ_CANALGETTIMEOUT          = ROOT + "." + "mq.canalGetTimeout";
+    public static final String CANAL_MQ_FLATMESSAGE              = ROOT + "." + "mq.flatMessage";
+
     public static String getInstanceModeKey(String destination) {
         return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);
     }

+ 106 - 67
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -17,6 +17,7 @@ import org.springframework.context.ApplicationContext;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
 import com.alibaba.otter.canal.common.CanalException;
+import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.common.utils.AddressUtils;
 import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
 import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
@@ -35,6 +36,7 @@ import com.alibaba.otter.canal.instance.manager.CanalConfigClient;
 import com.alibaba.otter.canal.instance.manager.ManagerCanalInstanceGenerator;
 import com.alibaba.otter.canal.instance.spring.SpringCanalInstanceGenerator;
 import com.alibaba.otter.canal.parse.CanalEventParser;
+import com.alibaba.otter.canal.server.CanalMQStarter;
 import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
 import com.alibaba.otter.canal.server.exception.CanalServerException;
 import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;
@@ -68,6 +70,8 @@ public class CanalController {
     private CanalInstanceGenerator                   instanceGenerator;
     private ZkClientx                                zkclientx;
 
+    private CanalMQStarter                           canalMQStarter;
+
     public CanalController(){
         this(System.getProperties());
     }
@@ -105,7 +109,7 @@ public class CanalController {
             logger.info("No valid metrics server port found, use default 11112.");
             embededCanalServer.setMetricsPort(11112);
         }
-        
+
         String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
         if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
             canalServer = CanalServerWithNetty.instance();
@@ -127,80 +131,81 @@ public class CanalController {
 
         final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
         ServerRunningMonitors.setServerData(serverData);
-        ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
-
-            public ServerRunningMonitor apply(final String destination) {
-                ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
-                runningMonitor.setDestination(destination);
-                runningMonitor.setListener(new ServerRunningListener() {
-
-                    public void processActiveEnter() {
-                        try {
-                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                            embededCanalServer.start(destination);
-                        } finally {
-                            MDC.remove(CanalConstants.MDC_DESTINATION);
+        ServerRunningMonitors
+            .setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
+
+                public ServerRunningMonitor apply(final String destination) {
+                    ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
+                    runningMonitor.setDestination(destination);
+                    runningMonitor.setListener(new ServerRunningListener() {
+
+                        public void processActiveEnter() {
+                            try {
+                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                                embededCanalServer.start(destination);
+                            } finally {
+                                MDC.remove(CanalConstants.MDC_DESTINATION);
+                            }
                         }
-                    }
 
-                    public void processActiveExit() {
-                        try {
-                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                            embededCanalServer.stop(destination);
-                        } finally {
-                            MDC.remove(CanalConstants.MDC_DESTINATION);
+                        public void processActiveExit() {
+                            try {
+                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                                embededCanalServer.stop(destination);
+                            } finally {
+                                MDC.remove(CanalConstants.MDC_DESTINATION);
+                            }
                         }
-                    }
-
-                    public void processStart() {
-                        try {
-                            if (zkclientx != null) {
-                                final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":"
-                                                                                                              + port);
-                                initCid(path);
-                                zkclientx.subscribeStateChanges(new IZkStateListener() {
-
-                                    public void handleStateChanged(KeeperState state) throws Exception {
-
-                                    }
 
-                                    public void handleNewSession() throws Exception {
-                                        initCid(path);
-                                    }
-
-                                    @Override
-                                    public void handleSessionEstablishmentError(Throwable error) throws Exception {
-                                        logger.error("failed to connect to zookeeper", error);
-                                    }
-                                });
+                        public void processStart() {
+                            try {
+                                if (zkclientx != null) {
+                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
+                                        ip + ":" + port);
+                                    initCid(path);
+                                    zkclientx.subscribeStateChanges(new IZkStateListener() {
+
+                                        public void handleStateChanged(KeeperState state) throws Exception {
+
+                                        }
+
+                                        public void handleNewSession() throws Exception {
+                                            initCid(path);
+                                        }
+
+                                        @Override
+                                        public void handleSessionEstablishmentError(Throwable error) throws Exception {
+                                            logger.error("failed to connect to zookeeper", error);
+                                        }
+                                    });
+                                }
+                            } finally {
+                                MDC.remove(CanalConstants.MDC_DESTINATION);
                             }
-                        } finally {
-                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
-                    }
 
-                    public void processStop() {
-                        try {
-                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                            if (zkclientx != null) {
-                                final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":"
-                                                                                                              + port);
-                                releaseCid(path);
+                        public void processStop() {
+                            try {
+                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                                if (zkclientx != null) {
+                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
+                                        ip + ":" + port);
+                                    releaseCid(path);
+                                }
+                            } finally {
+                                MDC.remove(CanalConstants.MDC_DESTINATION);
                             }
-                        } finally {
-                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
-                    }
 
-                });
-                if (zkclientx != null) {
-                    runningMonitor.setZkClient(zkclientx);
+                    });
+                    if (zkclientx != null) {
+                        runningMonitor.setZkClient(zkclientx);
+                    }
+                    // 触发创建一下cid节点
+                    runningMonitor.init();
+                    return runningMonitor;
                 }
-                // 触发创建一下cid节点
-                runningMonitor.init();
-                return runningMonitor;
-            }
-        }));
+            }));
 
         // 初始化monitor机制
         autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
@@ -220,6 +225,9 @@ public class CanalController {
                         ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                         if (!config.getLazy() && !runningMonitor.isStart()) {
                             runningMonitor.start();
+                            if (canalMQStarter != null) {
+                                canalMQStarter.start(getMQProperties(properties));
+                            }
                         }
                     }
                 }
@@ -228,6 +236,9 @@ public class CanalController {
                     // 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息
                     InstanceConfig config = instanceConfigs.remove(destination);
                     if (config != null) {
+                        if (canalMQStarter != null) {
+                            canalMQStarter.stop();
+                        }
                         embededCanalServer.stop(destination);
                         ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                         if (runningMonitor.isStart()) {
@@ -246,7 +257,8 @@ public class CanalController {
             instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
 
                 public InstanceConfigMonitor apply(InstanceMode mode) {
-                    int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
+                    int scanInterval = Integer
+                        .valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
 
                     if (mode.isSpring()) {
                         SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
@@ -354,8 +366,8 @@ public class CanalController {
             InstanceConfig oldConfig = instanceConfigs.put(destination, config);
 
             if (oldConfig != null) {
-                logger.warn("destination:{} old config:{} has replace by new config:{}", new Object[] { destination,
-                        oldConfig, config });
+                logger
+                    .warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
             }
         }
     }
@@ -514,4 +526,31 @@ public class CanalController {
         }
     }
 
+    public CanalMQStarter getCanalMQStarter() {
+        return canalMQStarter;
+    }
+
+    public void setCanalMQStarter(CanalMQStarter canalMQStarter) {
+        this.canalMQStarter = canalMQStarter;
+    }
+
+    public static MQProperties getMQProperties(Properties properties) {
+        MQProperties mqProperties = new MQProperties();
+        mqProperties.setServers(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_SERVERS));
+        mqProperties
+            .setRetries(Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_RETRIES)));
+        mqProperties
+            .setBatchSize(Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BATCHSIZE)));
+        mqProperties
+            .setLingerMs(Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_LINGERMS)));
+        mqProperties.setBufferMemory(
+            Long.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BUFFERMEMORY)));
+        mqProperties.setCanalBatchSize(
+            Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALBATCHSIZE)));
+        mqProperties.setCanalGetTimeout(
+            Long.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALGETTIMEOUT)));
+        mqProperties.setFlatMessage(
+            Boolean.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_FLATMESSAGE)));
+        return mqProperties;
+    }
 }

+ 3 - 4
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -71,10 +71,9 @@ public class CanalLauncher {
             });
 
             if (canalMQProducer != null) {
-                CanalMQStarter canalServerStarter = new CanalMQStarter(canalMQProducer);
-                if (canalServerStarter != null) {
-                    canalServerStarter.init();
-                }
+                CanalMQStarter canalMQStarter = new CanalMQStarter(canalMQProducer);
+                canalMQStarter.start(CanalController.getMQProperties(properties));
+                controller.setCanalMQStarter(canalMQStarter);
             }
         } catch (Throwable e) {
             logger.error("## Something goes wrong when starting up the canal Server:", e);

+ 14 - 1
deployer/src/main/resources/canal.properties

@@ -91,9 +91,22 @@ canal.auto.scan.interval = 5
 canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
 #canal.instance.tsdb.spring.xml=classpath:spring/tsdb/mysql-tsdb.xml
 
-canal.instance.global.mode = spring 
+canal.instance.global.mode = spring
 canal.instance.global.lazy = false
 #canal.instance.global.manager.address = 127.0.0.1:1099
 #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
 canal.instance.global.spring.xml = classpath:spring/file-instance.xml
 #canal.instance.global.spring.xml = classpath:spring/default-instance.xml
+
+##################################################
+######### 		     MQ 		     #############
+##################################################
+canal.mq.servers=slave1:6667
+canal.mq.retries=0
+canal.mq.batchSize=16384
+canal.mq.lingerMs=1
+canal.mq.bufferMemory=33554432
+
+canal.mq.canalBatchSize=50
+canal.mq.canalGetTimeout=100
+canal.mq.flatMessage=true

+ 12 - 3
deployer/src/main/resources/example/instance.properties

@@ -18,7 +18,7 @@ canal.instance.rds.secretkey=
 canal.instance.rds.instanceId=
 
 # table meta tsdb info
-canal.instance.tsdb.enable=true
+canal.instance.tsdb.enable=false
 #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
 #canal.instance.tsdb.dbUsername=canal
 #canal.instance.tsdb.dbPassword=canal
@@ -30,8 +30,8 @@ canal.instance.tsdb.enable=true
 #canal.instance.standby.gtid=
 
 # username/password
-canal.instance.dbUsername=canal
-canal.instance.dbPassword=canal
+canal.instance.dbUsername=root
+canal.instance.dbPassword=121212
 canal.instance.connectionCharset = UTF-8
 canal.instance.defaultDatabaseName =test
 # enable druid Decrypt database password
@@ -42,4 +42,13 @@ canal.instance.enableDruid=false
 canal.instance.filter.regex=.*\\..*
 # table black regex
 canal.instance.filter.black.regex=
+
+# mq config
+canal.mq.topic=example
+# 但分区指定分区0
+canal.mq.partition=0
+# 多分区指定分区数量
+#canal.mq.partitionsNum=3
+# 多分区指定库表主键, 规则: 库名.表名:唯一主键, 多个按逗号分隔
+#canal.mq.partitionHash=mytest.person:id,mytest.role:id
 #################################################

+ 0 - 23
deployer/src/main/resources/mq.yml

@@ -1,23 +0,0 @@
-servers: slave1:6667 #for rocketmq: means the nameserver
-retries: 0
-batchSize: 16384
-lingerMs: 1
-bufferMemory: 33554432
-
-# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
-canalBatchSize: 50
-# Canal get数据的超时时间, 单位: 毫秒, 0为不限超时
-canalGetTimeout: 100
-flatMessage: true
-
-canalDestinations:
-  - canalDestination: example
-    topic: example
-    partition:
-#  #对应topic分区数量
-#  partitionsNum: 3
-#  partitionHash:
-#    #库名.表名: 唯一主键
-#    mytest.person: id
-
-

+ 10 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -29,6 +29,9 @@
 		<property name="alarmHandler">
 			<ref local="alarmHandler" />
 		</property>
+        <property name="mqConfig">
+            <ref local="mqConfig" />
+        </property>
 	</bean>
 	
 	<!-- 报警处理类 -->
@@ -187,4 +190,11 @@
 		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
+
+	<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">
+		<property name="topic" value="${canal.mq.topic}" />
+		<property name="partition" value="${canal.mq.partition}" />
+		<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
+		<property name="partitionHash" value="${canal.mq.partitionHash}" />
+	</bean>
 </beans>

+ 10 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -29,6 +29,9 @@
 		<property name="alarmHandler">
 			<ref local="alarmHandler" />
 		</property>
+        <property name="mqConfig">
+            <ref local="mqConfig" />
+        </property>
 	</bean>
 	
 	<!-- 报警处理类 -->
@@ -172,4 +175,11 @@
 		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
+
+	<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">
+        <property name="topic" value="${canal.mq.topic}" />
+        <property name="partition" value="${canal.mq.partition}" />
+        <property name="partitionsNum" value="${canal.mq.partitionsNum}" />
+        <property name="partitionHash" value="${canal.mq.partitionHash}" />
+	</bean>
 </beans>

+ 10 - 0
deployer/src/main/resources/spring/group-instance.xml

@@ -29,6 +29,9 @@
 		<property name="alarmHandler">
 			<ref local="alarmHandler" />
 		</property>
+		<property name="mqConfig">
+			<ref local="mqConfig" />
+		</property>
 	</bean>
 	
 	<!-- 报警处理类 -->
@@ -262,4 +265,11 @@
 		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
+
+    <bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">
+        <property name="topic" value="${canal.mq.topic}" />
+        <property name="partition" value="${canal.mq.partition}" />
+        <property name="partitionsNum" value="${canal.mq.partitionsNum}" />
+        <property name="partitionHash" value="${canal.mq.partitionHash}" />
+    </bean>
 </beans>

+ 10 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -29,6 +29,9 @@
 		<property name="alarmHandler">
 			<ref local="alarmHandler" />
 		</property>
+        <property name="mqConfig">
+            <ref local="mqConfig" />
+        </property>
 	</bean>
 	
 	<!-- 报警处理类 -->
@@ -160,4 +163,11 @@
 		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
+
+	<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">
+		<property name="topic" value="${canal.mq.topic}" />
+		<property name="partition" value="${canal.mq.partition}" />
+		<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
+		<property name="partitionHash" value="${canal.mq.partitionHash}" />
+	</bean>
 </beans>

+ 8 - 0
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java

@@ -38,6 +38,9 @@ public class AbstractCanalInstance extends AbstractCanalLifeCycle implements Can
     protected CanalEventSink<List<CanalEntry.Entry>> eventSink;                                                    // 链接parse和store的桥接器
     protected CanalMetaManager                       metaManager;                                                  // 消费信息管理器
     protected CanalAlarmHandler                      alarmHandler;                                                 // alarm报警机制
+    protected CanalMQConfig                          mqConfig;                                                     // mq的配置
+
+
 
     @Override
     public boolean subscribeChange(ClientIdentity identity) {
@@ -241,4 +244,9 @@ public class AbstractCanalInstance extends AbstractCanalLifeCycle implements Can
     public CanalAlarmHandler getAlarmHandler() {
         return alarmHandler;
     }
+
+    @Override
+    public CanalMQConfig getMqConfig() {
+        return mqConfig;
+    }
 }

+ 2 - 0
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalInstance.java

@@ -32,4 +32,6 @@ public interface CanalInstance extends CanalLifeCycle {
      * 客户端发生订阅/取消订阅行为
      */
     boolean subscribeChange(ClientIdentity identity);
+
+    CanalMQConfig getMqConfig();
 }

+ 68 - 0
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java

@@ -0,0 +1,68 @@
+package com.alibaba.otter.canal.instance.core;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class CanalMQConfig {
+
+    private String                       topic;
+    private Integer                      partition;
+    private Integer                      partitionsNum;
+    private String                       partitionHash;
+
+    private volatile Map<String, String> partitionHashProperties;
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public Integer getPartition() {
+        return partition;
+    }
+
+    public void setPartition(Integer partition) {
+        this.partition = partition;
+    }
+
+    public Integer getPartitionsNum() {
+        return partitionsNum;
+    }
+
+    public void setPartitionsNum(Integer partitionsNum) {
+        this.partitionsNum = partitionsNum;
+    }
+
+    public String getPartitionHash() {
+        return partitionHash;
+    }
+
+    public void setPartitionHash(String partitionHash) {
+        this.partitionHash = partitionHash;
+    }
+
+    public Map<String, String> getPartitionHashProperties() {
+        if (partitionHashProperties == null) {
+            synchronized (CanalMQConfig.class) {
+                if (partitionHashProperties == null) {
+                    if (partitionHash != null) {
+                        partitionHashProperties = new LinkedHashMap<>();
+                        String[] items = partitionHash.split(",");
+                        for (String item : items) {
+                            int i = item.indexOf(":");
+                            if (i > -1) {
+                                String dbTable = item.substring(0, i).trim();
+                                String pk = item.substring(i + 1).trim();
+                                partitionHashProperties.put(dbTable, pk);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        return partitionHashProperties;
+    }
+}

+ 6 - 0
instance/spring/src/main/java/com/alibaba/otter/canal/instance/spring/CanalInstanceWithSpring.java

@@ -2,6 +2,8 @@ package com.alibaba.otter.canal.instance.spring;
 
 import java.util.List;
 
+import com.alibaba.otter.canal.instance.core.CanalInstance;
+import com.alibaba.otter.canal.instance.core.CanalMQConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,4 +58,8 @@ public class CanalInstanceWithSpring extends AbstractCanalInstance {
         this.alarmHandler = alarmHandler;
     }
 
+    public void setMqConfig(CanalMQConfig mqConfig){
+        this.mqConfig = mqConfig;
+    }
+
 }

+ 0 - 5
server/pom.xml

@@ -26,11 +26,6 @@
 			<version>${project.version}</version>
 		</dependency>
 		<!-- kafka -->
-		<dependency>
-			<groupId>org.yaml</groupId>
-			<artifactId>snakeyaml</artifactId>
-			<version>1.17</version>
-		</dependency>
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka_2.11</artifactId>

+ 12 - 21
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -1,7 +1,5 @@
 package com.alibaba.otter.canal.common;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -12,18 +10,19 @@ import java.util.Map;
  */
 public class MQProperties {
 
-    private String                 servers                = "127.0.0.1:6667";
-    private int                    retries                = 0;
-    private int                    batchSize              = 16384;
-    private int                    lingerMs               = 1;
-    private long                   bufferMemory           = 33554432L;
-    private boolean                filterTransactionEntry = true;
-    private String                 producerGroup          = "Canal-Producer";
-    private int                    canalBatchSize         = 50;
-    private Long                   canalGetTimeout        = 100L;
-    private boolean                flatMessage            = true;
+    private String  servers                = "127.0.0.1:6667";
+    private int     retries                = 0;
+    private int     batchSize              = 16384;
+    private int     lingerMs               = 1;
+    private long    bufferMemory           = 33554432L;
+    private boolean filterTransactionEntry = true;
+    private String  producerGroup          = "Canal-Producer";
+    private int     canalBatchSize         = 50;
+    private Long    canalGetTimeout        = 100L;
+    private boolean flatMessage            = true;
 
-    private List<CanalDestination> canalDestinations      = new ArrayList<CanalDestination>();
+    // private List<CanalDestination> canalDestinations = new
+    // ArrayList<CanalDestination>();
 
     public static class CanalDestination {
 
@@ -138,14 +137,6 @@ public class MQProperties {
         this.flatMessage = flatMessage;
     }
 
-    public List<CanalDestination> getCanalDestinations() {
-        return canalDestinations;
-    }
-
-    public void setCanalDestinations(List<CanalDestination> canalDestinations) {
-        this.canalDestinations = canalDestinations;
-    }
-
     public boolean isFilterTransactionEntry() {
         return filterTransactionEntry;
     }

+ 51 - 47
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -1,17 +1,15 @@
 package com.alibaba.otter.canal.server;
 
-import java.io.FileInputStream;
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
 
 import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.instance.core.CanalInstance;
+import com.alibaba.otter.canal.instance.core.CanalMQConfig;
 import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
@@ -20,37 +18,28 @@ import com.alibaba.otter.canal.spi.CanalMQProducer;
 
 public class CanalMQStarter {
 
-    private static final Logger logger               = LoggerFactory.getLogger(CanalMQStarter.class);
+    private static final Logger     logger  = LoggerFactory.getLogger(CanalMQStarter.class);
 
-    private static final String CLASSPATH_URL_PREFIX = "classpath:";
+    private volatile boolean        running = false;
 
-    private volatile boolean    running              = false;
+    private ExecutorService         executorService;
 
-    private ExecutorService     executorService;
+    private CanalMQProducer         canalMQProducer;
 
-    private CanalMQProducer     canalMQProducer;
+    private MQProperties            properties;
 
-    private MQProperties        properties;
+    private CanalServerWithEmbedded canalServer;
 
     public CanalMQStarter(CanalMQProducer canalMQProducer){
         this.canalMQProducer = canalMQProducer;
     }
 
-    public void init() {
+    public synchronized void start(MQProperties properties) {
         try {
-            logger.info("## load MQ configurations");
-            String conf = System.getProperty("mq.conf", "classpath:mq.yml");
-
-            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
-                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
-                properties = new Yaml().loadAs(CanalMQStarter.class.getClassLoader().getResourceAsStream(conf),
-                    MQProperties.class);
-            } else {
-                properties = new Yaml().loadAs(new FileInputStream(conf), MQProperties.class);
+            if (running) {
+                return;
             }
-
-            // 初始化 kafka producer
-            // canalMQProducer = new CanalKafkaProducer();
+            this.properties = properties;
             canalMQProducer.init(properties);
             // set filterTransactionEntry
             if (properties.isFilterTransactionEntry()) {
@@ -62,36 +51,34 @@ public class CanalMQStarter {
                 System.setProperty("canal.instance.memory.rawEntry", "false");
             }
 
-            // 对应每个instance启动一个worker线程
-            List<MQProperties.CanalDestination> destinations = properties.getCanalDestinations();
-
-            executorService = Executors.newFixedThreadPool(destinations.size());
+            canalServer = CanalServerWithEmbedded.instance();
 
+            // 对应每个instance启动一个worker线程
+            executorService = Executors.newFixedThreadPool(canalServer.getCanalInstances().size());
             logger.info("## start the MQ workers.");
-            for (final MQProperties.CanalDestination destination : destinations) {
+            for (final CanalInstance canalInstance : canalServer.getCanalInstances().values()) {
                 executorService.execute(new Runnable() {
 
                     @Override
                     public void run() {
+                        MQProperties.CanalDestination destination = new MQProperties.CanalDestination();
+                        destination.setCanalDestination(canalInstance.getDestination());
+                        CanalMQConfig mqConfig = canalInstance.getMqConfig();
+                        destination.setTopic(mqConfig.getTopic());
+                        destination.setPartition(mqConfig.getPartition());
+                        destination.setPartitionsNum(mqConfig.getPartitionsNum());
+                        destination.setPartitionHash(mqConfig.getPartitionHashProperties());
                         worker(destination);
                     }
                 });
             }
+
             running = true;
             logger.info("## the MQ workers is running now ......");
             Runtime.getRuntime().addShutdownHook(new Thread() {
 
                 public void run() {
-                    try {
-                        logger.info("## stop the MQ workers");
-                        running = false;
-                        executorService.shutdown();
-                        canalMQProducer.stop();
-                    } catch (Throwable e) {
-                        logger.warn("##something goes wrong when stopping MQ workers:", e);
-                    } finally {
-                        logger.info("## canal MQ is down.");
-                    }
+                    stop();
                 }
 
             });
@@ -102,15 +89,31 @@ public class CanalMQStarter {
         }
     }
 
+    public synchronized void stop() {
+        if (!running) {
+            return;
+        }
+        try {
+            logger.info("## stop the MQ workers");
+            running = false;
+            executorService.shutdown();
+            canalMQProducer.stop();
+        } catch (Throwable e) {
+            logger.warn("##something goes wrong when stopping MQ workers:", e);
+        } finally {
+            logger.info("## canal MQ is down.");
+        }
+    }
+
     private void worker(MQProperties.CanalDestination destination) {
         while (!running)
             ;
-        logger.info("## start the canal consumer: {}.", destination.getCanalDestination());
-        final CanalServerWithEmbedded server = CanalServerWithEmbedded.instance();
+        logger.info("## start the MQ producer: {}.", destination.getCanalDestination());
+
         final ClientIdentity clientIdentity = new ClientIdentity(destination.getCanalDestination(), (short) 1001, "");
         while (running) {
             try {
-                if (!server.getCanalInstances().containsKey(clientIdentity.getDestination())) {
+                if (!canalServer.getCanalInstances().containsKey(clientIdentity.getDestination())) {
                     try {
                         Thread.sleep(3000);
                     } catch (InterruptedException e) {
@@ -118,17 +121,18 @@ public class CanalMQStarter {
                     }
                     continue;
                 }
-                server.subscribe(clientIdentity);
-                logger.info("## the canal consumer {} is running now ......", destination.getCanalDestination());
+                canalServer.subscribe(clientIdentity);
+                logger.info("## the MQ producer: {} is running now ......", destination.getCanalDestination());
 
                 Long getTimeout = properties.getCanalGetTimeout();
                 int getBatchSize = properties.getCanalBatchSize();
                 while (running) {
                     Message message;
                     if (getTimeout != null && getTimeout > 0) {
-                        message = server.getWithoutAck(clientIdentity, getBatchSize, getTimeout, TimeUnit.MILLISECONDS);
+                        message = canalServer
+                            .getWithoutAck(clientIdentity, getBatchSize, getTimeout, TimeUnit.MILLISECONDS);
                     } else {
-                        message = server.getWithoutAck(clientIdentity, getBatchSize);
+                        message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
                     }
 
                     final long batchId = message.getId();
@@ -139,12 +143,12 @@ public class CanalMQStarter {
 
                                 @Override
                                 public void commit() {
-                                    server.ack(clientIdentity, batchId); // 提交确认
+                                    canalServer.ack(clientIdentity, batchId); // 提交确认
                                 }
 
                                 @Override
                                 public void rollback() {
-                                    server.rollback(clientIdentity, batchId);
+                                    canalServer.rollback(clientIdentity, batchId);
                                 }
                             }); // 发送message到topic
                         } else {