Forráskód Böngészése

let tsdbSnapshotInterval config work correctly

wuwo 6 éve
szülő
commit
5ad9cf3f16

+ 1 - 1
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -304,7 +304,6 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             }
             }
             boolean tsdbEnable = BooleanUtils.toBoolean(parameters.getTsdbEnable());
             boolean tsdbEnable = BooleanUtils.toBoolean(parameters.getTsdbEnable());
             if (tsdbEnable) {
             if (tsdbEnable) {
-                mysqlEventParser.setEnableTsdb(tsdbEnable);
                 mysqlEventParser.setTableMetaTSDBFactory(new DefaultTableMetaTSDBFactory() {
                 mysqlEventParser.setTableMetaTSDBFactory(new DefaultTableMetaTSDBFactory() {
 
 
                     @Override
                     @Override
@@ -327,6 +326,7 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
                         }
                         }
                     }
                     }
                 });
                 });
+                mysqlEventParser.setEnableTsdb(tsdbEnable);
             }
             }
             eventParser = mysqlEventParser;
             eventParser = mysqlEventParser;
         } else if (type.isLocalBinlog()) {
         } else if (type.isLocalBinlog()) {

+ 1 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java

@@ -57,6 +57,7 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
             ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
             ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
             ((DatabaseTableMeta) tableMetaTSDB).setSnapshotInterval(tsdbSnapshotInterval);
             ((DatabaseTableMeta) tableMetaTSDB).setSnapshotInterval(tsdbSnapshotInterval);
             ((DatabaseTableMeta) tableMetaTSDB).setSnapshotExpire(tsdbSnapshotExpire);
             ((DatabaseTableMeta) tableMetaTSDB).setSnapshotExpire(tsdbSnapshotExpire);
+            ((DatabaseTableMeta) tableMetaTSDB).init(destination);
         }
         }
 
 
         tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);
         tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);

+ 1 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -124,6 +124,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
                 ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
                 ((DatabaseTableMeta) tableMetaTSDB).setSnapshotInterval(tsdbSnapshotInterval);
                 ((DatabaseTableMeta) tableMetaTSDB).setSnapshotInterval(tsdbSnapshotInterval);
                 ((DatabaseTableMeta) tableMetaTSDB).setSnapshotExpire(tsdbSnapshotExpire);
                 ((DatabaseTableMeta) tableMetaTSDB).setSnapshotExpire(tsdbSnapshotExpire);
+                ((DatabaseTableMeta) tableMetaTSDB).init(destination);
             }
             }
 
 
             tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);
             tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);

+ 30 - 26
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -10,6 +10,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 import java.util.regex.Pattern;
@@ -62,9 +63,10 @@ public class DatabaseTableMeta implements TableMetaTSDB {
                                                                  }
                                                                  }
                                                              });
                                                              });
     private ReadWriteLock                   lock             = new ReentrantReadWriteLock();
     private ReadWriteLock                   lock             = new ReentrantReadWriteLock();
+    private AtomicBoolean                   initialized      = new AtomicBoolean(false);
     private String                          destination;
     private String                          destination;
     private MemoryTableMeta                 memoryTableMeta;
     private MemoryTableMeta                 memoryTableMeta;
-    private MysqlConnection                 connection;                                                                 // 查询meta信息的链接
+    private volatile MysqlConnection        connection;                                                                 // 查询meta信息的链接
     private CanalEventFilter                filter;
     private CanalEventFilter                filter;
     private CanalEventFilter                blackFilter;
     private CanalEventFilter                blackFilter;
     private EntryPosition                   lastPosition;
     private EntryPosition                   lastPosition;
@@ -74,40 +76,42 @@ public class DatabaseTableMeta implements TableMetaTSDB {
     private int                             snapshotInterval = 24;
     private int                             snapshotInterval = 24;
     private int                             snapshotExpire   = 360;
     private int                             snapshotExpire   = 360;
     private ScheduledFuture<?>              scheduleSnapshotFuture;
     private ScheduledFuture<?>              scheduleSnapshotFuture;
-
+    
     public DatabaseTableMeta(){
     public DatabaseTableMeta(){
 
 
     }
     }
 
 
     @Override
     @Override
     public boolean init(final String destination) {
     public boolean init(final String destination) {
-        this.destination = destination;
-        this.memoryTableMeta = new MemoryTableMeta();
-
-        // 24小时生成一份snapshot
-        if (snapshotInterval > 0) {
-            scheduleSnapshotFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
-
-                @Override
-                public void run() {
-                    boolean applyResult = false;
-                    try {
-                        MDC.put("destination", destination);
-                        applyResult = applySnapshotToDB(lastPosition, false);
-                    } catch (Throwable e) {
-                        logger.error("scheudle applySnapshotToDB faield", e);
-                    }
+        if (initialized.compareAndSet(false, true)) {
+            this.destination = destination;
+            this.memoryTableMeta = new MemoryTableMeta();
+
+            // 24小时生成一份snapshot
+            if (snapshotInterval > 0) {
+                scheduleSnapshotFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        boolean applyResult = false;
+                        try {
+                            MDC.put("destination", destination);
+                            applyResult = applySnapshotToDB(lastPosition, false);
+                        } catch (Throwable e) {
+                            logger.error("scheudle applySnapshotToDB faield", e);
+                        }
 
 
-                    try {
-                        MDC.put("destination", destination);
-                        if (applyResult) {
-                            snapshotExpire((int) TimeUnit.HOURS.toSeconds(snapshotExpire));
+                        try {
+                            MDC.put("destination", destination);
+                            if (applyResult) {
+                                snapshotExpire((int) TimeUnit.HOURS.toSeconds(snapshotExpire));
+                            }
+                        } catch (Throwable e) {
+                            logger.error("scheudle snapshotExpire faield", e);
                         }
                         }
-                    } catch (Throwable e) {
-                        logger.error("scheudle snapshotExpire faield", e);
                     }
                     }
-                }
-            }, snapshotInterval, snapshotInterval, TimeUnit.HOURS);
+                }, snapshotInterval, snapshotInterval, TimeUnit.HOURS);
+            }
         }
         }
         return true;
         return true;
     }
     }

+ 0 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDBBuilder.java

@@ -36,7 +36,6 @@ public class TableMetaTSDBBuilder {
                 }
                 }
             }
             }
             TableMetaTSDB tableMetaTSDB = (TableMetaTSDB) applicationContext.getBean("tableMetaTSDB");
             TableMetaTSDB tableMetaTSDB = (TableMetaTSDB) applicationContext.getBean("tableMetaTSDB");
-            tableMetaTSDB.init(destination);
             logger.info("{} init TableMetaTSDB with {}", destination, springXml);
             logger.info("{} init TableMetaTSDB with {}", destination, springXml);
             return tableMetaTSDB;
             return tableMetaTSDB;
         } else {
         } else {