Browse Source

Merge pull request #18 from alibaba/master

merge
rewerma 6 years ago
parent
commit
2e1a1c31e6

+ 2 - 2
dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/BaseLogFetcherTest.java

@@ -68,7 +68,7 @@ public class BaseLogFetcherTest {
                     // update需要处理before/after
                     System.out.println("-------> before");
                     parseOneRow(event, buffer, columns, false);
-                    if (!buffer.nextOneRow(changeColumns)) {
+                    if (!buffer.nextOneRow(changeColumns, true)) {
                         break;
                     }
                     System.out.println("-------> after");
@@ -97,7 +97,7 @@ public class BaseLogFetcherTest {
             }
 
             ColumnInfo info = columnInfo[i];
-            buffer.nextValue(info.type, info.meta);
+            buffer.nextValue(null , i ,info.type, info.meta);
 
             if (buffer.isNull()) {
                 //

+ 1 - 1
deployer/src/main/resources/spring/tsdb/h2-tsdb.xml

@@ -23,7 +23,7 @@
 	</bean>
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta" destroy-method="destory">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>

+ 1 - 1
deployer/src/main/resources/spring/tsdb/mysql-tsdb.xml

@@ -23,7 +23,7 @@
 	</bean>
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta" destroy-method="destory">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>

+ 1 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/ddl/DruidDdlParser.java

@@ -71,6 +71,7 @@ public class DruidDdlParser {
                         DdlResult ddlResult = new DdlResult();
                         processName(ddlResult, schmeaName, alterTable.getName(), true);
                         processName(ddlResult, schmeaName, ((SQLAlterTableRename) item).getToName(), false);
+                        ddlResult.setType(EventType.RENAME);
                         ddlResults.add(ddlResult);
                     } else if (item instanceof SQLAlterTableAddIndex) {
                         DdlResult ddlResult = new DdlResult();

+ 32 - 12
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -7,6 +7,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
@@ -48,18 +49,26 @@ public class DatabaseTableMeta implements TableMetaTSDB {
     private static Logger             logger           = LoggerFactory.getLogger(DatabaseTableMeta.class);
     private static Pattern            pattern          = Pattern.compile("Duplicate entry '.*' for key '*'");
     private static Pattern            h2Pattern        = Pattern.compile("Unique index or primary key violation");
+    private static ScheduledExecutorService  scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread thread = new Thread(r, "[scheduler-table-meta-snapshot]");
+            thread.setDaemon(true);
+            return thread;
+        }
+    });
     private String                    destination;
     private MemoryTableMeta           memoryTableMeta;
     private MysqlConnection           connection;                                                                 // 查询meta信息的链接
     private CanalEventFilter          filter;
     private CanalEventFilter          blackFilter;
     private EntryPosition             lastPosition;
-    private ScheduledExecutorService  scheduler;
     private MetaHistoryDAO            metaHistoryDAO;
     private MetaSnapshotDAO           metaSnapshotDAO;
     private int                       snapshotInterval = 24;
     private int                       snapshotExpire   = 360;
-
+    private ScheduledFuture<?>        scheduleSnapshotFuture;
+    
     public DatabaseTableMeta(){
 
     }
@@ -68,19 +77,10 @@ public class DatabaseTableMeta implements TableMetaTSDB {
     public boolean init(final String destination) {
         this.destination = destination;
         this.memoryTableMeta = new MemoryTableMeta();
-        this.scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
-
-            @Override
-            public Thread newThread(Runnable r) {
-                Thread thread = new Thread(r, "[scheduler-table-meta-snapshot]");
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
 
         // 24小时生成一份snapshot
         if (snapshotInterval > 0) {
-            scheduler.scheduleWithFixedDelay(new Runnable() {
+            scheduleSnapshotFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
 
                 @Override
                 public void run() {
@@ -105,6 +105,26 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         }
         return true;
     }
+    
+    @Override
+    public void destory() {
+        if (memoryTableMeta != null) {
+            memoryTableMeta.destory();
+        }
+        
+        if (connection != null) {
+            try {
+                connection.disconnect();
+            } catch (IOException e) {
+                logger.error("ERROR # disconnect meta connection for address:{}", connection.getConnector()
+                    .getAddress(), e);
+            }
+        }
+        
+        if (scheduleSnapshotFuture != null) {
+            scheduleSnapshotFuture.cancel(false);
+        }
+    }
 
     @Override
     public TableMeta find(String schema, String table) {

+ 5 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java

@@ -58,6 +58,11 @@ public class MemoryTableMeta implements TableMetaTSDB {
     public boolean init(String destination) {
         return true;
     }
+    
+    @Override
+    public void destory() {
+        tableMetas.clear();
+    }
 
     public boolean apply(EntryPosition position, String schema, String ddl, String extra) {
         tableMetas.clear();

+ 5 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDB.java

@@ -18,6 +18,11 @@ public interface TableMetaTSDB {
      */
     public boolean init(String destination);
 
+    /**
+     * 销毁资源
+     */
+    public void destory();
+
     /**
      * 获取当前的表结构
      */

+ 1 - 1
parse/src/test/resources/tsdb/h2-tsdb.xml

@@ -7,7 +7,7 @@
        default-autowire="byName">
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta" destroy-method="destory">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>

+ 1 - 1
parse/src/test/resources/tsdb/mysql-tsdb.xml

@@ -7,7 +7,7 @@
        default-autowire="byName">
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta" destroy-method="destory">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>

+ 2 - 55
pom.xml

@@ -371,6 +371,7 @@
                 </configuration>
             </plugin>
             <!-- javadoc -->
+            <!--
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-javadoc-plugin</artifactId>
@@ -389,61 +390,7 @@
                   <additionalparam>-Xdoclint:none</additionalparam>
                 </configuration>
             </plugin>
-            <!--
-            <plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-javadoc-plugin</artifactId>
-				<version>2.7</version>
-				<executions>
-					<execution>
-						<id>attach-javadocs</id>
-							<goals>
-								<goal>jar</goal>
-							</goals>
-						</execution>
-					</executions>
-				<configuration>
-				  <encoding>${file_encoding}</encoding>
-				  <charset>${file_encoding}</charset>
-				  <doclet>org.jboss.apiviz.APIviz</doclet>
-				  <docletArtifact>
-					<groupId>org.jboss.apiviz</groupId>
-					<artifactId>apiviz</artifactId>
-					<version>1.3.0.GA</version>
-				  </docletArtifact>
-				  <useStandardDocletOptions>true</useStandardDocletOptions>
-				  <breakiterator>true</breakiterator>
-				  <version>true</version>
-				  <author>true</author>
-				  <keywords>true</keywords>
-				</configuration>
-			</plugin>
-
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jxr-plugin</artifactId>
-				<version>2.2</version>
-				<configuration>
-					<aggregate>true</aggregate>
-					<destDir>${project.basedir}/docs/sources</destDir>
-					<linkJavadoc>true</linkJavadoc>
-					<javadocDir>${project.basedir}/docs/javadoc</javadocDir>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-source-plugin</artifactId>
-				<version>2.1.2</version>
-				<executions>
-					<execution>
-					<id>attach-sources</id>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-			-->
+            -->
         </plugins>
         <sourceDirectory>src/main/java</sourceDirectory>
         <testSourceDirectory>src/test/java</testSourceDirectory>

+ 12 - 9
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,8 +1,11 @@
 package com.alibaba.otter.canal.kafka;
 
-import java.util.List;
-import java.util.Properties;
-
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -10,12 +13,8 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
+import java.util.List;
+import java.util.Properties;
 
 /**
  * kafka producer 主操作类
@@ -93,6 +92,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 logger.error(e.getMessage(), e);
                 // producer.abortTransaction();
                 callback.rollback();
+                return;
             }
         } else {
             // 发送扁平数据json
@@ -110,6 +110,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             logger.error(e.getMessage(), e);
                             // producer.abortTransaction();
                             callback.rollback();
+                            return;
                         }
                     } else {
                         if (canalDestination.getPartitionHash() != null
@@ -131,6 +132,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                         logger.error(e.getMessage(), e);
                                         // producer.abortTransaction();
                                         callback.rollback();
+                                        return;
                                     }
                                 }
                             }
@@ -145,6 +147,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                 logger.error(e.getMessage(), e);
                                 // producer.abortTransaction();
                                 callback.rollback();
+                                return;
                             }
                         }
                     }

+ 10 - 8
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -1,7 +1,11 @@
 package com.alibaba.otter.canal.rocketmq;
 
-import java.util.List;
-
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.common.CanalMessageSerializer;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.server.exception.CanalServerException;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -12,12 +16,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.otter.canal.common.CanalMessageSerializer;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.server.exception.CanalServerException;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
+import java.util.List;
 
 public class CanalRocketMQProducer implements CanalMQProducer {
 
@@ -67,6 +66,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
             } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                 logger.error("Send message error!", e);
                 callback.rollback();
+                return;
             }
         } else {
             List<FlatMessage> flatMessages = FlatMessage.messageConverter(data);
@@ -90,6 +90,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                         } catch (Exception e) {
                             logger.error("send flat message to fixed partition error", e);
                             callback.rollback();
+                            return;
                         }
                     } else {
                         if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
@@ -124,6 +125,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                     } catch (Exception e) {
                                         logger.error("send flat message to hashed partition error", e);
                                         callback.rollback();
+                                        return;
                                     }
                                 }
                             }