Sfoglia il codice sorgente

动态topic的xml配置

mcy 6 anni fa
parent
commit
94b8fdbbf4

+ 20 - 19
deployer/src/main/resources/spring/default-instance.xml

@@ -11,7 +11,7 @@
 	default-autowire="byName">
 
 	<import resource="classpath:spring/base-instance.xml" />
-	
+
 	<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
 		<property name="destination" value="${canal.instance.destination}" />
 		<property name="eventParser">
@@ -33,10 +33,10 @@
             <ref local="mqConfig" />
         </property>
 	</bean>
-	
+
 	<!-- 报警处理类 -->
 	<bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.LogAlarmHandler" />
-	
+
 	<bean id="zkClientx" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean" >
 		<property name="targetClass" value="com.alibaba.otter.canal.common.zookeeper.ZkClientx" />
 		<property name="targetMethod" value="getZkClient" />
@@ -46,7 +46,7 @@
 			</list>
 		</property>
 	</bean>
-	
+
 	<bean id="metaManager" class="com.alibaba.otter.canal.meta.PeriodMixedMetaManager">
 		<property name="zooKeeperMetaManager">
 			<bean class="com.alibaba.otter.canal.meta.ZooKeeperMetaManager">
@@ -55,7 +55,7 @@
 		</property>
 		<property name="period" value="${canal.zookeeper.flush.period:1000}" />
 	</bean>
-	
+
 	<bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
 		<property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" />
 		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
@@ -63,7 +63,7 @@
 		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
 		<property name="raw" value="${canal.instance.memory.rawEntry:true}" />
 	</bean>
-	
+
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
 		<property name="eventStore" ref="eventStore" />
 		<property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
@@ -82,35 +82,35 @@
 				<property name="switchEnable" value="${canal.instance.detecting.heartbeatHaEnable:false}" />
 			</bean>
 		</property>
-		
+
 		<property name="alarmHandler" ref="alarmHandler" />
-		
+
 		<!-- 解析过滤处理 -->
 		<property name="eventFilter">
 			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
 				<constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" />
 			</bean>
 		</property>
-		
+
 		<property name="eventBlackFilter">
 			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
 				<constructor-arg index="0" value="${canal.instance.filter.black.regex:}" />
 				<constructor-arg index="1" value="false" />
 			</bean>
 		</property>
-		
+
 		<!-- 最大事务解析大小,超过该大小后事务将被切分为多个事务投递 -->
 		<property name="transactionSize" value="${canal.instance.transaction.size:1024}" />
-		
+
 		<!-- 网络链接参数 -->
 		<property name="receiveBufferSize" value="${canal.instance.network.receiveBufferSize:16384}" />
 		<property name="sendBufferSize" value="${canal.instance.network.sendBufferSize:16384}" />
 		<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
-		
+
 		<!-- 解析编码 -->
 		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
-	
+
 		<!-- 解析位点记录 -->
 		<property name="logPositionManager">
 			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
@@ -124,10 +124,10 @@
 				</constructor-arg>
 			</bean>
 		</property>
-		
+
 		<!-- failover切换时回退的时间 -->
 		<property name="fallbackIntervalInSeconds" value="${canal.instance.fallbackIntervalInSeconds:60}" />
-		
+
 		<!-- 解析数据库信息 -->
 		<property name="masterInfo">
 			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
@@ -149,7 +149,7 @@
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
 			</bean>
 		</property>
-		
+
 		<!-- 解析起始位点 -->
 		<property name="masterPosition">
 			<bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
@@ -175,16 +175,16 @@
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
 		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
 		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
-		
+
 		<!--表结构相关-->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:true}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
 		<property name="tsdbSnapshotInterval" value="${canal.instance.tsdb.snapshot.interval:24}" />
 		<property name="tsdbSnapshotExpire" value="${canal.instance.tsdb.snapshot.expire:360}" />
-		
+
 		<!--是否启用GTID模式-->
 		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>
-		
+
 		<!-- parallel parser -->
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
 		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
@@ -193,6 +193,7 @@
 
 	<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">
 		<property name="topic" value="${canal.mq.topic}" />
+		<property name="dynamicTopic" value="${canal.mq.dynamicTopic}" />
 		<property name="partition" value="${canal.mq.partition}" />
 		<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
 		<property name="partitionHash" value="${canal.mq.partitionHash}" />

+ 15 - 14
deployer/src/main/resources/spring/file-instance.xml

@@ -11,7 +11,7 @@
 	default-autowire="byName">
 
 	<import resource="classpath:spring/base-instance.xml" />
-	
+
 	<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
 		<property name="destination" value="${canal.instance.destination}" />
 		<property name="eventParser">
@@ -33,15 +33,15 @@
             <ref local="mqConfig" />
         </property>
 	</bean>
-	
+
 	<!-- 报警处理类 -->
 	<bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.LogAlarmHandler" />
-	
+
 	<bean id="metaManager" class="com.alibaba.otter.canal.meta.FileMixedMetaManager">
 		<property name="dataDir" value="${canal.file.data.dir:../conf}" />
 		<property name="period" value="${canal.file.flush.period:1000}" />
 	</bean>
-	
+
 	<bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
 		<property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" />
 		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
@@ -49,7 +49,7 @@
 		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
 		<property name="raw" value="${canal.instance.memory.rawEntry:true}" />
 	</bean>
-	
+
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
 		<property name="eventStore" ref="eventStore" />
 		<property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
@@ -68,16 +68,16 @@
 				<property name="switchEnable" value="${canal.instance.detecting.heartbeatHaEnable:false}" />
 			</bean>
 		</property>
-		
+
 		<property name="alarmHandler" ref="alarmHandler" />
-		
+
 		<!-- 解析过滤处理 -->
 		<property name="eventFilter">
 			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
 				<constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" />
 			</bean>
 		</property>
-		
+
 		<property name="eventBlackFilter">
 			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
 				<constructor-arg index="0" value="${canal.instance.filter.black.regex:}" />
@@ -86,16 +86,16 @@
 		</property>
 		<!-- 最大事务解析大小,超过该大小后事务将被切分为多个事务投递 -->
 		<property name="transactionSize" value="${canal.instance.transaction.size:1024}" />
-		
+
 		<!-- 网络链接参数 -->
 		<property name="receiveBufferSize" value="${canal.instance.network.receiveBufferSize:16384}" />
 		<property name="sendBufferSize" value="${canal.instance.network.sendBufferSize:16384}" />
 		<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
-		
+
 		<!-- 解析编码 -->
 		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
-	
+
 		<!-- 解析位点记录 -->
 		<property name="logPositionManager">
 			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
@@ -109,10 +109,10 @@
 				</constructor-arg>
 			</bean>
 		</property>
-		
+
 		<!-- failover切换时回退的时间 -->
 		<property name="fallbackIntervalInSeconds" value="${canal.instance.fallbackIntervalInSeconds:60}" />
-		
+
 		<!-- 解析数据库信息 -->
 		<property name="masterInfo">
 			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
@@ -169,7 +169,7 @@
 
 		<!--是否启用GTID模式-->
 		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>
-		
+
 		<!-- parallel parser -->
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
 		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
@@ -178,6 +178,7 @@
 
 	<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">
         <property name="topic" value="${canal.mq.topic}" />
+		<property name="dynamicTopic" value="${canal.mq.dynamicTopic}" />
         <property name="partition" value="${canal.mq.partition}" />
         <property name="partitionsNum" value="${canal.mq.partitionsNum}" />
         <property name="partitionHash" value="${canal.mq.partitionHash}" />

+ 28 - 27
deployer/src/main/resources/spring/group-instance.xml

@@ -9,7 +9,7 @@
            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
 	default-autowire="byName">
-	
+
 	<import resource="classpath:spring/base-instance.xml" />
 
 	<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
@@ -33,12 +33,12 @@
 			<ref local="mqConfig" />
 		</property>
 	</bean>
-	
+
 	<!-- 报警处理类 -->
 	<bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.LogAlarmHandler" />
-	
+
 	<bean id="metaManager" class="com.alibaba.otter.canal.meta.MemoryMetaManager" />
-	
+
 	<bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
 		<property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" />
 		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
@@ -46,12 +46,12 @@
 		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
 		<property name="raw" value="${canal.instance.memory.rawEntry:true}" />
 	</bean>
-	
+
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
 		<property name="eventStore" ref="eventStore" />
 		<property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
 	</bean>
-	
+
 	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.group.GroupEventParser">
 		<property name="eventParsers">
 			<list>
@@ -74,16 +74,16 @@
 				<property name="switchEnable" value="${canal.instance.detecting.heartbeatHaEnable:false}" />
 			</bean>
 		</property>
-		
+
 		<property name="alarmHandler" ref="alarmHandler" />
-		
+
 		<!-- 解析过滤处理 -->
 		<property name="eventFilter">
 			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
 				<constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" />
 			</bean>
 		</property>
-		
+
 		<property name="eventBlackFilter">
 			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
 				<constructor-arg index="0" value="${canal.instance.filter.black.regex:}" />
@@ -92,24 +92,24 @@
 		</property>
 		<!-- 最大事务解析大小,超过该大小后事务将被切分为多个事务投递 -->
 		<property name="transactionSize" value="${canal.instance.transaction.size:1024}" />
-		
+
 		<!-- 网络链接参数 -->
 		<property name="receiveBufferSize" value="${canal.instance.network.receiveBufferSize:16384}" />
 		<property name="sendBufferSize" value="${canal.instance.network.sendBufferSize:16384}" />
 		<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
-		
+
 		<!-- 解析编码 -->
 		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
-	
+
 		<!-- 解析位点记录 -->
 		<property name="logPositionManager">
 			<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
 		</property>
-		
+
 		<!-- failover切换时回退的时间 -->
 		<property name="fallbackIntervalInSeconds" value="${canal.instance.fallbackIntervalInSeconds:60}" />
-		
+
 		<!-- 解析数据库信息 -->
 		<property name="masterInfo">
 			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
@@ -131,7 +131,7 @@
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
 			</bean>
 		</property>
-		
+
 		<!-- 解析起始位点 -->
 		<property name="masterPosition">
 			<bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
@@ -156,13 +156,13 @@
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
 		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
 		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
-		
+
 		<!-- parallel parser -->
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
 		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
-	
+
 	<bean id="eventParser2" parent="baseEventParser">
 		<property name="destination" value="${canal.instance.destination}" />
 		<property name="slaveId" value="${canal.instance.mysql.slaveId:0}" />
@@ -176,16 +176,16 @@
 				<property name="switchEnable" value="${canal.instance.detecting.heartbeatHaEnable:false}" />
 			</bean>
 		</property>
-		
+
 		<property name="alarmHandler" ref="alarmHandler" />
-		
+
 		<!-- 解析过滤处理 -->
 		<property name="eventFilter">
 			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
 				<constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" />
 			</bean>
 		</property>
-		
+
 		<property name="eventBlackFilter">
 			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
 				<constructor-arg index="0" value="${canal.instance.filter.black.regex:}" />
@@ -194,24 +194,24 @@
 		</property>
 		<!-- 最大事务解析大小,超过该大小后事务将被切分为多个事务投递 -->
 		<property name="transactionSize" value="${canal.instance.transaction.size:1024}" />
-		
+
 		<!-- 网络链接参数 -->
 		<property name="receiveBufferSize" value="${canal.instance.network.receiveBufferSize:16384}" />
 		<property name="sendBufferSize" value="${canal.instance.network.sendBufferSize:16384}" />
 		<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
-		
+
 		<!-- 解析编码 -->
 		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
-	
+
 		<!-- 解析位点记录 -->
 		<property name="logPositionManager">
 			<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
 		</property>
-		
+
 		<!-- failover切换时回退的时间 -->
 		<property name="fallbackIntervalInSeconds" value="${canal.instance.fallbackIntervalInSeconds:60}" />
-		
+
 		<!-- 解析数据库信息 -->
 		<property name="masterInfo">
 			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
@@ -233,7 +233,7 @@
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
 			</bean>
 		</property>
-		
+
 		<!-- 解析起始位点 -->
 		<property name="masterPosition">
 			<bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
@@ -259,7 +259,7 @@
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
 		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
 		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
-		
+
 		<!-- parallel parser -->
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
 		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
@@ -268,6 +268,7 @@
 
     <bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">
         <property name="topic" value="${canal.mq.topic}" />
+		<property name="dynamicTopic" value="${canal.mq.dynamicTopic}" />
         <property name="partition" value="${canal.mq.partition}" />
         <property name="partitionsNum" value="${canal.mq.partitionsNum}" />
         <property name="partitionHash" value="${canal.mq.partitionHash}" />

+ 17 - 16
deployer/src/main/resources/spring/memory-instance.xml

@@ -9,7 +9,7 @@
            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
 	default-autowire="byName">
-	
+
 	<import resource="classpath:spring/base-instance.xml" />
 
 	<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
@@ -33,12 +33,12 @@
             <ref local="mqConfig" />
         </property>
 	</bean>
-	
+
 	<!-- 报警处理类 -->
 	<bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.LogAlarmHandler" />
-	
+
 	<bean id="metaManager" class="com.alibaba.otter.canal.meta.MemoryMetaManager" />
-	
+
 	<bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
 		<property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" />
 		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
@@ -46,7 +46,7 @@
 		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
 		<property name="raw" value="${canal.instance.memory.rawEntry:true}" />
 	</bean>
-	
+
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
 		<property name="eventStore" ref="eventStore" />
 		<property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
@@ -65,16 +65,16 @@
 				<property name="switchEnable" value="${canal.instance.detecting.heartbeatHaEnable:false}" />
 			</bean>
 		</property>
-		
+
 		<property name="alarmHandler" ref="alarmHandler" />
-		
+
 		<!-- 解析过滤处理 -->
 		<property name="eventFilter">
 			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
 				<constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" />
 			</bean>
 		</property>
-		
+
 		<property name="eventBlackFilter">
 			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
 				<constructor-arg index="0" value="${canal.instance.filter.black.regex:}" />
@@ -83,21 +83,21 @@
 		</property>
 		<!-- 最大事务解析大小,超过该大小后事务将被切分为多个事务投递 -->
 		<property name="transactionSize" value="${canal.instance.transaction.size:1024}" />
-		
+
 		<!-- 网络链接参数 -->
 		<property name="receiveBufferSize" value="${canal.instance.network.receiveBufferSize:16384}" />
 		<property name="sendBufferSize" value="${canal.instance.network.sendBufferSize:16384}" />
 		<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
-		
+
 		<!-- 解析编码 -->
 		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
-	
+
 		<!-- 解析位点记录 -->
 		<property name="logPositionManager">
 			<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
 		</property>
-		
+
 		<!-- failover切换时回退的时间 -->
 		<property name="fallbackIntervalInSeconds" value="${canal.instance.fallbackIntervalInSeconds:60}" />
 
@@ -122,7 +122,7 @@
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
 			</bean>
 		</property>
-		
+
 		<!-- 解析起始位点 -->
 		<property name="masterPosition">
 			<bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
@@ -148,16 +148,16 @@
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
 		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
 		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
-		
+
 		<!--表结构相关-->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:false}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
 		<property name="tsdbSnapshotInterval" value="${canal.instance.tsdb.snapshot.interval:24}" />
 		<property name="tsdbSnapshotExpire" value="${canal.instance.tsdb.snapshot.expire:360}" />
-		
+
 		<!--是否启用GTID模式-->
 		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>
-		
+
 		<!-- parallel parser -->
 		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
 		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
@@ -166,6 +166,7 @@
 
 	<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">
 		<property name="topic" value="${canal.mq.topic}" />
+		<property name="dynamicTopic" value="${canal.mq.dynamicTopic}" />
 		<property name="partition" value="${canal.mq.partition}" />
 		<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
 		<property name="partitionHash" value="${canal.mq.partitionHash}" />