Bläddra i källkod

fixed issue #1047 , support snapshot expire

七锋 6 år sedan
förälder
incheckning
f822761ab2
18 ändrade filer med 176 tillägg och 39 borttagningar
  1. 4 0
      deployer/src/main/resources/canal.properties
  2. 2 0
      deployer/src/main/resources/spring/default-instance.xml
  3. 2 0
      deployer/src/main/resources/spring/file-instance.xml
  4. 2 0
      deployer/src/main/resources/spring/memory-instance.xml
  5. 2 2
      deployer/src/main/resources/spring/tsdb/sql-map/sqlmap_history.xml
  6. 2 2
      deployer/src/main/resources/spring/tsdb/sql-map/sqlmap_snapshot.xml
  7. 6 0
      instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java
  8. 18 0
      instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java
  9. 18 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java
  10. 2 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java
  11. 2 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  12. 46 15
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java
  13. 4 7
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaHistoryDAO.java
  14. 4 7
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaSnapshotDAO.java
  15. 17 2
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MetaHistoryDAOTest.java
  16. 41 0
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MetaSnapshotDAOTest.java
  17. 2 2
      parse/src/test/resources/tsdb/sql-map/sqlmap_history.xml
  18. 2 2
      parse/src/test/resources/tsdb/sql-map/sqlmap_snapshot.xml

+ 4 - 0
deployer/src/main/resources/canal.properties

@@ -69,6 +69,10 @@ canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destinat
 canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
 canal.instance.tsdb.dbUsername=canal
 canal.instance.tsdb.dbPassword=canal
+# dump snapshot interval, default 24 hour
+canal.instance.tsdb.snapshot.interval=24
+# purge snapshot expire , default 360 hour(15 days)
+canal.instance.tsdb.snapshot.expire=360
 
 # rds oss binlog account
 canal.instance.rds.accesskey =

+ 2 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -176,6 +176,8 @@
 		<!--表结构相关-->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:true}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
+		<property name="tsdbSnapshotInterval" value="${canal.instance.tsdb.snapshot.interval:24}" />
+		<property name="tsdbSnapshotExpire" value="${canal.instance.tsdb.snapshot.expire:360}" />
 		
 		<!--是否启用GTID模式-->
 		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>

+ 2 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -161,6 +161,8 @@
 		<!--表结构相关-->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:true}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
+		<property name="tsdbSnapshotInterval" value="${canal.instance.tsdb.snapshot.interval:24}" />
+		<property name="tsdbSnapshotExpire" value="${canal.instance.tsdb.snapshot.expire:360}" />
 
 		<!--是否启用GTID模式-->
 		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>

+ 2 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -149,6 +149,8 @@
 		<!--表结构相关-->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:false}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
+		<property name="tsdbSnapshotInterval" value="${canal.instance.tsdb.snapshot.interval:24}" />
+		<property name="tsdbSnapshotExpire" value="${canal.instance.tsdb.snapshot.expire:360}" />
 		
 		<!--是否启用GTID模式-->
 		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>

+ 2 - 2
deployer/src/main/resources/spring/tsdb/sql-map/sqlmap_history.xml

@@ -36,10 +36,10 @@
     </delete>
 
 
-    <delete id="deleteByGmtModified" parameterClass="java.util.Map">
+    <delete id="deleteByTimestamp" parameterClass="java.util.Map">
         <![CDATA[
 		delete from meta_history
-		where gmt_modified < timestamp(#timestamp#)
+		where destination=#destination# and binlog_timestamp < #timestamp#
         ]]>
     </delete>
 </sqlMap>

+ 2 - 2
deployer/src/main/resources/spring/tsdb/sql-map/sqlmap_snapshot.xml

@@ -42,10 +42,10 @@
         where destination=#destination#
     </delete>
 
-    <delete id="deleteByGmtModified" parameterClass="java.util.Map">
+    <delete id="deleteByTimestamp" parameterClass="java.util.Map">
         <![CDATA[
 		delete from meta_snapshot
-		where gmt_modified < timestamp(#timestamp#)
+		where destination=#destination# and binlog_timestamp < #timestamp# and binlog_timestamp > 0
         ]]>
     </delete>
 </sqlMap>

+ 6 - 0
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -296,6 +296,12 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             mysqlEventParser.setFilterTableError(parameters.getFilterTableError());
             mysqlEventParser.setIsGTIDMode(BooleanUtils.toBoolean(parameters.getGtidEnable()));
             // tsdb
+            if (parameters.getTsdbSnapshotInterval() != null) {
+                mysqlEventParser.setTsdbSnapshotInterval(parameters.getTsdbSnapshotInterval());
+            }
+            if (parameters.getTsdbSnapshotExpire() != null) {
+                mysqlEventParser.setTsdbSnapshotExpire(parameters.getTsdbSnapshotExpire());
+            }
             boolean tsdbEnable = BooleanUtils.toBoolean(parameters.getTsdbEnable());
             if (tsdbEnable) {
                 mysqlEventParser.setEnableTsdb(tsdbEnable);

+ 18 - 0
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java

@@ -98,6 +98,8 @@ public class CanalParameter implements Serializable {
     private String                   tsdbJdbcUrl;
     private String                   tsdbJdbcUserName;
     private String                   tsdbJdbcPassword;
+    private Integer                  tsdbSnapshotInterval               = 24;
+    private Integer                  tsdbSnapshotExpire                 = 360;
     private String                   rdsAccesskey;
     private String                   rdsSecretkey;
     private String                   rdsInstanceId;
@@ -964,6 +966,22 @@ public class CanalParameter implements Serializable {
         this.memoryStorageRawEntry = memoryStorageRawEntry;
     }
 
+    public Integer getTsdbSnapshotInterval() {
+        return tsdbSnapshotInterval;
+    }
+
+    public void setTsdbSnapshotInterval(Integer tsdbSnapshotInterval) {
+        this.tsdbSnapshotInterval = tsdbSnapshotInterval;
+    }
+
+    public Integer getTsdbSnapshotExpire() {
+        return tsdbSnapshotExpire;
+    }
+
+    public void setTsdbSnapshotExpire(Integer tsdbSnapshotExpire) {
+        this.tsdbSnapshotExpire = tsdbSnapshotExpire;
+    }
+
     public String toString() {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }

+ 18 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -26,6 +26,8 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 
     protected TableMetaTSDBFactory tableMetaTSDBFactory      = new DefaultTableMetaTSDBFactory();
     protected boolean              enableTsdb                = false;
+    protected int                  tsdbSnapshotInterval      = 24;
+    protected int                  tsdbSnapshotExpire        = 360;
     protected String               tsdbSpringXml;
     protected TableMetaTSDB        tableMetaTSDB;
 
@@ -210,4 +212,20 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         return this.receivedBinlogBytes;
     }
 
+    public int getTsdbSnapshotInterval() {
+        return tsdbSnapshotInterval;
+    }
+
+    public void setTsdbSnapshotInterval(int tsdbSnapshotInterval) {
+        this.tsdbSnapshotInterval = tsdbSnapshotInterval;
+    }
+
+    public int getTsdbSnapshotExpire() {
+        return tsdbSnapshotExpire;
+    }
+
+    public void setTsdbSnapshotExpire(int tsdbSnapshotExpire) {
+        this.tsdbSnapshotExpire = tsdbSnapshotExpire;
+    }
+
 }

+ 2 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java

@@ -55,6 +55,8 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
             ((DatabaseTableMeta) tableMetaTSDB).setConnection(metaConnection);
             ((DatabaseTableMeta) tableMetaTSDB).setFilter(eventFilter);
             ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
+            ((DatabaseTableMeta) tableMetaTSDB).setSnapshotInterval(tsdbSnapshotInterval);
+            ((DatabaseTableMeta) tableMetaTSDB).setSnapshotExpire(tsdbSnapshotExpire);
         }
 
         tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);

+ 2 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -121,6 +121,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 ((DatabaseTableMeta) tableMetaTSDB).setConnection(metaConnection);
                 ((DatabaseTableMeta) tableMetaTSDB).setFilter(eventFilter);
                 ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
+                ((DatabaseTableMeta) tableMetaTSDB).setSnapshotInterval(tsdbSnapshotInterval);
+                ((DatabaseTableMeta) tableMetaTSDB).setSnapshotExpire(tsdbSnapshotExpire);
             }
 
             tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);

+ 46 - 15
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -44,19 +44,21 @@ import com.alibaba.otter.canal.protocol.position.EntryPosition;
  */
 public class DatabaseTableMeta implements TableMetaTSDB {
 
-    public static final EntryPosition INIT_POSITION = new EntryPosition("0", 0L, -2L, -1L);
-    private static Logger             logger        = LoggerFactory.getLogger(DatabaseTableMeta.class);
-    private static Pattern            pattern       = Pattern.compile("Duplicate entry '.*' for key '*'");
-    private static Pattern            h2Pattern     = Pattern.compile("Unique index or primary key violation");
+    public static final EntryPosition INIT_POSITION    = new EntryPosition("0", 0L, -2L, -1L);
+    private static Logger             logger           = LoggerFactory.getLogger(DatabaseTableMeta.class);
+    private static Pattern            pattern          = Pattern.compile("Duplicate entry '.*' for key '*'");
+    private static Pattern            h2Pattern        = Pattern.compile("Unique index or primary key violation");
     private String                    destination;
     private MemoryTableMeta           memoryTableMeta;
-    private MysqlConnection           connection;                                                              // 查询meta信息的链接
+    private MysqlConnection           connection;                                                                 // 查询meta信息的链接
     private CanalEventFilter          filter;
     private CanalEventFilter          blackFilter;
     private EntryPosition             lastPosition;
     private ScheduledExecutorService  scheduler;
     private MetaHistoryDAO            metaHistoryDAO;
     private MetaSnapshotDAO           metaSnapshotDAO;
+    private int                       snapshotInterval = 24;
+    private int                       snapshotExpire   = 360;
 
     public DatabaseTableMeta(){
 
@@ -77,18 +79,27 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         });
 
         // 24小时生成一份snapshot
-        scheduler.scheduleWithFixedDelay(new Runnable() {
+        if (snapshotInterval > 0) {
+            scheduler.scheduleWithFixedDelay(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        MDC.put("destination", destination);
+                        applySnapshotToDB(lastPosition, false);
+                    } catch (Throwable e) {
+                        logger.error("scheudle applySnapshotToDB faield", e);
+                    }
 
-            @Override
-            public void run() {
-                try {
-                    MDC.put("destination", destination);
-                    applySnapshotToDB(lastPosition, false);
-                } catch (Throwable e) {
-                    logger.error("scheudle applySnapshotToDB faield", e);
+                    try {
+                        MDC.put("destination", destination);
+                        snapshotExpire((int) TimeUnit.HOURS.toSeconds(snapshotExpire));
+                    } catch (Throwable e) {
+                        logger.error("scheudle snapshotExpire faield", e);
+                    }
                 }
-            }
-        }, 24, 24, TimeUnit.HOURS);
+            }, snapshotInterval, snapshotInterval, TimeUnit.HOURS);
+        }
         return true;
     }
 
@@ -461,6 +472,10 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         return true;
     }
 
+    private int snapshotExpire(int expireTimestamp) {
+        return metaSnapshotDAO.deleteByTimestamp(destination, expireTimestamp);
+    }
+
     public void setConnection(MysqlConnection connection) {
         this.connection = connection;
     }
@@ -489,6 +504,22 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         this.blackFilter = blackFilter;
     }
 
+    public int getSnapshotInterval() {
+        return snapshotInterval;
+    }
+
+    public void setSnapshotInterval(int snapshotInterval) {
+        this.snapshotInterval = snapshotInterval;
+    }
+
+    public int getSnapshotExpire() {
+        return snapshotExpire;
+    }
+
+    public void setSnapshotExpire(int snapshotExpire) {
+        this.snapshotExpire = snapshotExpire;
+    }
+
     public MysqlConnection getConnection() {
         return connection;
     }

+ 4 - 7
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaHistoryDAO.java

@@ -1,7 +1,5 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao;
 
-import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 
@@ -37,13 +35,12 @@ public class MetaHistoryDAO extends MetaBaseDAO {
     /**
      * 删除interval秒之前的数据
      */
-    public Integer deleteByGmtModified(int interval) {
+    public Integer deleteByTimestamp(String destination, int interval) {
         HashMap params = Maps.newHashMapWithExpectedSize(2);
         long timestamp = System.currentTimeMillis() - interval * 1000;
-        Date date = new Date(timestamp);
-        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        params.put("timestamp", format.format(date));
-        return getSqlMapClientTemplate().delete("meta_history.deleteByGmtModified", params);
+        params.put("timestamp", timestamp);
+        params.put("destination", destination);
+        return getSqlMapClientTemplate().delete("meta_history.deleteByTimestamp", params);
     }
 
     protected void initDao() throws Exception {

+ 4 - 7
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaSnapshotDAO.java

@@ -1,7 +1,5 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao;
 
-import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.HashMap;
 
 import com.google.common.collect.Maps;
@@ -40,13 +38,12 @@ public class MetaSnapshotDAO extends MetaBaseDAO {
     /**
      * 删除interval秒之前的数据
      */
-    public Integer deleteByGmtModified(int interval) {
+    public Integer deleteByTimestamp(String destination, int interval) {
         HashMap params = Maps.newHashMapWithExpectedSize(2);
         long timestamp = System.currentTimeMillis() - interval * 1000;
-        Date date = new Date(timestamp);
-        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        params.put("timestamp", format.format(date));
-        return getSqlMapClientTemplate().delete("meta_snapshot.deleteByGmtModified", params);
+        params.put("timestamp", timestamp);
+        params.put("destination", destination);
+        return getSqlMapClientTemplate().delete("meta_snapshot.deleteByTimestamp", params);
     }
 
     protected void initDao() throws Exception {

+ 17 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MetaHistoryDAOTest.java

@@ -16,7 +16,7 @@ import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDO;
  * Created by wanshao Date: 2017/9/20 Time: 下午5:00
  **/
 @RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(locations = { "/tsdb/mysql-tsdb.xml" })
+@ContextConfiguration(locations = { "/tsdb/h2-tsdb.xml" })
 public class MetaHistoryDAOTest {
 
     @Resource
@@ -24,7 +24,22 @@ public class MetaHistoryDAOTest {
 
     @Test
     public void testSimple() {
-        List<MetaHistoryDO> metaHistoryDOList = metaHistoryDAO.findByTimestamp("test", 0L, 0L);
+        MetaHistoryDO historyDO = new MetaHistoryDO();
+        historyDO.setDestination("test");
+        historyDO.setBinlogFile("000001");
+        historyDO.setBinlogOffest(4L);
+        historyDO.setBinlogMasterId("1");
+        historyDO.setBinlogTimestamp(System.currentTimeMillis() - 7300 * 1000);
+        historyDO.setSqlSchema("test");
+        historyDO.setUseSchema("test");
+        historyDO.setSqlTable("testTable");
+        historyDO.setSqlTable("drop table testTable");
+        metaHistoryDAO.insert(historyDO);
+
+        int count = metaHistoryDAO.deleteByTimestamp("test", 7200);
+        System.out.println(count);
+
+        List<MetaHistoryDO> metaHistoryDOList = metaHistoryDAO.findByTimestamp("test", 0L, System.currentTimeMillis());
         for (MetaHistoryDO metaHistoryDO : metaHistoryDOList) {
             System.out.println(metaHistoryDO.getId());
         }

+ 41 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MetaSnapshotDAOTest.java

@@ -0,0 +1,41 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
+
+import javax.annotation.Resource;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDO;
+
+/**
+ * Created by wanshao Date: 2017/9/20 Time: 下午5:00
+ **/
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(locations = { "/tsdb/h2-tsdb.xml" })
+public class MetaSnapshotDAOTest {
+
+    @Resource
+    MetaSnapshotDAO metaSnapshotDAO;
+
+    @Test
+    public void testSimple() {
+        MetaSnapshotDO metaSnapshotDO = new MetaSnapshotDO();
+        metaSnapshotDO.setDestination("test");
+        metaSnapshotDO.setBinlogFile("000001");
+        metaSnapshotDO.setBinlogOffest(4L);
+        metaSnapshotDO.setBinlogMasterId("1");
+        metaSnapshotDO.setBinlogTimestamp(System.currentTimeMillis() - 7300 * 1000);
+        metaSnapshotDO.setData("test");
+        metaSnapshotDAO.insert(metaSnapshotDO);
+
+        MetaSnapshotDO snapshotDO = metaSnapshotDAO.findByTimestamp("test", System.currentTimeMillis());
+        System.out.println(snapshotDO);
+
+        int count = metaSnapshotDAO.deleteByTimestamp("test", 7200);
+        System.out.println(count);
+    }
+
+}

+ 2 - 2
parse/src/test/resources/tsdb/sql-map/sqlmap_history.xml

@@ -36,10 +36,10 @@
     </delete>
 
 
-    <delete id="deleteByGmtModified" parameterClass="java.util.Map">
+    <delete id="deleteByTimestamp" parameterClass="java.util.Map">
         <![CDATA[
 		delete from meta_history
-		where gmt_modified < timestamp(#timestamp#)
+		where destination=#destination# and binlog_timestamp < #timestamp#
         ]]>
     </delete>
 </sqlMap>

+ 2 - 2
parse/src/test/resources/tsdb/sql-map/sqlmap_snapshot.xml

@@ -54,10 +54,10 @@
         where destination=#destination#
     </delete>
 
-    <delete id="deleteByGmtModified" parameterClass="java.util.Map">
+    <delete id="deleteByTimestamp" parameterClass="java.util.Map">
         <![CDATA[
 		delete from meta_snapshot
-		where gmt_modified < timestamp(#timestamp#)
+		where destination=#destination# and binlog_timestamp < #timestamp# and binlog_timestamp > 0
         ]]>
     </delete>
 </sqlMap>