Browse Source

fixed code

七锋 7 years ago
parent
commit
8ce320148c
19 changed files with 121 additions and 641 deletions
  1. 6 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java
  2. 3 8
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  3. 1 6
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
  4. 51 47
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/BinlogDownloadQueue.java
  5. 19 19
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogEventParserProxy.java
  6. 21 16
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogOpenApi.java
  7. 20 19
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsLocalBinlogEventParser.java
  8. 0 191
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/HistoryTableMetaCache.java
  9. 0 20
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheInterface.java
  10. 0 105
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheWithStorage.java
  11. 0 55
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaEntry.java
  12. 0 18
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorage.java
  13. 0 9
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorageFactory.java
  14. 0 9
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/CacheConnectionNull.java
  15. 0 21
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/NoHistoryException.java
  16. 0 14
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaCallback.java
  17. 0 48
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorage.java
  18. 0 26
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorageFactory.java
  19. 0 5
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/RdsBinlogEventParserProxyTest.java

+ 6 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -203,14 +203,15 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
             try {
                 LogBuffer buffer = event.getBuffer();
                 if (StringUtils.isNotEmpty(event.getBinlogFileName())
-                    && (context.getLogPosition() == null
-                    || !context.getLogPosition().getFileName().equals(event.getBinlogFileName()))) {
+                    && (context.getLogPosition() == null || !context.getLogPosition()
+                        .getFileName()
+                        .equals(event.getBinlogFileName()))) {
                     // set roate binlog file name
-                    if (context.getLogPosition() == null){
+                    if (context.getLogPosition() == null) {
                         context.setLogPosition(new LogPosition(event.getBinlogFileName(), 0));
-                    }else{
+                    } else {
                         context.setLogPosition(new LogPosition(event.getBinlogFileName(), context.getLogPosition()
-                                .getPosition()));
+                            .getPosition()));
                     }
                 }
 

+ 3 - 8
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -10,9 +10,6 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaCacheInterface;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorage;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception.NoHistoryException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -90,8 +87,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     private volatile AviaterRegexFilter nameFilter;                                                          // 运行时引用可能会有变化,比如规则发生变化时
     private volatile AviaterRegexFilter nameBlackFilter;
 
-
-    private TableMetaCacheInterface tableMetaCache;
+    private TableMetaCache              tableMetaCache;
     private String                      binlogFileName      = "mysql-bin.000001";
 
     private Charset                     charset             = Charset.defaultCharset();
@@ -268,8 +264,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             if (!isSeek) {
                 // 使用新的表结构元数据管理方式
                 EntryPosition position = createPosition(event.getHeader());
-                String fulltbName = schemaName+"."+tableName;
-                tableMetaCache.apply(position, fulltbName, queryString, null);
+                tableMetaCache.apply(position, event.getDbName(), queryString, null);
             }
 
             Header header = createHeader(event.getHeader(), schemaName, tableName, type);
@@ -944,7 +939,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         this.nameBlackFilter = nameBlackFilter;
     }
 
-    public void setTableMetaCache(TableMetaCacheInterface tableMetaCache) {
+    public void setTableMetaCache(TableMetaCache tableMetaCache) {
         this.tableMetaCache = tableMetaCache;
     }
 

+ 1 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java

@@ -6,7 +6,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaCacheInterface;
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
@@ -30,7 +29,7 @@ import com.google.common.cache.LoadingCache;
  * @author jianghang 2013-1-17 下午10:15:16
  * @version 1.0.0
  */
-public class TableMetaCache implements TableMetaCacheInterface {
+public class TableMetaCache {
 
     public static final String              COLUMN_NAME    = "COLUMN_NAME";
     public static final String              COLUMN_TYPE    = "COLUMN_TYPE";
@@ -100,10 +99,6 @@ public class TableMetaCache implements TableMetaCacheInterface {
             String createDDL = packet.getFieldValues().get(1);
             MemoryTableMeta memoryTableMeta = new MemoryTableMeta();
             memoryTableMeta.apply(DatabaseTableMeta.INIT_POSITION, schema, createDDL, null);
-            String[] strings = table.split("\\.");
-            if (strings.length > 1) {
-                table = strings[1];
-            }
             TableMeta tableMeta = memoryTableMeta.find(schema, table);
             return tableMeta.getFields();
         } else {

+ 51 - 47
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/BinlogDownloadQueue.java

@@ -1,7 +1,17 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.rds;
 
-import java.io.*;
-import java.util.*;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -20,29 +30,27 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.BinlogFile;
 
-import io.netty.handler.codec.http.HttpResponseStatus;
-
 /**
  * @author chengjin.lyf on 2018/8/7 下午3:10
  * @since 1.0.25
  */
 public class BinlogDownloadQueue {
 
-    private static final Logger logger = LoggerFactory.getLogger(BinlogDownloadQueue.class);
-    private static final int      TIMEOUT             = 10000;
+    private static final Logger             logger        = LoggerFactory.getLogger(BinlogDownloadQueue.class);
+    private static final int                TIMEOUT       = 10000;
 
     private LinkedBlockingQueue<BinlogFile> downloadQueue = new LinkedBlockingQueue<BinlogFile>();
-    private LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
-    private LinkedList<BinlogFile> binlogList;
-    private final int batchSize;
-    private Thread downloadThread;
-    public boolean running = true;
-    private final String destDir;
-    private String hostId;
-    private int currentSize;
-    private String lastDownload;
+    private LinkedBlockingQueue<Runnable>   taskQueue     = new LinkedBlockingQueue<Runnable>();
+    private LinkedList<BinlogFile>          binlogList;
+    private final int                       batchSize;
+    private Thread                          downloadThread;
+    public boolean                          running       = true;
+    private final String                    destDir;
+    private String                          hostId;
+    private int                             currentSize;
+    private String                          lastDownload;
 
-    public BinlogDownloadQueue(List<BinlogFile> downloadQueue, int batchSize, String destDir) throws IOException {
+    public BinlogDownloadQueue(List<BinlogFile> downloadQueue, int batchSize, String destDir) throws IOException{
         this.binlogList = new LinkedList(downloadQueue);
         this.batchSize = batchSize;
         this.destDir = destDir;
@@ -51,12 +59,13 @@ public class BinlogDownloadQueue {
         cleanDir();
     }
 
-    private void prepareBinlogList(){
+    private void prepareBinlogList() {
         for (BinlogFile binlog : this.binlogList) {
             String fileName = StringUtils.substringBetween(binlog.getDownloadLink(), "mysql-bin.", "?");
             binlog.setFileName(fileName);
         }
         Collections.sort(this.binlogList, new Comparator<BinlogFile>() {
+
             @Override
             public int compare(BinlogFile o1, BinlogFile o2) {
                 return o1.getFileName().compareTo(o2.getFileName());
@@ -78,35 +87,34 @@ public class BinlogDownloadQueue {
         downloadThread.start();
     }
 
-
     public BinlogFile tryOne() throws IOException {
         BinlogFile binlogFile = binlogList.poll();
         download(binlogFile);
         hostId = binlogFile.getHostInstanceID();
-        this.currentSize ++;
+        this.currentSize++;
         return binlogFile;
     }
 
-    public void notifyNotMatch(){
-        this.currentSize --;
+    public void notifyNotMatch() {
+        this.currentSize--;
         filter(hostId);
     }
 
-    private void filter(String hostInstanceId){
+    private void filter(String hostInstanceId) {
         Iterator<BinlogFile> it = binlogList.iterator();
-        while (it.hasNext()){
+        while (it.hasNext()) {
             BinlogFile bf = it.next();
-            if(bf.getHostInstanceID().equalsIgnoreCase(hostInstanceId)){
+            if (bf.getHostInstanceID().equalsIgnoreCase(hostInstanceId)) {
                 it.remove();
-            }else{
+            } else {
                 hostId = bf.getHostInstanceID();
             }
         }
     }
 
-    public boolean isLastFile(String fileName){
+    public boolean isLastFile(String fileName) {
         String needCompareName = lastDownload;
-        if (StringUtils.isNotEmpty(needCompareName) && StringUtils.endsWith(needCompareName, "tar")){
+        if (StringUtils.isNotEmpty(needCompareName) && StringUtils.endsWith(needCompareName, "tar")) {
             needCompareName = needCompareName.substring(0, needCompareName.indexOf("."));
         }
         return fileName.equalsIgnoreCase(needCompareName) && binlogList.isEmpty();
@@ -115,27 +123,27 @@ public class BinlogDownloadQueue {
     public void prepare() throws InterruptedException {
         for (int i = this.currentSize; i < batchSize && !binlogList.isEmpty(); i++) {
             BinlogFile binlogFile = null;
-            while (!binlogList.isEmpty()){
+            while (!binlogList.isEmpty()) {
                 binlogFile = binlogList.poll();
-                if (!binlogFile.getHostInstanceID().equalsIgnoreCase(hostId)){
+                if (!binlogFile.getHostInstanceID().equalsIgnoreCase(hostId)) {
                     continue;
                 }
                 break;
             }
-            if (binlogFile == null){
+            if (binlogFile == null) {
                 break;
             }
             this.downloadQueue.put(binlogFile);
             this.lastDownload = "mysql-bin." + binlogFile.getFileName();
-            this.currentSize ++;
+            this.currentSize++;
         }
     }
 
-    public void downOne(){
-        this.currentSize --;
+    public void downOne() {
+        this.currentSize--;
     }
 
-    public void release(){
+    public void release() {
         running = false;
         this.currentSize = 0;
         binlogList.clear();
@@ -146,21 +154,17 @@ public class BinlogDownloadQueue {
         String downloadLink = binlogFile.getDownloadLink();
         String fileName = binlogFile.getFileName();
         HttpGet httpGet = new HttpGet(downloadLink);
-        CloseableHttpClient httpClient = HttpClientBuilder.create()
-                .setMaxConnPerRoute(50)
-                .setMaxConnTotal(100)
-                .build();
+        CloseableHttpClient httpClient = HttpClientBuilder.create().setMaxConnPerRoute(50).setMaxConnTotal(100).build();
         RequestConfig requestConfig = RequestConfig.custom()
-                .setConnectTimeout(TIMEOUT)
-                .setConnectionRequestTimeout(TIMEOUT)
-                .setSocketTimeout(TIMEOUT)
-                .build();
+            .setConnectTimeout(TIMEOUT)
+            .setConnectionRequestTimeout(TIMEOUT)
+            .setSocketTimeout(TIMEOUT)
+            .build();
         httpGet.setConfig(requestConfig);
         HttpResponse response = httpClient.execute(httpGet);
         int statusCode = response.getStatusLine().getStatusCode();
         if (statusCode != HttpResponseStatus.OK.code()) {
-            throw new RuntimeException("download failed , url:" + downloadLink + " , statusCode:"
-                                       + statusCode);
+            throw new RuntimeException("download failed , url:" + downloadLink + " , statusCode:" + statusCode);
         }
         saveFile(new File(destDir), "mysql-bin." + fileName, response);
     }
@@ -168,7 +172,7 @@ public class BinlogDownloadQueue {
     private static void saveFile(File parentFile, String fileName, HttpResponse response) throws IOException {
         InputStream is = response.getEntity().getContent();
         long totalSize = Long.parseLong(response.getFirstHeader("Content-Length").getValue());
-        if(response.getFirstHeader("Content-Disposition")!=null){
+        if (response.getFirstHeader("Content-Disposition") != null) {
             fileName = response.getFirstHeader("Content-Disposition").getValue();
             fileName = StringUtils.substringAfter(fileName, "filename=");
         }
@@ -243,11 +247,11 @@ public class BinlogDownloadQueue {
             while (running) {
                 try {
                     BinlogFile binlogFile = downloadQueue.poll(5000, TimeUnit.MILLISECONDS);
-                    if (binlogFile != null){
+                    if (binlogFile != null) {
                         download(binlogFile);
                     }
                     Runnable runnable = taskQueue.poll(5000, TimeUnit.MILLISECONDS);
-                    if (runnable != null){
+                    if (runnable != null) {
                         runnable.run();
                     }
                 } catch (Exception e) {

+ 19 - 19
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogEventParserProxy.java

@@ -14,25 +14,26 @@ import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
  */
 public class RdsBinlogEventParserProxy extends MysqlEventParser {
 
-    private String rdsOpenApiUrl = "https://rds.aliyuncs.com/"; // openapi地址
-    private String accesskey; // 云账号的ak
-    private String secretkey; // 云账号sk
-    private String instanceId; // rds实例id
-    private Long startTime;
-    private Long endTime;
-    private String directory; //binlog 目录
-    private int batchSize = 4; //最多下载的binlog文件数量
+    private String                    rdsOpenApiUrl        = "https://rds.aliyuncs.com/";    // openapi地址
+    private String                    accesskey;                                             // 云账号的ak
+    private String                    secretkey;                                             // 云账号sk
+    private String                    instanceId;                                            // rds实例id
+    private Long                      startTime;
+    private Long                      endTime;
+    private String                    directory;                                             // binlog
+                                                                                              // 目录
+    private int                       batchSize            = 4;                              // 最多下载的binlog文件数量
 
     private RdsLocalBinlogEventParser rdsBinlogEventParser = new RdsLocalBinlogEventParser();
-    private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
+    private ExecutorService           executorService      = Executors.newSingleThreadExecutor(new ThreadFactory() {
 
-        @Override
-        public Thread newThread(Runnable r) {
-            Thread t = new Thread(r, "rds-binlog-daemon-thread");
-            t.setDaemon(true);
-            return t;
-        }
-    });
+                                                               @Override
+                                                               public Thread newThread(Runnable r) {
+                                                                   Thread t = new Thread(r, "rds-binlog-daemon-thread");
+                                                                   t.setDaemon(true);
+                                                                   return t;
+                                                               }
+                                                           });
 
     @Override
     public void start() {
@@ -62,9 +63,11 @@ public class RdsBinlogEventParserProxy extends MysqlEventParser {
         rdsBinlogEventParser.setDirectory(directory);
         rdsBinlogEventParser.setBatchSize(batchSize);
         rdsBinlogEventParser.setFinishListener(new RdsLocalBinlogEventParser.ParseFinishListener() {
+
             @Override
             public void onFinish() {
                 executorService.execute(new Runnable() {
+
                     @Override
                     public void run() {
                         rdsBinlogEventParser.stop();
@@ -124,17 +127,14 @@ public class RdsBinlogEventParserProxy extends MysqlEventParser {
         this.rdsOpenApiUrl = rdsOpenApiUrl;
     }
 
-
     public void setAccesskey(String accesskey) {
         this.accesskey = accesskey;
     }
 
-
     public void setSecretkey(String secretkey) {
         this.secretkey = secretkey;
     }
 
-
     public void setInstanceId(String instanceId) {
         this.instanceId = instanceId;
     }

+ 21 - 16
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogOpenApi.java

@@ -1,9 +1,5 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.rds;
 
-import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.BinlogFile;
-import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.DescribeBinlogFileResult;
-import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.RdsItem;
-import com.alibaba.otter.canal.parse.inbound.mysql.rds.request.DescribeBinlogFilesRequest;
 import io.netty.handler.codec.http.HttpResponseStatus;
 
 import java.io.BufferedOutputStream;
@@ -17,7 +13,14 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.UUID;
 
 import javax.crypto.Mac;
 import javax.crypto.SecretKey;
@@ -40,6 +43,10 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.BinlogFile;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.DescribeBinlogFileResult;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.RdsItem;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.request.DescribeBinlogFilesRequest;
 
 /**
  * @author agapple 2017年10月14日 下午1:53:52
@@ -55,11 +62,10 @@ public class RdsBinlogOpenApi {
     private static final String   API_VERSION         = "2014-08-15";
     private static final String   SIGNATURE_VERSION   = "1.0";
 
-
-    public static List<BinlogFile> listBinlogFiles(String url, String ak, String sk, String dbInstanceId, Date startTime,
-                                                   Date endTime) {
+    public static List<BinlogFile> listBinlogFiles(String url, String ak, String sk, String dbInstanceId,
+                                                   Date startTime, Date endTime) {
         DescribeBinlogFilesRequest request = new DescribeBinlogFilesRequest();
-        if (StringUtils.isNotEmpty(url)){
+        if (StringUtils.isNotEmpty(url)) {
             try {
                 URI uri = new URI(url);
                 request.setEndPoint(uri.getHost());
@@ -76,12 +82,12 @@ public class RdsBinlogOpenApi {
         request.setAccessKeySecret(sk);
         DescribeBinlogFileResult result = null;
         int retryTime = 3;
-        while (true){
-            try{
+        while (true) {
+            try {
                 result = request.doAction();
                 break;
-            }catch (Exception e){
-                if(retryTime-- <= 0){
+            } catch (Exception e) {
+                if (retryTime-- <= 0) {
                     throw new RuntimeException(e);
                 }
                 try {
@@ -90,17 +96,16 @@ public class RdsBinlogOpenApi {
                 }
             }
         }
-        if (result == null){
+        if (result == null) {
             return Collections.EMPTY_LIST;
         }
         RdsItem rdsItem = result.getItems();
-        if (rdsItem != null){
+        if (rdsItem != null) {
             return rdsItem.getBinLogFile();
         }
         return Collections.EMPTY_LIST;
     }
 
-
     public static void downloadBinlogFiles(String url, String ak, String sk, String dbInstanceId, Date startTime,
                                            Date endTime, File destDir) throws Throwable {
         int pageSize = 100;

+ 20 - 19
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsLocalBinlogEventParser.java

@@ -28,15 +28,15 @@ import com.alibaba.otter.canal.protocol.position.LogPosition;
  */
 public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements CanalEventParser, LocalBinLogConnection.FileParserListener {
 
-    private String url = "https://rds.aliyuncs.com/"; // openapi地址
-    private String accesskey; // 云账号的ak
-    private String secretkey; // 云账号sk
-    private String instanceId; // rds实例id
-    private Long startTime;
-    private Long endTime;
+    private String              url = "https://rds.aliyuncs.com/"; // openapi地址
+    private String              accesskey;                        // 云账号的ak
+    private String              secretkey;                        // 云账号sk
+    private String              instanceId;                       // rds实例id
+    private Long                startTime;
+    private Long                endTime;
     private BinlogDownloadQueue binlogDownloadQueue;
     private ParseFinishListener finishListener;
-    private int batchSize;
+    private int                 batchSize;
 
     public RdsLocalBinlogEventParser(){
     }
@@ -59,7 +59,8 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
             }
             long startTimeInMill = entryPosition.getTimestamp();
             startTime = startTimeInMill;
-            List<BinlogFile> binlogFiles = RdsBinlogOpenApi.listBinlogFiles(url, accesskey,
+            List<BinlogFile> binlogFiles = RdsBinlogOpenApi.listBinlogFiles(url,
+                accesskey,
                 secretkey,
                 instanceId,
                 new Date(startTime),
@@ -132,27 +133,22 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
         }
     }
 
-
     public void setAccesskey(String accesskey) {
         this.accesskey = accesskey;
     }
 
-
     public void setSecretkey(String secretkey) {
         this.secretkey = secretkey;
     }
 
-
     public void setInstanceId(String instanceId) {
         this.instanceId = instanceId;
     }
 
-
     public void setStartTime(Long startTime) {
         this.startTime = startTime;
     }
 
-
     public void setEndTime(Long endTime) {
         this.endTime = endTime;
     }
@@ -162,20 +158,24 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
         try {
             binlogDownloadQueue.downOne();
             File needDeleteFile = new File(directory + File.separator + fileName);
-            if (needDeleteFile.exists()){
+            if (needDeleteFile.exists()) {
                 needDeleteFile.delete();
             }
             // 处理下logManager位点问题
             LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
             EntryPosition position = logPosition.getPostion();
-            if (position != null){
+            if (position != null) {
                 LogPosition newLogPosition = new LogPosition();
                 String journalName = position.getJournalName();
                 int sepIdx = journalName.indexOf(".");
-                String fileIndex = journalName.substring(sepIdx+1);
+                String fileIndex = journalName.substring(sepIdx + 1);
                 int index = NumberUtils.toInt(fileIndex) + 1;
-                String newJournalName = journalName.substring(0, sepIdx) + "." + StringUtils.leftPad(String.valueOf(index), fileIndex.length(), "0");
-                newLogPosition.setPostion(new EntryPosition(newJournalName, 4L, position.getTimestamp(), position.getServerId()));
+                String newJournalName = journalName.substring(0, sepIdx) + "."
+                                        + StringUtils.leftPad(String.valueOf(index), fileIndex.length(), "0");
+                newLogPosition.setPostion(new EntryPosition(newJournalName,
+                    4L,
+                    position.getTimestamp(),
+                    position.getServerId()));
                 newLogPosition.setIdentity(logPosition.getIdentity());
                 logPositionManager.persistLogPosition(destination, newLogPosition);
             }
@@ -202,7 +202,8 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
         this.finishListener = finishListener;
     }
 
-    public interface ParseFinishListener{
+    public interface ParseFinishListener {
+
         void onFinish();
     }
 

+ 0 - 191
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/HistoryTableMetaCache.java

@@ -1,191 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
-
-import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
-import com.alibaba.otter.canal.parse.inbound.TableMeta;
-import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
-import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception.CacheConnectionNull;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception.NoHistoryException;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
-import java.io.IOException;
-import java.util.*;
-
-public class HistoryTableMetaCache {
-    private TableMetaStorage tableMetaStorage;
-    private MysqlConnection metaConnection;
-    private LoadingCache<String, Map<Long, TableMeta>> cache; // 第一层:数据库名.表名,第二层时间戳,TableMeta
-
-    public HistoryTableMetaCache() {
-        cache = CacheBuilder.newBuilder().build(new CacheLoader<String, Map<Long, TableMeta>>() {
-            @Override
-            public Map<Long, TableMeta> load(String tableName) throws Exception {
-                Long timestamp = new Date().getTime();
-                String[] strs = tableName.split("\\.");
-                String schema = strs[0];
-                if (tableMetaStorage != null) {
-                    init(tableMetaStorage.fetchByTableName(tableName)); // 从存储中读取表的历史ddl
-                }
-                ResultSetPacket resultSetPacket = connectionQuery("show create table " + tableName); // 获取当前ddl
-                String currentDdl = resultSetPacket.getFieldValues().get(1);
-                if (cache.asMap().containsKey(tableName)) {
-                    Map<Long, TableMeta> tableMetaMap = cache.getUnchecked(tableName);
-                    if (tableMetaMap.isEmpty()) {
-                        put(schema, tableName, currentDdl, timestamp - 1000L); // 放入当前schema,取时间为当前时间-1s
-                    } else {                                               // 如果table存在历史
-                        Iterator<Long> iterator = tableMetaMap.keySet().iterator();
-                        Long firstTimestamp = iterator.next();
-                        TableMeta first = tableMetaMap.get(firstTimestamp); // 拿第一条ddl
-                        if (!first.getDdl().equalsIgnoreCase(currentDdl)) { // 当前ddl与历史第一条不一致,放入当前ddl
-                            put(schema, tableName, currentDdl, calculateNewTimestamp(firstTimestamp)); // 计算放入的timestamp,设为第一条时间+1s
-                        }
-                    }
-                } else {
-                    put(schema, tableName, currentDdl, timestamp - 1000L); // 放入当前schema
-                }
-                return cache.get(tableName);
-            }
-        });
-    }
-
-    public void init(List<TableMetaEntry> entries) throws IOException {
-        if (entries == null) {
-            return;
-        }
-        for (TableMetaEntry entry : entries) {
-            try {
-                put(entry.getSchema(), entry.getTable(), entry.getDdl(), entry.getTimestamp());
-            } catch (CacheConnectionNull cacheConnectionNull) {
-                cacheConnectionNull.printStackTrace();
-            }
-        }
-    }
-
-    public TableMeta put(String schema, String table, String ddl, Long timestamp) throws CacheConnectionNull, IOException {
-        ResultSetPacket resultSetPacket;
-        if (!(ddl.contains("CREATE TABLE") || ddl.contains("create table"))) { // 尝试直接从数据库拉取CREATE TABLE的DDL
-            resultSetPacket = connectionQuery("show create table " + table);
-            ddl = resultSetPacket.getFieldValues().get(1);
-        } else { // CREATE TABLE 的 DDL
-            resultSetPacket = new ResultSetPacket();
-            List<String> fields = new ArrayList<String>();
-            String[] strings = table.split("\\.");
-            String shortTable = table;
-            if (strings.length > 1) {
-                shortTable = strings[1];
-            }
-            fields.add(0, shortTable);
-            fields.add(1, ddl);
-            resultSetPacket.setFieldValues(fields);
-            if (metaConnection != null) {
-                resultSetPacket.setSourceAddress(metaConnection.getAddress());
-            }
-        }
-        Map<Long, TableMeta> tableMetaMap;
-        if (!cache.asMap().containsKey(table)) {
-            tableMetaMap = new TreeMap<Long, TableMeta>(new Comparator<Long>() {
-                @Override
-                public int compare(Long o1, Long o2) {
-                    return o2.compareTo(o1);
-                }
-            });
-            cache.put(table, tableMetaMap);
-        } else {
-            tableMetaMap = cache.getUnchecked(table);
-        }
-        eliminate(tableMetaMap); // 淘汰旧的TableMeta
-        TableMeta tableMeta = new TableMeta(schema, table, TableMetaCache.parseTableMeta(schema, table, resultSetPacket));
-        if (tableMeta.getDdl() == null) { // 生成的TableMeta有时DDL为null
-            tableMeta.setDdl(ddl);
-        }
-        tableMetaMap.put(timestamp, tableMeta);
-        return tableMeta;
-    }
-
-    public TableMeta get(String schema, String table, Long timestamp) throws NoHistoryException, CacheConnectionNull {
-        Map<Long, TableMeta> tableMetaMap = cache.getUnchecked(table);
-        Iterator<Long> iterator = tableMetaMap.keySet().iterator();
-        Long selected = null;
-        while(iterator.hasNext()) {
-            Long temp = iterator.next();
-            if (timestamp > temp) {
-                selected = temp;
-                break;
-            }
-        }
-
-        if (selected == null) {
-            iterator = tableMetaMap.keySet().iterator();
-            if (iterator.hasNext()) {
-                selected = iterator.next();
-            } else {
-                throw new NoHistoryException(schema, table);
-            }
-        }
-
-        return tableMetaMap.get(selected);
-    }
-
-    public void clearTableMeta() {
-        cache.invalidateAll();
-    }
-
-    public void clearTableMetaWithSchemaName(String schema) {
-        for (String tableName : cache.asMap().keySet()) {
-            String[] strs = tableName.split("\\.");
-            if (schema.equalsIgnoreCase(strs[0])) {
-                cache.invalidate(tableName);
-            }
-        }
-    }
-
-    public void clearTableMeta(String schema, String table) {
-        if (!table.contains(".")) {
-            table = schema+"."+table;
-        }
-        cache.invalidate(table);
-    }
-
-    // eliminate older table meta in cache
-    private void eliminate(Map<Long, TableMeta> tableMetaMap) {
-        int MAX_CAPABILITY = 20;
-        if (tableMetaMap.keySet().size() < MAX_CAPABILITY) {
-            return;
-        }
-        Iterator<Long> iterator = tableMetaMap.keySet().iterator();
-        while(iterator.hasNext()) {
-            iterator.next();
-        }
-        iterator.remove();
-    }
-
-    private Long calculateNewTimestamp(Long oldTimestamp) {
-        return oldTimestamp + 1000;
-    }
-
-    private ResultSetPacket connectionQuery(String query) throws CacheConnectionNull, IOException {
-        if (metaConnection == null) {
-            throw new CacheConnectionNull();
-        }
-        try {
-            return metaConnection.query(query);
-        } catch (IOException e) {
-            try {
-                metaConnection.reconnect();
-                return metaConnection.query(query);
-            } catch (IOException e1) {
-                throw e1;
-            }
-        }
-    }
-
-    public void setMetaConnection(MysqlConnection metaConnection) {
-        this.metaConnection = metaConnection;
-    }
-
-    public void setTableMetaStorage(TableMetaStorage tableMetaStorage) {
-        this.tableMetaStorage = tableMetaStorage;
-    }
-}

+ 0 - 20
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheInterface.java

@@ -1,20 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
-
-import com.alibaba.otter.canal.parse.inbound.TableMeta;
-import com.alibaba.otter.canal.protocol.position.EntryPosition;
-
-public interface TableMetaCacheInterface {
-
-    TableMeta getTableMeta(String schema, String table, boolean useCache, EntryPosition position);
-
-    void clearTableMeta();
-
-    void clearTableMetaWithSchemaName(String schema);
-
-    void clearTableMeta(String schema, String table);
-
-    boolean apply(EntryPosition position, String schema, String ddl, String extra);
-
-    boolean isOnRDS();
-
-}

+ 0 - 105
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheWithStorage.java

@@ -1,105 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
-
-import com.alibaba.otter.canal.parse.inbound.TableMeta;
-import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
-import com.alibaba.otter.canal.protocol.position.EntryPosition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-public class TableMetaCacheWithStorage implements TableMetaCacheInterface {
-
-    private static Logger logger = LoggerFactory.getLogger(TableMetaCacheWithStorage.class);
-    private TableMetaStorage tableMetaStorage; // TableMeta存储
-    private HistoryTableMetaCache cache = new HistoryTableMetaCache(); // cache
-
-    public TableMetaCacheWithStorage(MysqlConnection con, TableMetaStorage tableMetaStorage) {
-        this.tableMetaStorage = tableMetaStorage;
-        InetSocketAddress address = con.getAddress();
-        this.tableMetaStorage.setDbAddress(address.getHostName()+":"+address.getPort());
-        cache.setMetaConnection(con);
-        cache.setTableMetaStorage(tableMetaStorage);
-        if (tableMetaStorage != null) {
-            try {
-                cache.init(tableMetaStorage.fetch()); // 初始化,从存储拉取TableMeta
-            } catch (IOException e) {
-                logger.error(e.getMessage());
-            }
-        }
-    }
-
-    @Override
-    public boolean apply(EntryPosition position, String fullTableName, String ddl, String extra) {
-        String[] strs = fullTableName.split("\\.");
-        String schema = strs[0];
-        if (schema.equalsIgnoreCase("null")) { // ddl schema为null,放弃处理
-            return false;
-        }
-        try {
-            TableMeta tableMeta = cache.get(schema, fullTableName, position.getTimestamp());
-            if (!compare(tableMeta, ddl)) { // 获取最近的TableMeta,进行比对
-                TableMeta result = cache.put(schema, fullTableName, ddl, calTimestamp(position.getTimestamp()));
-                if (tableMetaStorage != null && result != null) { // 储存
-                    tableMetaStorage.store(schema, fullTableName, result.getDdl(), calTimestamp(position.getTimestamp()));
-                }
-            }
-            return true;
-        } catch (Exception e) {
-            logger.error(e.toString());
-        }
-
-        return false;
-    }
-
-    @Override
-    public boolean isOnRDS() {
-        return false;
-    }
-
-    /***
-     *
-     * @param schema dbname
-     * @param table tablename
-     * @param useCache unused
-     * @param position timestamp
-     * @return
-     */
-    @Override
-    public TableMeta getTableMeta(String schema, String table, boolean useCache, EntryPosition position) {
-        String fulltbName = schema + "." + table;
-        try {
-            return cache.get(schema, fulltbName, position.getTimestamp());
-        } catch (Exception e) {
-            logger.error(e.toString());
-        }
-        return null;
-    }
-
-    @Override
-    public void clearTableMeta() {
-        cache.clearTableMeta();
-    }
-
-    @Override
-    public void clearTableMetaWithSchemaName(String schema) {
-        cache.clearTableMetaWithSchemaName(schema);
-    }
-
-    @Override
-    public void clearTableMeta(String schema, String table) {
-        cache.clearTableMeta(schema, table);
-    }
-
-    private boolean compare(TableMeta tableMeta, String ddl) {
-        if (tableMeta == null) {
-            return false;
-        }
-        return tableMeta.getDdl().equalsIgnoreCase(ddl);
-    }
-
-    private Long calTimestamp(Long timestamp) {
-        return timestamp;
-    }
-}

+ 0 - 55
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaEntry.java

@@ -1,55 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
-
-import java.io.Serializable;
-
-public class TableMetaEntry implements Serializable {
-
-    private static final long serialVersionUID = -1350200637109107904L;
-
-    private String dbAddress;
-    private String schema;
-    private String table;
-    private String ddl;
-    private Long timestamp;
-
-
-    public String getSchema() {
-        return schema;
-    }
-
-    public void setSchema(String schema) {
-        this.schema = schema;
-    }
-
-    public String getTable() {
-        return table;
-    }
-
-    public void setTable(String table) {
-        this.table = table;
-    }
-
-    public String getDdl() {
-        return ddl;
-    }
-
-    public void setDdl(String ddl) {
-        this.ddl = ddl;
-    }
-
-    public Long getTimestamp() {
-        return timestamp;
-    }
-
-    public void setTimestamp(Long timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    public String getDbAddress() {
-        return dbAddress;
-    }
-
-    public void setDbAddress(String dbAddress) {
-        this.dbAddress = dbAddress;
-    }
-}

+ 0 - 18
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorage.java

@@ -1,18 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
-
-import java.util.List;
-
-public interface TableMetaStorage {
-
-    void store(String schema, String table, String ddl, Long timestamp);
-
-    List<TableMetaEntry> fetch();
-
-    List<TableMetaEntry> fetchByTableName(String tableName);
-
-    String getDbName();
-
-    String getDbAddress();
-
-    void setDbAddress(String address);
-}

+ 0 - 9
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorageFactory.java

@@ -1,9 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
-
-public interface TableMetaStorageFactory {
-
-    TableMetaStorage getTableMetaStorage();
-
-    String getDbName();
-
-}

+ 0 - 9
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/CacheConnectionNull.java

@@ -1,9 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception;
-
-public class CacheConnectionNull extends Exception{
-
-    @Override
-    public String toString() {
-        return "CacheConnectionNull";
-    }
-}

+ 0 - 21
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/NoHistoryException.java

@@ -1,21 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception;
-
-public class NoHistoryException extends Exception{
-
-    private String dbName;
-    private String tbName;
-
-    public NoHistoryException(String dbName, String tbName) {
-        this.dbName = dbName;
-        this.tbName = tbName;
-    }
-
-    public void printTableName() {
-        System.out.println(dbName+"."+tbName);
-    }
-
-    @Override
-    public String toString() {
-        return "NioHistoryException: " + dbName + " " + tbName;
-    }
-}

+ 0 - 14
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaCallback.java

@@ -1,14 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql;
-
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaEntry;
-
-import java.util.List;
-
-public interface MySqlTableMetaCallback {
-
-    void save(String dbAddress, String schema, String table,String ddl, Long timestamp);
-
-    List<TableMetaEntry> fetch(String dbAddress, String dbName);
-
-    List<TableMetaEntry> fetch(String dbAddress, String dbName, String tableName);
-}

+ 0 - 48
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorage.java

@@ -1,48 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql;
-
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaEntry;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorage;
-
-import java.util.List;
-
-public class MySqlTableMetaStorage implements TableMetaStorage {
-    private MySqlTableMetaCallback mySqlTableMetaCallback;
-    private String dbName;
-    private String dbAddress;
-
-    MySqlTableMetaStorage(MySqlTableMetaCallback callback, String dbName) {
-        mySqlTableMetaCallback = callback;
-        this.dbName = dbName;
-    }
-
-
-    @Override
-    public void store(String schema, String table, String ddl, Long timestamp) {
-        mySqlTableMetaCallback.save(dbAddress, schema, table, ddl, timestamp);
-    }
-
-    @Override
-    public List<TableMetaEntry> fetch() {
-        return mySqlTableMetaCallback.fetch(dbAddress, dbName);
-    }
-
-    @Override
-    public List<TableMetaEntry> fetchByTableName(String tableName) {
-        return mySqlTableMetaCallback.fetch(dbAddress, dbName, tableName);
-    }
-
-    @Override
-    public String getDbName() {
-        return dbName;
-    }
-
-    @Override
-    public String getDbAddress() {
-        return dbAddress;
-    }
-
-    @Override
-    public void setDbAddress(String address) {
-        this.dbAddress = address;
-    }
-}

+ 0 - 26
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorageFactory.java

@@ -1,26 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql;
-
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorage;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorageFactory;
-
-public class MySqlTableMetaStorageFactory implements TableMetaStorageFactory {
-
-    private MySqlTableMetaCallback mySQLTableMetaCallback;
-    private String dbName;
-
-    public MySqlTableMetaStorageFactory(MySqlTableMetaCallback callback, String dbName) {
-        mySQLTableMetaCallback = callback;
-        this.dbName = dbName;
-    }
-
-    @Override
-    public TableMetaStorage getTableMetaStorage() {
-        return new MySqlTableMetaStorage(mySQLTableMetaCallback, dbName);
-    }
-
-    @Override
-    public String getDbName() {
-        return dbName;
-    }
-
-}

+ 0 - 5
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/RdsBinlogEventParserProxyTest.java

@@ -1,9 +1,7 @@
 package com.alibaba.otter.canal.parse.inbound.mysql;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Calendar;
-import java.util.Date;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -12,9 +10,6 @@ import org.junit.Test;
 
 import com.alibaba.otter.canal.parse.helper.TimeoutChecker;
 import com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogEventParserProxy;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaEntry;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql.MySqlTableMetaCallback;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql.MySqlTableMetaStorageFactory;
 import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
 import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;