Browse Source

fixed rds ak/sk config for manager

七锋 6 years ago
parent
commit
9270834631

+ 52 - 4
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -6,7 +6,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import com.alibaba.otter.canal.meta.FileMixedMetaManager;
+import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +39,16 @@ import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
-import com.alibaba.otter.canal.parse.index.*;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogEventParserProxy;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DefaultTableMetaTSDBFactory;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBBuilder;
+import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
+import com.alibaba.otter.canal.parse.index.FailbackLogPositionManager;
+import com.alibaba.otter.canal.parse.index.MemoryLogPositionManager;
+import com.alibaba.otter.canal.parse.index.MetaLogPositionManager;
+import com.alibaba.otter.canal.parse.index.PeriodMixedLogPositionManager;
+import com.alibaba.otter.canal.parse.index.ZooKeeperLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.sink.entry.EntryEventSink;
@@ -229,7 +238,18 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
     private CanalEventParser doInitEventParser(SourcingType type, List<InetSocketAddress> dbAddresses) {
         CanalEventParser eventParser;
         if (type.isMysql()) {
-            MysqlEventParser mysqlEventParser = new MysqlEventParser();
+            MysqlEventParser mysqlEventParser = null;
+            if (StringUtils.isNotEmpty(parameters.getRdsAccesskey())
+                && StringUtils.isNotEmpty(parameters.getRdsSecretkey())
+                && StringUtils.isNotEmpty(parameters.getRdsInstanceId())) {
+
+                mysqlEventParser = new RdsBinlogEventParserProxy();
+                ((RdsBinlogEventParserProxy) mysqlEventParser).setAccesskey(parameters.getRdsAccesskey());
+                ((RdsBinlogEventParserProxy) mysqlEventParser).setSecretkey(parameters.getRdsSecretkey());
+                ((RdsBinlogEventParserProxy) mysqlEventParser).setInstanceId(parameters.getRdsInstanceId());
+            } else {
+                mysqlEventParser = new MysqlEventParser();
+            }
             mysqlEventParser.setDestination(destination);
             // 编码参数
             mysqlEventParser.setConnectionCharset(Charset.forName(parameters.getConnectionCharset()));
@@ -273,6 +293,34 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             mysqlEventParser.setFallbackIntervalInSeconds(parameters.getFallbackIntervalInSeconds());
             mysqlEventParser.setProfilingEnabled(false);
             mysqlEventParser.setFilterTableError(parameters.getFilterTableError());
+            mysqlEventParser.setIsGTIDMode(BooleanUtils.toBoolean(parameters.getGtidEnable()));
+            // tsdb
+            boolean tsdbEnable = BooleanUtils.toBoolean(parameters.getTsdbEnable());
+            if (tsdbEnable) {
+                mysqlEventParser.setEnableTsdb(tsdbEnable);
+                mysqlEventParser.setTableMetaTSDBFactory(new DefaultTableMetaTSDBFactory() {
+
+                    @Override
+                    public void destory(String destination) {
+                        TableMetaTSDBBuilder.destory(destination);
+                    }
+
+                    @Override
+                    public TableMetaTSDB build(String destination, String springXml) {
+                        try {
+                            System.setProperty("canal.instance.tsdb.url", parameters.getTsdbJdbcUrl());
+                            System.setProperty("canal.instance.tsdb.dbUsername", parameters.getTsdbJdbcUserName());
+                            System.setProperty("canal.instance.tsdb.dbPassword", parameters.getTsdbJdbcPassword());
+
+                            return TableMetaTSDBBuilder.build(destination, "classpath:spring/tsdb/mysql-tsdb.xml");
+                        } finally {
+                            System.setProperty("canal.instance.tsdb.url", "");
+                            System.setProperty("canal.instance.tsdb.dbUsername", "");
+                            System.setProperty("canal.instance.tsdb.dbPassword", "");
+                        }
+                    }
+                });
+            }
             eventParser = mysqlEventParser;
         } else if (type.isLocalBinlog()) {
             LocalBinlogEventParser localBinlogEventParser = new LocalBinlogEventParser();
@@ -291,8 +339,8 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
                     parameters.getDbUsername(),
                     parameters.getDbPassword(),
                     parameters.getDefaultDatabaseName()));
-
             }
+
             eventParser = localBinlogEventParser;
         } else if (type.isOracle()) {
             throw new CanalException("unsupport SourcingType for " + type);

+ 36 - 0
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java

@@ -97,6 +97,10 @@ public class CanalParameter implements Serializable {
     private String                   tsdbJdbcUrl;
     private String                   tsdbJdbcUserName;
     private String                   tsdbJdbcPassword;
+    private String                   rdsAccesskey;
+    private String                   rdsSecretkey;
+    private String                   rdsInstanceId;
+    private Boolean                  gtidEnable                         = Boolean.FALSE;             // 是否开启gtid
     // ================================== 兼容字段处理
     private InetSocketAddress        masterAddress;                                                  // 主库信息
     private String                   masterUsername;                                                 // 帐号
@@ -919,6 +923,38 @@ public class CanalParameter implements Serializable {
         this.tsdbJdbcPassword = tsdbJdbcPassword;
     }
 
+    public String getRdsAccesskey() {
+        return rdsAccesskey;
+    }
+
+    public void setRdsAccesskey(String rdsAccesskey) {
+        this.rdsAccesskey = rdsAccesskey;
+    }
+
+    public String getRdsSecretkey() {
+        return rdsSecretkey;
+    }
+
+    public void setRdsSecretkey(String rdsSecretkey) {
+        this.rdsSecretkey = rdsSecretkey;
+    }
+
+    public String getRdsInstanceId() {
+        return rdsInstanceId;
+    }
+
+    public void setRdsInstanceId(String rdsInstanceId) {
+        this.rdsInstanceId = rdsInstanceId;
+    }
+
+    public Boolean getGtidEnable() {
+        return gtidEnable;
+    }
+
+    public void setGtidEnable(Boolean gtidEnable) {
+        this.gtidEnable = gtidEnable;
+    }
+
     public String toString() {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }

+ 1 - 0
protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java

@@ -38,6 +38,7 @@ public class Message implements Serializable {
         } else {
             this.entries = entries == null ? new ArrayList<Entry>() : entries;
         }
+        this.raw = raw;
     }
 
     public Message(long id){