Browse Source

destory TableMetaTSDB when stop EventParser

wuwo 6 years ago
parent
commit
beed10901d

+ 1 - 1
deployer/src/main/resources/spring/tsdb/h2-tsdb.xml

@@ -23,7 +23,7 @@
 	</bean>
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta" destroy-method="destory">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>

+ 1 - 1
deployer/src/main/resources/spring/tsdb/mysql-tsdb.xml

@@ -23,7 +23,7 @@
 	</bean>
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta" destroy-method="destory">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>

+ 21 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -72,7 +72,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
 
             @Override
             public Thread newThread(Runnable r) {
-                Thread thread = new Thread(r, "[scheduler-table-meta-snapshot]");
+                Thread thread = new Thread(r, String.format("[scheduler-table-meta-snapshot-%s]", destination));
                 thread.setDaemon(true);
                 return thread;
             }
@@ -105,6 +105,26 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         }
         return true;
     }
+    
+    @Override
+    public void destory() {
+        if (memoryTableMeta != null) {
+            memoryTableMeta.destory();
+        }
+        
+        if (connection != null) {
+            try {
+                connection.disconnect();
+            } catch (IOException e) {
+                logger.error("ERROR # disconnect meta connection for address:{}", connection.getConnector()
+                    .getAddress(), e);
+            }
+        }
+        
+        if (scheduler != null) {
+            scheduler.shutdown();
+        }
+    }
 
     @Override
     public TableMeta find(String schema, String table) {

+ 5 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java

@@ -58,6 +58,11 @@ public class MemoryTableMeta implements TableMetaTSDB {
     public boolean init(String destination) {
         return true;
     }
+    
+    @Override
+    public void destory() {
+        tableMetas.clear();
+    }
 
     public boolean apply(EntryPosition position, String schema, String ddl, String extra) {
         tableMetas.clear();

+ 5 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDB.java

@@ -18,6 +18,11 @@ public interface TableMetaTSDB {
      */
     public boolean init(String destination);
 
+    /**
+     * 销毁资源
+     */
+    public void destory();
+
     /**
      * 获取当前的表结构
      */

+ 1 - 1
parse/src/test/resources/tsdb/h2-tsdb.xml

@@ -7,7 +7,7 @@
        default-autowire="byName">
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta" destroy-method="destory">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>

+ 1 - 1
parse/src/test/resources/tsdb/mysql-tsdb.xml

@@ -7,7 +7,7 @@
        default-autowire="byName">
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta" destroy-method="destory">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>