瀏覽代碼

Merge commit '9e816bc48f9955f9e2839873993cef9627c2f389' into branch-rds-support

Conflicts:
	instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java
	instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java
	parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java
	parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
charles.lin 7 年之前
父節點
當前提交
68d8afae3e

+ 16 - 7
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java

@@ -490,7 +490,6 @@ public final class RowsLogBuffer {
                     // t % 100);
 
                     StringBuilder builder = new StringBuilder();
-                    builder.append(26);
                     appendNumber4(builder, d / 10000);
                     builder.append('-');
                     appendNumber2(builder, (d % 10000) / 100);
@@ -615,7 +614,13 @@ public final class RowsLogBuffer {
                     if (i32 < 0) {
                         builder.append('-');
                     }
-                    appendNumber2(builder, u32 / 10000);
+
+                    int d = u32 / 10000;
+                    if (d > 100) {
+                        builder.append(String.valueOf(d));
+                    } else {
+                        appendNumber2(builder, d);
+                    }
                     builder.append(':');
                     appendNumber2(builder, (u32 % 10000) / 100);
                     builder.append(':');
@@ -724,7 +729,13 @@ public final class RowsLogBuffer {
                     if (ltime < 0) {
                         builder.append('-');
                     }
-                    appendNumber2(builder, (int) ((intpart >> 12) % (1 << 10)));
+
+                    int d = (int) ((intpart >> 12) % (1 << 10));
+                    if (d > 100) {
+                        builder.append(String.valueOf(d));
+                    } else {
+                        appendNumber2(builder, d);
+                    }
                     builder.append(':');
                     appendNumber2(builder, (int) ((intpart >> 6) % (1 << 6)));
                     builder.append(':');
@@ -1134,7 +1145,7 @@ public final class RowsLogBuffer {
                 .append(digits[(d / 100) % 10])
                 .append(digits[(d / 10) % 10])
                 .append(digits[d % 10]);
-        } else if (d >= 100) {
+        } else {
             builder.append('0');
             appendNumber3(builder, d);
         }
@@ -1142,9 +1153,7 @@ public final class RowsLogBuffer {
 
     private void appendNumber3(StringBuilder builder, int d) {
         if (d >= 100) {
-            builder.append(digits[d / 100])
-                .append(digits[(d / 10) % 10])
-                .append(digits[d % 10]);
+            builder.append(digits[d / 100]).append(digits[(d / 10) % 10]).append(digits[d % 10]);
         } else {
             builder.append('0');
             appendNumber2(builder, d);

+ 15 - 15
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -6,11 +6,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.HistoryTableMetaCache;
-
-import com.alibaba.otter.canal.meta.FileMixedMetaManager;
-
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,7 +20,14 @@ import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
 import com.alibaba.otter.canal.instance.core.AbstractCanalInstance;
 import com.alibaba.otter.canal.instance.manager.model.Canal;
 import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.*;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.DataSourcing;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.HAMode;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.IndexMode;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.MetaMode;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.SourcingType;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.StorageMode;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.StorageScavengeMode;
+import com.alibaba.otter.canal.meta.FileMixedMetaManager;
 import com.alibaba.otter.canal.meta.MemoryMetaManager;
 import com.alibaba.otter.canal.meta.PeriodMixedMetaManager;
 import com.alibaba.otter.canal.meta.ZooKeeperMetaManager;
@@ -36,7 +38,12 @@ import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
-import com.alibaba.otter.canal.parse.index.*;
+import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
+import com.alibaba.otter.canal.parse.index.FailbackLogPositionManager;
+import com.alibaba.otter.canal.parse.index.MemoryLogPositionManager;
+import com.alibaba.otter.canal.parse.index.MetaLogPositionManager;
+import com.alibaba.otter.canal.parse.index.PeriodMixedLogPositionManager;
+import com.alibaba.otter.canal.parse.index.ZooKeeperLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.sink.entry.EntryEventSink;
@@ -114,7 +121,7 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             ZooKeeperMetaManager zooKeeperMetaManager = new ZooKeeperMetaManager();
             zooKeeperMetaManager.setZkClientx(getZkclientx());
             ((PeriodMixedMetaManager) metaManager).setZooKeeperMetaManager(zooKeeperMetaManager);
-        } else if (mode.isLocalFile()){
+        } else if (mode.isLocalFile()) {
             FileMixedMetaManager fileMixedMetaManager = new FileMixedMetaManager();
             fileMixedMetaManager.setDataDir(parameters.getDataDir());
             fileMixedMetaManager.setPeriod(parameters.getMetaFileFlushPeriod());
@@ -241,13 +248,6 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             mysqlEventParser.setDetectingIntervalInSeconds(parameters.getDetectingIntervalInSeconds());
             // 数据库信息参数
             mysqlEventParser.setSlaveId(parameters.getSlaveId());
-            mysqlEventParser.setTableMetaStorageFactory(parameters.getTableMetaStorageFactory());
-            // Ctrip callback
-//            mysqlEventParser.setCallback(parameters.getCallback());
-//            HistoryTableMetaCache cache = new HistoryTableMetaCache();
-//            cache.init(parameters.getEntries());
-//            mysqlEventParser.setHistoryTableMetaCache(cache);
-
             if (!CollectionUtils.isEmpty(dbAddresses)) {
                 mysqlEventParser.setMasterInfo(new AuthenticationInfo(dbAddresses.get(0),
                     parameters.getDbUsername(),

+ 34 - 10
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java

@@ -5,7 +5,6 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorageFactory;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.ToStringBuilder;
 
@@ -94,6 +93,10 @@ public class CanalParameter implements Serializable {
     private Boolean                  filterTableError                   = Boolean.FALSE;             // 是否忽略表解析异常
     private String                   blackFilter                        = null;                      // 匹配黑名单,忽略解析
 
+    private Boolean                  tsdbEnable                         = Boolean.FALSE;             // 是否开启tableMetaTSDB
+    private String                   tsdbJdbcUrl;
+    private String                   tsdbJdbcUserName;
+    private String                   tsdbJdbcPassword;
     // ================================== 兼容字段处理
     private InetSocketAddress        masterAddress;                                                  // 主库信息
     private String                   masterUsername;                                                 // 帐号
@@ -109,9 +112,6 @@ public class CanalParameter implements Serializable {
     private Long                     standbyLogfileOffest               = null;
     private Long                     standbyTimestamp                   = null;
 
-    // Ctrip Table Meta
-    TableMetaStorageFactory tableMetaStorageFactory;
-
     public static enum RunMode {
 
         /** 嵌入式 */
@@ -250,7 +250,7 @@ public class CanalParameter implements Serializable {
         ZOOKEEPER,
         /** 混合模式,内存+文件 */
         MIXED,
-        /** 本地文件存储模式*/
+        /** 本地文件存储模式 */
         LOCAL_FILE;
 
         public boolean isMemory() {
@@ -265,7 +265,7 @@ public class CanalParameter implements Serializable {
             return this.equals(MetaMode.MIXED);
         }
 
-        public boolean isLocalFile(){
+        public boolean isLocalFile() {
             return this.equals(MetaMode.LOCAL_FILE);
         }
     }
@@ -887,12 +887,36 @@ public class CanalParameter implements Serializable {
         this.blackFilter = blackFilter;
     }
 
-    public TableMetaStorageFactory getTableMetaStorageFactory() {
-        return tableMetaStorageFactory;
+    public Boolean getTsdbEnable() {
+        return tsdbEnable;
+    }
+
+    public void setTsdbEnable(Boolean tsdbEnable) {
+        this.tsdbEnable = tsdbEnable;
+    }
+
+    public String getTsdbJdbcUrl() {
+        return tsdbJdbcUrl;
+    }
+
+    public void setTsdbJdbcUrl(String tsdbJdbcUrl) {
+        this.tsdbJdbcUrl = tsdbJdbcUrl;
+    }
+
+    public String getTsdbJdbcUserName() {
+        return tsdbJdbcUserName;
+    }
+
+    public void setTsdbJdbcUserName(String tsdbJdbcUserName) {
+        this.tsdbJdbcUserName = tsdbJdbcUserName;
+    }
+
+    public String getTsdbJdbcPassword() {
+        return tsdbJdbcPassword;
     }
 
-    public void setTableMetaStorageFactory(TableMetaStorageFactory tableMetaStorageFactory) {
-        this.tableMetaStorageFactory = tableMetaStorageFactory;
+    public void setTsdbJdbcPassword(String tsdbJdbcPassword) {
+        this.tsdbJdbcPassword = tsdbJdbcPassword;
     }
 
     public String toString() {

+ 25 - 18
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -14,27 +14,30 @@ import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
 import com.alibaba.otter.canal.parse.inbound.MultiStageCoprocessor;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DefaultTableMetaTSDBFactory;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
-import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBBuilder;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBFactory;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 
 public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 
-    protected final Logger      logger                  = LoggerFactory.getLogger(this.getClass());
-    protected static final long BINLOG_START_OFFEST     = 4L;
+    protected final Logger         logger                  = LoggerFactory.getLogger(this.getClass());
+    protected static final long    BINLOG_START_OFFEST     = 4L;
+
+    protected TableMetaTSDBFactory tableMetaTSDBFactory    = new DefaultTableMetaTSDBFactory();
+    protected boolean              enableTsdb              = false;
+    protected String               tsdbSpringXml;
+    protected TableMetaTSDB        tableMetaTSDB;
 
-    protected boolean           enableTsdb              = false;
-    protected String            tsdbSpringXml;
-    protected TableMetaTSDB     tableMetaTSDB;
     // 编码信息
-    protected byte              connectionCharsetNumber = (byte) 33;
-    protected Charset           connectionCharset       = Charset.forName("UTF-8");
-    protected boolean           filterQueryDcl          = false;
-    protected boolean           filterQueryDml          = false;
-    protected boolean           filterQueryDdl          = false;
-    protected boolean           filterRows              = false;
-    protected boolean           filterTableError        = false;
-    protected boolean           useDruidDdlFilter       = true;
+    protected byte                 connectionCharsetNumber = (byte) 33;
+    protected Charset              connectionCharset       = Charset.forName("UTF-8");
+    protected boolean              filterQueryDcl          = false;
+    protected boolean              filterQueryDml          = false;
+    protected boolean              filterQueryDdl          = false;
+    protected boolean              filterRows              = false;
+    protected boolean              filterTableError        = false;
+    protected boolean              useDruidDdlFilter       = true;
 
     protected BinlogParser buildParser() {
         LogEventConvert convert = new LogEventConvert();
@@ -97,7 +100,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
                         // 设置当前正在加载的通道,加载spring查找文件时会用到该变量
                         System.setProperty("canal.instance.destination", destination);
                         // 初始化
-                        tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
+                        tableMetaTSDB = tableMetaTSDBFactory.build(destination, tsdbSpringXml);
                     } finally {
                         System.setProperty("canal.instance.destination", "");
                     }
@@ -110,7 +113,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 
     public void stop() throws CanalParseException {
         if (enableTsdb) {
-            TableMetaTSDBBuilder.destory(destination);
+            tableMetaTSDBFactory.destory(destination);
             tableMetaTSDB = null;
         }
 
@@ -182,7 +185,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         if (this.enableTsdb) {
             if (tableMetaTSDB == null) {
                 // 初始化
-                tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
+                tableMetaTSDB = tableMetaTSDBFactory.build(destination, tsdbSpringXml);
             }
         }
     }
@@ -192,9 +195,13 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         if (this.enableTsdb) {
             if (tableMetaTSDB == null) {
                 // 初始化
-                tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
+                tableMetaTSDB = tableMetaTSDBFactory.build(destination, tsdbSpringXml);
             }
         }
     }
 
+    public void setTableMetaTSDBFactory(TableMetaTSDBFactory tableMetaTSDBFactory) {
+        this.tableMetaTSDBFactory = tableMetaTSDBFactory;
+    }
+
 }

+ 5 - 23
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -10,11 +10,6 @@ import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.alibaba.otter.canal.parse.inbound.*;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaCacheInterface;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaCacheWithStorage;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorage;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorageFactory;
 import org.apache.commons.lang.StringUtils;
 import org.springframework.util.CollectionUtils;
 
@@ -25,6 +20,9 @@ import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.ha.CanalHAController;
+import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
+import com.alibaba.otter.canal.parse.inbound.HeartBeatCallback;
+import com.alibaba.otter.canal.parse.inbound.SinkFunction;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.BinlogFormat;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.BinlogImage;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
@@ -64,8 +62,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     // 心跳检查信息
     private String             detectingSQL;                                 // 心跳sql
     private MysqlConnection    metaConnection;                               // 查询meta信息的链接
-    private TableMetaCacheInterface tableMetaCache;                               // 对应meta
-                                                                              // cache
+    private TableMetaCache     tableMetaCache;                               // 对应meta
     private int                fallbackIntervalInSeconds         = 60;       // 切换回退时间
     private BinlogFormat[]     supportBinlogFormats;                         // 支持的binlogFormat,如果设置会执行强校验
     private BinlogImage[]      supportBinlogImages;                          // 支持的binlogImage,如果设置会执行强校验
@@ -74,8 +71,6 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private int                dumpErrorCount                    = 0;        // binlogDump失败异常计数
     private int                dumpErrorCountThreshold           = 2;        // binlogDump失败异常计数阀值
 
-    private TableMetaStorageFactory tableMetaStorageFactory;
-
     protected ErosaConnection buildErosaConnection() {
         return buildMysqlConnection(this.runningInfo);
     }
@@ -129,13 +124,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
             }
 
-
-            TableMetaStorage storage = null;
-            if (tableMetaStorageFactory != null) {
-                storage = tableMetaStorageFactory.getTableMetaStorage();
-            }
-
-            tableMetaCache = new TableMetaCacheWithStorage(metaConnection, storage);
+            tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);
             ((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);
         }
     }
@@ -921,11 +910,4 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         this.dumpErrorCountThreshold = dumpErrorCountThreshold;
     }
 
-    public TableMetaStorageFactory getTableMetaStorageFactory() {
-        return tableMetaStorageFactory;
-    }
-
-    public void setTableMetaStorageFactory(TableMetaStorageFactory tableMetaStorageFactory) {
-        this.tableMetaStorageFactory = tableMetaStorageFactory;
-    }
 }

+ 8 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -548,12 +548,16 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                     tableError |= parseOneRow(rowDataBuilder, event, buffer, changeColumns, true, tableMeta);
                 }
 
-                rowsCount ++;
+                rowsCount++;
                 rowChangeBuider.addRowDatas(rowDataBuilder.build());
             }
 
             TableMapLogEvent table = event.getTable();
-            Header header = createHeader(event.getHeader(), table.getDbName(), table.getTableName(), eventType, rowsCount);
+            Header header = createHeader(event.getHeader(),
+                table.getDbName(),
+                table.getTableName(),
+                eventType,
+                rowsCount);
 
             RowChange rowChange = rowChangeBuider.build();
             if (tableError) {
@@ -808,12 +812,12 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
     }
 
-
     private Header createHeader(LogHeader logHeader, String schemaName, String tableName, EventType eventType) {
         return createHeader(logHeader, schemaName, tableName, eventType, -1);
     }
 
-    private Header createHeader(LogHeader logHeader, String schemaName, String tableName, EventType eventType, Integer rowsCount) {
+    private Header createHeader(LogHeader logHeader, String schemaName, String tableName, EventType eventType,
+                                Integer rowsCount) {
         // header会做信息冗余,方便以后做检索或者过滤
         Header.Builder headerBuilder = Header.newBuilder();
         headerBuilder.setVersion(version);

+ 19 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DefaultTableMetaTSDBFactory.java

@@ -0,0 +1,19 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
+
+/**
+ * @author agapple 2017年10月11日 下午8:45:40
+ * @since 1.0.25
+ */
+public class DefaultTableMetaTSDBFactory implements TableMetaTSDBFactory {
+
+    /**
+     * 代理一下tableMetaTSDB的获取,使用隔离的spring定义
+     */
+    public TableMetaTSDB build(String destination, String springXml) {
+        return TableMetaTSDBBuilder.build(destination, springXml);
+    }
+
+    public void destory(String destination) {
+        TableMetaTSDBBuilder.destory(destination);
+    }
+}

+ 5 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDBBuilder.java

@@ -10,12 +10,15 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
 import com.google.common.collect.Maps;
 
 /**
- * @author agapple 2017年10月11日 下午8:45:40
+ * tableMeta构造器
+ * 
+ * @author agapple 2018年8月8日 上午11:01:08
  * @since 1.0.25
  */
+
 public class TableMetaTSDBBuilder {
 
-    protected final static Logger                                        logger   = LoggerFactory.getLogger(TableMetaTSDBBuilder.class);
+    protected final static Logger                                        logger   = LoggerFactory.getLogger(DefaultTableMetaTSDBFactory.class);
     private static ConcurrentMap<String, ClassPathXmlApplicationContext> contexts = Maps.newConcurrentMap();
 
     /**

+ 18 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDBFactory.java

@@ -0,0 +1,18 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
+
+/**
+ * tableMeta构造器,允许重载实现
+ * 
+ * @author agapple 2018年8月8日 上午11:01:08
+ * @since 1.0.26
+ */
+
+public interface TableMetaTSDBFactory {
+
+    /**
+     * 代理一下tableMetaTSDB的获取,使用隔离的spring定义
+     */
+    public TableMetaTSDB build(String destination, String springXml);
+
+    public void destory(String destination);
+}