Browse Source

improve DatabaseTableMeta

wuwo 6 years ago
parent
commit
e608f09b53

+ 14 - 14
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -7,6 +7,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.regex.Pattern;
@@ -48,18 +49,26 @@ public class DatabaseTableMeta implements TableMetaTSDB {
     private static Logger             logger           = LoggerFactory.getLogger(DatabaseTableMeta.class);
     private static Logger             logger           = LoggerFactory.getLogger(DatabaseTableMeta.class);
     private static Pattern            pattern          = Pattern.compile("Duplicate entry '.*' for key '*'");
     private static Pattern            pattern          = Pattern.compile("Duplicate entry '.*' for key '*'");
     private static Pattern            h2Pattern        = Pattern.compile("Unique index or primary key violation");
     private static Pattern            h2Pattern        = Pattern.compile("Unique index or primary key violation");
+    private static ScheduledExecutorService  scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread thread = new Thread(r, "[scheduler-table-meta-snapshot]");
+            thread.setDaemon(true);
+            return thread;
+        }
+    });
     private String                    destination;
     private String                    destination;
     private MemoryTableMeta           memoryTableMeta;
     private MemoryTableMeta           memoryTableMeta;
     private MysqlConnection           connection;                                                                 // 查询meta信息的链接
     private MysqlConnection           connection;                                                                 // 查询meta信息的链接
     private CanalEventFilter          filter;
     private CanalEventFilter          filter;
     private CanalEventFilter          blackFilter;
     private CanalEventFilter          blackFilter;
     private EntryPosition             lastPosition;
     private EntryPosition             lastPosition;
-    private ScheduledExecutorService  scheduler;
     private MetaHistoryDAO            metaHistoryDAO;
     private MetaHistoryDAO            metaHistoryDAO;
     private MetaSnapshotDAO           metaSnapshotDAO;
     private MetaSnapshotDAO           metaSnapshotDAO;
     private int                       snapshotInterval = 24;
     private int                       snapshotInterval = 24;
     private int                       snapshotExpire   = 360;
     private int                       snapshotExpire   = 360;
-
+    private ScheduledFuture<?>        scheduleSnapshotFuture;
+    
     public DatabaseTableMeta(){
     public DatabaseTableMeta(){
 
 
     }
     }
@@ -68,19 +77,10 @@ public class DatabaseTableMeta implements TableMetaTSDB {
     public boolean init(final String destination) {
     public boolean init(final String destination) {
         this.destination = destination;
         this.destination = destination;
         this.memoryTableMeta = new MemoryTableMeta();
         this.memoryTableMeta = new MemoryTableMeta();
-        this.scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
-
-            @Override
-            public Thread newThread(Runnable r) {
-                Thread thread = new Thread(r, String.format("[scheduler-table-meta-snapshot-%s]", destination));
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
 
 
         // 24小时生成一份snapshot
         // 24小时生成一份snapshot
         if (snapshotInterval > 0) {
         if (snapshotInterval > 0) {
-            scheduler.scheduleWithFixedDelay(new Runnable() {
+            scheduleSnapshotFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
 
 
                 @Override
                 @Override
                 public void run() {
                 public void run() {
@@ -121,8 +121,8 @@ public class DatabaseTableMeta implements TableMetaTSDB {
             }
             }
         }
         }
         
         
-        if (scheduler != null) {
-            scheduler.shutdown();
+        if (scheduleSnapshotFuture != null) {
+            scheduleSnapshotFuture.cancel(false);
         }
         }
     }
     }