Browse Source

Merge pull request #1099 from spccold/master

destory TableMetaTSDB when stop EventParser
agapple 6 years ago
parent
commit
59e65ed4a6

+ 1 - 1
deployer/src/main/resources/spring/tsdb/h2-tsdb.xml

@@ -23,7 +23,7 @@
 	</bean>
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta" destroy-method="destory">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>

+ 1 - 1
deployer/src/main/resources/spring/tsdb/mysql-tsdb.xml

@@ -23,7 +23,7 @@
 	</bean>
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta" destroy-method="destory">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>

+ 32 - 12
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -7,6 +7,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
@@ -48,18 +49,26 @@ public class DatabaseTableMeta implements TableMetaTSDB {
     private static Logger             logger           = LoggerFactory.getLogger(DatabaseTableMeta.class);
     private static Pattern            pattern          = Pattern.compile("Duplicate entry '.*' for key '*'");
     private static Pattern            h2Pattern        = Pattern.compile("Unique index or primary key violation");
+    private static ScheduledExecutorService  scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread thread = new Thread(r, "[scheduler-table-meta-snapshot]");
+            thread.setDaemon(true);
+            return thread;
+        }
+    });
     private String                    destination;
     private MemoryTableMeta           memoryTableMeta;
     private MysqlConnection           connection;                                                                 // 查询meta信息的链接
     private CanalEventFilter          filter;
     private CanalEventFilter          blackFilter;
     private EntryPosition             lastPosition;
-    private ScheduledExecutorService  scheduler;
     private MetaHistoryDAO            metaHistoryDAO;
     private MetaSnapshotDAO           metaSnapshotDAO;
     private int                       snapshotInterval = 24;
     private int                       snapshotExpire   = 360;
-
+    private ScheduledFuture<?>        scheduleSnapshotFuture;
+    
     public DatabaseTableMeta(){
 
     }
@@ -68,19 +77,10 @@ public class DatabaseTableMeta implements TableMetaTSDB {
     public boolean init(final String destination) {
         this.destination = destination;
         this.memoryTableMeta = new MemoryTableMeta();
-        this.scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
-
-            @Override
-            public Thread newThread(Runnable r) {
-                Thread thread = new Thread(r, "[scheduler-table-meta-snapshot]");
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
 
         // 24小时生成一份snapshot
         if (snapshotInterval > 0) {
-            scheduler.scheduleWithFixedDelay(new Runnable() {
+            scheduleSnapshotFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
 
                 @Override
                 public void run() {
@@ -105,6 +105,26 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         }
         return true;
     }
+    
+    @Override
+    public void destory() {
+        if (memoryTableMeta != null) {
+            memoryTableMeta.destory();
+        }
+        
+        if (connection != null) {
+            try {
+                connection.disconnect();
+            } catch (IOException e) {
+                logger.error("ERROR # disconnect meta connection for address:{}", connection.getConnector()
+                    .getAddress(), e);
+            }
+        }
+        
+        if (scheduleSnapshotFuture != null) {
+            scheduleSnapshotFuture.cancel(false);
+        }
+    }
 
     @Override
     public TableMeta find(String schema, String table) {

+ 5 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java

@@ -58,6 +58,11 @@ public class MemoryTableMeta implements TableMetaTSDB {
     public boolean init(String destination) {
         return true;
     }
+    
+    @Override
+    public void destory() {
+        tableMetas.clear();
+    }
 
     public boolean apply(EntryPosition position, String schema, String ddl, String extra) {
         tableMetas.clear();

+ 5 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDB.java

@@ -18,6 +18,11 @@ public interface TableMetaTSDB {
      */
     public boolean init(String destination);
 
+    /**
+     * 销毁资源
+     */
+    public void destory();
+
     /**
      * 获取当前的表结构
      */

+ 1 - 1
parse/src/test/resources/tsdb/h2-tsdb.xml

@@ -7,7 +7,7 @@
        default-autowire="byName">
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta" destroy-method="destory">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>

+ 1 - 1
parse/src/test/resources/tsdb/mysql-tsdb.xml

@@ -7,7 +7,7 @@
        default-autowire="byName">
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta" destroy-method="destory">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>