Переглянути джерело

fixed issue #4869 , support admin tsdb config

jianghang.loujh 1 рік тому
батько
коміт
f9f810226d

+ 3 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -185,6 +185,9 @@
 		<!--表结构相关-->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:true}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
+		<property name="tsdbJdbcUrl" value="${canal.instance.tsdb.url:}"/>
+		<property name="tsdbJdbcUserName" value="${canal.instance.tsdb.dbUsername:}"/>
+		<property name="tsdbJdbcPassword" value="${canal.instance.tsdb.dbPassword:}"/>
 		<property name="tsdbSnapshotInterval" value="${canal.instance.tsdb.snapshot.interval:24}" />
 		<property name="tsdbSnapshotExpire" value="${canal.instance.tsdb.snapshot.expire:360}" />
 

+ 3 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -171,6 +171,9 @@
 		<!--表结构相关-->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:true}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
+		<property name="tsdbJdbcUrl" value="${canal.instance.tsdb.url:}"/>
+		<property name="tsdbJdbcUserName" value="${canal.instance.tsdb.dbUsername:}"/>
+		<property name="tsdbJdbcPassword" value="${canal.instance.tsdb.dbPassword:}"/>
 		<property name="tsdbSnapshotInterval" value="${canal.instance.tsdb.snapshot.interval:24}" />
 		<property name="tsdbSnapshotExpire" value="${canal.instance.tsdb.snapshot.expire:360}" />
 

+ 3 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -159,6 +159,9 @@
 		<!--表结构相关-->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:false}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
+		<property name="tsdbJdbcUrl" value="${canal.instance.tsdb.url:}"/>
+		<property name="tsdbJdbcUserName" value="${canal.instance.tsdb.dbUsername:}"/>
+		<property name="tsdbJdbcPassword" value="${canal.instance.tsdb.dbPassword:}"/>
 		<property name="tsdbSnapshotInterval" value="${canal.instance.tsdb.snapshot.interval:24}" />
 		<property name="tsdbSnapshotExpire" value="${canal.instance.tsdb.snapshot.expire:360}" />
 

+ 12 - 14
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -24,13 +24,7 @@ import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
 import com.alibaba.otter.canal.instance.core.AbstractCanalInstance;
 import com.alibaba.otter.canal.instance.manager.model.Canal;
 import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.DataSourcing;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.HAMode;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.IndexMode;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.MetaMode;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.SourcingType;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.StorageMode;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.StorageScavengeMode;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.*;
 import com.alibaba.otter.canal.meta.FileMixedMetaManager;
 import com.alibaba.otter.canal.meta.MemoryMetaManager;
 import com.alibaba.otter.canal.meta.PeriodMixedMetaManager;
@@ -46,12 +40,7 @@ import com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogEventParserProxy
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DefaultTableMetaTSDBFactory;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBBuilder;
-import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
-import com.alibaba.otter.canal.parse.index.FailbackLogPositionManager;
-import com.alibaba.otter.canal.parse.index.MemoryLogPositionManager;
-import com.alibaba.otter.canal.parse.index.MetaLogPositionManager;
-import com.alibaba.otter.canal.parse.index.PeriodMixedLogPositionManager;
-import com.alibaba.otter.canal.parse.index.ZooKeeperLogPositionManager;
+import com.alibaba.otter.canal.parse.index.*;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.sink.entry.EntryEventSink;
@@ -338,6 +327,8 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
                 mysqlEventParser.setTsdbSnapshotExpire(parameters.getTsdbSnapshotExpire());
             }
             boolean tsdbEnable = BooleanUtils.toBoolean(parameters.getTsdbEnable());
+            // manager启动模式默认使用mysql tsdb机制
+            final String tsdbSpringXml = "classpath:spring/tsdb/mysql-tsdb.xml";
             if (tsdbEnable) {
                 mysqlEventParser.setTableMetaTSDBFactory(new DefaultTableMetaTSDBFactory() {
 
@@ -353,9 +344,12 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
                                 System.setProperty("canal.instance.tsdb.url", parameters.getTsdbJdbcUrl());
                                 System.setProperty("canal.instance.tsdb.dbUsername", parameters.getTsdbJdbcUserName());
                                 System.setProperty("canal.instance.tsdb.dbPassword", parameters.getTsdbJdbcPassword());
+                                System.setProperty("canal.instance.destination", destination);
 
-                                return TableMetaTSDBBuilder.build(destination, "classpath:spring/tsdb/mysql-tsdb.xml");
+                                return TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
                             } finally {
+                                // reset
+                                System.setProperty("canal.instance.destination", "");
                                 System.setProperty("canal.instance.tsdb.url", "");
                                 System.setProperty("canal.instance.tsdb.dbUsername", "");
                                 System.setProperty("canal.instance.tsdb.dbPassword", "");
@@ -363,6 +357,10 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
                         }
                     }
                 });
+                mysqlEventParser.setTsdbJdbcUrl(parameters.getTsdbJdbcUrl());
+                mysqlEventParser.setTsdbJdbcUserName(parameters.getTsdbJdbcUserName());
+                mysqlEventParser.setTsdbJdbcPassword(parameters.getTsdbJdbcPassword());
+                mysqlEventParser.setTsdbSpringXml(tsdbSpringXml);
                 mysqlEventParser.setEnableTsdb(tsdbEnable);
             }
             eventParser = mysqlEventParser;

+ 1 - 0
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/PlainCanalInstanceGenerator.java

@@ -61,6 +61,7 @@ public class PlainCanalInstanceGenerator implements CanalInstanceGenerator {
                 throw new CanalException(e);
             } finally {
                 System.setProperty("canal.instance.destination", "");
+                com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer.propertiesLocal.remove();
             }
         }
     }

+ 39 - 17
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -16,7 +16,6 @@ import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DefaultTableMetaTSDBFact
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBFactory;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
-import org.apache.commons.lang.StringUtils;
 
 public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 
@@ -24,6 +23,9 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 
     protected TableMetaTSDBFactory tableMetaTSDBFactory      = new DefaultTableMetaTSDBFactory();
     protected boolean              enableTsdb                = false;
+    protected String               tsdbJdbcUrl;
+    protected String               tsdbJdbcUserName;
+    protected String               tsdbJdbcPassword;
     protected int                  tsdbSnapshotInterval      = 24;
     protected int                  tsdbSnapshotExpire        = 360;
     protected String               tsdbSpringXml;
@@ -149,14 +151,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         if (enableTsdb) {
             if (tableMetaTSDB == null) {
                 synchronized (CanalEventParser.class) {
-                    try {
-                        // 设置当前正在加载的通道,加载spring查找文件时会用到该变量
-                        System.setProperty("canal.instance.destination", destination);
-                        // 初始化
-                        tableMetaTSDB = tableMetaTSDBFactory.build(destination, tsdbSpringXml);
-                    } finally {
-                        System.setProperty("canal.instance.destination", "");
-                    }
+                    buildTableMetaTSDB(tsdbSpringXml);
                 }
             }
         }
@@ -173,6 +168,28 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         super.stop();
     }
 
+    protected void buildTableMetaTSDB(String tsdbSpringXml) {
+        if (tableMetaTSDB != null) {
+            return;
+        }
+
+        try {
+            // 设置当前正在加载的通道,加载spring查找文件时会用到该变量
+            System.setProperty("canal.instance.tsdb.url", tsdbJdbcUrl);
+            System.setProperty("canal.instance.tsdb.dbUsername", tsdbJdbcUserName);
+            System.setProperty("canal.instance.tsdb.dbPassword", tsdbJdbcPassword);
+            System.setProperty("canal.instance.destination", destination);
+            // 初始化
+            this.tableMetaTSDB = tableMetaTSDBFactory.build(destination, tsdbSpringXml);
+        } finally {
+            // reset
+            System.setProperty("canal.instance.destination", "");
+            System.setProperty("canal.instance.tsdb.url", "");
+            System.setProperty("canal.instance.tsdb.dbUsername", "");
+            System.setProperty("canal.instance.tsdb.dbPassword", "");
+        }
+    }
+
     protected MultiStageCoprocessor buildMultiStageCoprocessor() {
         MysqlMultiStageCoprocessor mysqlMultiStageCoprocessor = new MysqlMultiStageCoprocessor(parallelBufferSize,
             parallelThreadSize,
@@ -256,20 +273,14 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
     public void setEnableTsdb(boolean enableTsdb) {
         this.enableTsdb = enableTsdb;
         if (this.enableTsdb) {
-            if (tableMetaTSDB == null) {
-                // 初始化
-                tableMetaTSDB = tableMetaTSDBFactory.build(destination, tsdbSpringXml);
-            }
+            buildTableMetaTSDB(tsdbSpringXml);
         }
     }
 
     public void setTsdbSpringXml(String tsdbSpringXml) {
         this.tsdbSpringXml = tsdbSpringXml;
         if (this.enableTsdb) {
-            if (tableMetaTSDB == null) {
-                // 初始化
-                tableMetaTSDB = tableMetaTSDBFactory.build(destination, tsdbSpringXml);
-            }
+            buildTableMetaTSDB(tsdbSpringXml);
         }
     }
 
@@ -301,4 +312,15 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         this.tsdbSnapshotExpire = tsdbSnapshotExpire;
     }
 
+    public void setTsdbJdbcUrl(String tsdbJdbcUrl) {
+        this.tsdbJdbcUrl = tsdbJdbcUrl;
+    }
+
+    public void setTsdbJdbcUserName(String tsdbJdbcUserName) {
+        this.tsdbJdbcUserName = tsdbJdbcUserName;
+    }
+
+    public void setTsdbJdbcPassword(String tsdbJdbcPassword) {
+        this.tsdbJdbcPassword = tsdbJdbcPassword;
+    }
 }