Browse Source

1、BinLogFileQueue的listBinlogFiles方法中对binlog文件名进行更严格的验证,必须以数字结尾
2、LocalBinLogConnection类中解决如下bug
2.1第二个dump方法中增加代码"decoder.handle(LogEvent.FORMAT_DESCRIPTION_EVENT);",解决"((QueryLogEvent)event).getQuery()"语句出现乱码的问题(BEGIN后面多了4个字节),出现乱码不会进入if语句,导致找到的binlog位点不准确
2.2第一个dump方法中,被注释的decode代码才是正确的,和DirectLogFetcher不同,此处需要构造子循环进行遍历
2.3第一个dump方法中,将每个binlog-file的首个event设置为RotateLogEvent,否则会导致所有event的binlogFileName都是默认的mysql-bin.000001。这么改完之后,和DirectLogFetcher的行为相契合,后者在每个binlogfile的首个event就是RotateLogEvent
3、LocalBinlogEventParser中增加元数据处理的相关代码,不加的话,所有的event都不带列名,基本上没法儿使用

lubiao 8 years ago
parent
commit
350f8637c3

+ 163 - 0
deployer/src/main/resources/spring/local-instance.xml

@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
+	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:lang="http://www.springframework.org/schema/lang"
+	xmlns:context="http://www.springframework.org/schema/context"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
+           http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-2.0.xsd
+           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
+           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
+	default-autowire="byName">
+
+	<!-- properties -->
+	<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
+		<property name="ignoreResourceNotFound" value="true" />
+		<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
+		<property name="locationNames">
+			<list>
+				<value>classpath:canal.properties</value>
+				<value>classpath:${canal.instance.destination:}/instance.properties</value>
+			</list>
+		</property>
+	</bean>
+	
+	<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
+	<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"> 
+		<property name="propertyEditorRegistrars">
+			<list>
+				<ref bean="socketAddressEditor" />
+			</list>
+		</property>
+	</bean>
+	
+	<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
+		<property name="destination" value="${canal.instance.destination}" />
+		<property name="eventParser">
+			<ref local="eventParser" />
+		</property>
+		<property name="eventSink">
+			<ref local="eventSink" />
+		</property>
+		<property name="eventStore">
+			<ref local="eventStore" />
+		</property>
+		<property name="metaManager">
+			<ref local="metaManager" />
+		</property>
+		<property name="alarmHandler">
+			<ref local="alarmHandler" />
+		</property>
+		<property name="eventFetcher">
+			<ref local="eventFetcher"/>
+		</property>
+	</bean>
+	
+	<bean id="eventFetcher" class="com.alibaba.otter.canal.mq.fetch.CanalEventFetcherWithMq">
+		<property name="enable" value="${canal.instance.mq.enable}"/>
+		<property name="sendMode" value="${canal.instance.mq.sendMode}"/>
+	</bean>
+	
+	<!-- 报警处理类 -->
+	<bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.MsgAlarmHandler">
+		<property name="enabled" value="${canal.alarm.enable}" />
+		<property name="environment" value="${canal.alarm.environment}" />
+		<property name="intercenterApi" value="${canal.alarm.intercenter.api}" />
+		<property name="administrators" value="${canal.alarm.administrators}" />
+	</bean>
+	
+	<bean id="zkClientx" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean" >
+		<property name="targetClass" value="com.alibaba.otter.canal.common.zookeeper.ZkClientx" />
+		<property name="targetMethod" value="getZkClient" />
+		<property name="arguments">
+			<list>
+				<value>${canal.zkServers:127.0.0.1:2181}</value>
+			</list>
+		</property>
+	</bean>
+	
+	<bean id="metaManager" class="com.alibaba.otter.canal.meta.PeriodMixedMetaManager">
+		<property name="zooKeeperMetaManager">
+			<bean class="com.alibaba.otter.canal.meta.ZooKeeperMetaManager">
+				<property name="zkClientx" ref="zkClientx" />
+			</bean>
+		</property>
+		<property name="period" value="${canal.zookeeper.flush.period:1000}" />
+	</bean>
+	
+	<bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
+		<property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" />
+		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
+		<property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
+		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
+	</bean>
+	
+	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
+		<property name="eventStore" ref="eventStore" />
+	</bean>
+
+	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser">
+		<property name="destination" value="${canal.instance.destination}" />			
+		<property name="alarmHandler" ref="alarmHandler" />
+		
+		<!-- 解析过滤处理 -->
+		<property name="eventFilter">
+			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
+				<constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" />
+			</bean>
+		</property>
+		
+		<property name="eventBlackFilter">
+			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
+				<constructor-arg index="0" value="${canal.instance.filter.black.regex:}" />
+				<constructor-arg index="1" value="false" />
+			</bean>
+		</property>
+		
+		<!-- 最大事务解析大小,超过该大小后事务将被切分为多个事务投递 -->
+		<property name="transactionSize" value="${canal.instance.transaction.size:1024}" />
+			
+		<!-- 解析编码 -->
+		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
+	
+		<!-- 解析位点记录 -->
+		<property name="logPositionManager">
+			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
+				<property name="primary">
+					<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
+				</property>
+				<property name="failback">
+					<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
+						<property name="metaManager" ref="metaManager" />
+					</bean>
+				</property>
+			</bean>
+		</property>
+		
+		<!-- 解析数据库信息 -->
+		<property name="masterInfo">
+			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
+				<property name="address" value="${canal.instance.master.address}" />
+				<property name="username" value="${canal.instance.dbUsername:retl}" />
+				<property name="password" value="${canal.instance.dbPassword:retl}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
+			</bean>
+		</property>
+		
+		<!-- 解析起始位点 -->
+		<property name="masterPosition">
+			<bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
+				<property name="journalName" value="${canal.instance.master.journal.name}" />
+				<property name="position" value="${canal.instance.master.position}" />
+				<property name="timestamp" value="${canal.instance.master.timestamp}" />
+			</bean>
+		</property>
+		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
+		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
+		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
+		<property name="filterRows" value="${canal.instance.filter.rows:false}" />
+		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
+		<property name="needWait" value="${canal.instance.parser.needWait:true}"/>
+		<property name="directory" value="${canal.instance.parser.directory}"/>
+	</bean>
+</beans>

+ 19 - 13
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -8,7 +8,6 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.SinkFunction;
 import com.alibaba.otter.canal.parse.inbound.mysql.local.BinLogFileQueue;
@@ -18,6 +17,7 @@ import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.LogPosition;
 import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
 
 /**
  * local bin log connection (not real connection)
@@ -83,9 +83,9 @@ public class LocalBinLogConnection implements ErosaConnection {
             context.setLogPosition(new LogPosition(binlogfilename, binlogPosition));
             while (running) {
                 boolean needContinue = true;
-                LogEvent event;
-                while (fetcher.fetch()) {
-                    event = decoder.decode(fetcher, context);
+				LogEvent event = null;
+                L:while (fetcher.fetch()) {
+                    /*event = decoder.decode(fetcher, context);
                     if (event == null) {
                         throw new CanalParseException("parse failed");
                     }
@@ -93,15 +93,20 @@ public class LocalBinLogConnection implements ErosaConnection {
                     if (!func.sink(event)) {
                         needContinue = false;
                         break;
-                    }
-
-                    // do {
-                    // event = decoder.decode(fetcher, context);
-                    // if (event != null && !func.sink(event)) {
-                    // needContinue = false;
-                    // break;
-                    // }
-                    // } while (event != null);
+                    }*/
+
+					do {
+						if (event == null) {
+							event = new RotateLogEvent(context.getLogPosition().getFileName(), context.getLogPosition().getPosition());
+						} else {
+							event = decoder.decode(fetcher, context);
+						}
+												
+						if (event != null && !func.sink(event)) {
+							needContinue = false;
+							break L;
+						}
+					} while (event != null);
                 }
 
                 if (needContinue) {// 读取下一个
@@ -145,6 +150,7 @@ public class LocalBinLogConnection implements ErosaConnection {
 
         FileLogFetcher fetcher = new FileLogFetcher(bufferSize);
         LogDecoder decoder = new LogDecoder();
+        decoder.handle(LogEvent.FORMAT_DESCRIPTION_EVENT);
         decoder.handle(LogEvent.QUERY_EVENT);
         decoder.handle(LogEvent.XID_EVENT);
         LogContext context = new LogContext();

+ 66 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java

@@ -1,10 +1,14 @@
 package com.alibaba.otter.canal.parse.inbound.mysql;
 
+import java.io.IOException;
+
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
+import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
+import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
 import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
@@ -21,7 +25,9 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
     // 数据库信息
     private AuthenticationInfo masterInfo;
     private EntryPosition      masterPosition;        // binlog信息
-
+    private MysqlConnection    metaConnection;        // 查询meta信息的链接
+    private TableMetaCache     tableMetaCache;        // 对应meta
+    
     private String             directory;
     private boolean            needWait   = false;
     private int                bufferSize = 16 * 1024;
@@ -33,17 +39,60 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
     @Override
     protected ErosaConnection buildErosaConnection() {
         return buildLocalBinLogConnection();
-    }
+    }   
+    
+    @Override
+	protected void preDump(ErosaConnection connection) {
+    	metaConnection = buildMysqlConnection();
+        try {
+            metaConnection.connect();
+        } catch (IOException e) {
+            throw new CanalParseException(e);
+        }
+        
+        tableMetaCache = new TableMetaCache(metaConnection);
+        ((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);
+	}
+
+	@Override
+	protected void afterDump(ErosaConnection connection) {
+		if (metaConnection != null) {
+            try {
+                metaConnection.disconnect();
+            } catch (IOException e) {
+                logger.error("ERROR # disconnect meta connection for address:{}", metaConnection.getConnector()
+                    .getAddress(), e);
+            }
+        }
+	}
 
-    public void start() throws CanalParseException {
+	public void start() throws CanalParseException {
         if (runningInfo == null) { // 第一次链接主库
             runningInfo = masterInfo;
         }
 
         super.start();
     }
+	
+    @Override
+	public void stop() {
+    	if (metaConnection != null) {
+            try {
+                metaConnection.disconnect();
+            } catch (IOException e) {
+                logger.error("ERROR # disconnect meta connection for address:{}", metaConnection.getConnector()
+                    .getAddress(), e);
+            }
+        }
+
+        if (tableMetaCache != null) {
+            tableMetaCache.clearTableMeta();
+        }
 
-    private ErosaConnection buildLocalBinLogConnection() {
+        super.stop();
+	}
+
+	private ErosaConnection buildLocalBinLogConnection() {
         LocalBinLogConnection connection = new LocalBinLogConnection();
 
         connection.setBufferSize(this.bufferSize);
@@ -52,6 +101,19 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
 
         return connection;
     }
+
+    private MysqlConnection buildMysqlConnection() {
+        MysqlConnection connection = new MysqlConnection(runningInfo.getAddress(),
+            runningInfo.getUsername(),
+            runningInfo.getPassword(),
+            connectionCharsetNumber,
+            runningInfo.getDefaultDatabaseName());
+        connection.getConnector().setReceiveBufferSize(64 * 1024);
+        connection.getConnector().setSendBufferSize(64 * 1024);
+        connection.getConnector().setSoTimeout(30 * 1000);
+        connection.setCharset(connectionCharset);
+        return connection;
+    }
     
     @Override
     protected EntryPosition findStartPosition(ErosaConnection connection) {

+ 5 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java

@@ -9,6 +9,8 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.IOFileFilter;
@@ -188,7 +190,9 @@ public class BinLogFileQueue {
         files.addAll(FileUtils.listFiles(directory, new IOFileFilter() {
 
             public boolean accept(File file) {
-                return file.getName().startsWith(baseName);
+				Pattern pattern = Pattern.compile("\\d+$");
+				Matcher matcher = pattern.matcher(file.getName());
+				return file.getName().startsWith(baseName) && matcher.find();
             }
 
             public boolean accept(File dir, String name) {