Browse Source

Merge pull request #213 from lulu2panpan/master

修复issue #212
agapple 8 years ago
parent
commit
4898829384

+ 163 - 0
deployer/src/main/resources/spring/local-instance.xml

@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
+	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:lang="http://www.springframework.org/schema/lang"
+	xmlns:context="http://www.springframework.org/schema/context"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
+           http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-2.0.xsd
+           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
+           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
+	default-autowire="byName">
+
+	<!-- properties -->
+	<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
+		<property name="ignoreResourceNotFound" value="true" />
+		<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
+		<property name="locationNames">
+			<list>
+				<value>classpath:canal.properties</value>
+				<value>classpath:${canal.instance.destination:}/instance.properties</value>
+			</list>
+		</property>
+	</bean>
+	
+	<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
+	<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"> 
+		<property name="propertyEditorRegistrars">
+			<list>
+				<ref bean="socketAddressEditor" />
+			</list>
+		</property>
+	</bean>
+	
+	<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
+		<property name="destination" value="${canal.instance.destination}" />
+		<property name="eventParser">
+			<ref local="eventParser" />
+		</property>
+		<property name="eventSink">
+			<ref local="eventSink" />
+		</property>
+		<property name="eventStore">
+			<ref local="eventStore" />
+		</property>
+		<property name="metaManager">
+			<ref local="metaManager" />
+		</property>
+		<property name="alarmHandler">
+			<ref local="alarmHandler" />
+		</property>
+		<property name="eventFetcher">
+			<ref local="eventFetcher"/>
+		</property>
+	</bean>
+	
+	<bean id="eventFetcher" class="com.alibaba.otter.canal.mq.fetch.CanalEventFetcherWithMq">
+		<property name="enable" value="${canal.instance.mq.enable}"/>
+		<property name="sendMode" value="${canal.instance.mq.sendMode}"/>
+	</bean>
+	
+	<!-- 报警处理类 -->
+	<bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.MsgAlarmHandler">
+		<property name="enabled" value="${canal.alarm.enable}" />
+		<property name="environment" value="${canal.alarm.environment}" />
+		<property name="intercenterApi" value="${canal.alarm.intercenter.api}" />
+		<property name="administrators" value="${canal.alarm.administrators}" />
+	</bean>
+	
+	<bean id="zkClientx" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean" >
+		<property name="targetClass" value="com.alibaba.otter.canal.common.zookeeper.ZkClientx" />
+		<property name="targetMethod" value="getZkClient" />
+		<property name="arguments">
+			<list>
+				<value>${canal.zkServers:127.0.0.1:2181}</value>
+			</list>
+		</property>
+	</bean>
+	
+	<bean id="metaManager" class="com.alibaba.otter.canal.meta.PeriodMixedMetaManager">
+		<property name="zooKeeperMetaManager">
+			<bean class="com.alibaba.otter.canal.meta.ZooKeeperMetaManager">
+				<property name="zkClientx" ref="zkClientx" />
+			</bean>
+		</property>
+		<property name="period" value="${canal.zookeeper.flush.period:1000}" />
+	</bean>
+	
+	<bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
+		<property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" />
+		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
+		<property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
+		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
+	</bean>
+	
+	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
+		<property name="eventStore" ref="eventStore" />
+	</bean>
+
+	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser">
+		<property name="destination" value="${canal.instance.destination}" />			
+		<property name="alarmHandler" ref="alarmHandler" />
+		
+		<!-- 解析过滤处理 -->
+		<property name="eventFilter">
+			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
+				<constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" />
+			</bean>
+		</property>
+		
+		<property name="eventBlackFilter">
+			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
+				<constructor-arg index="0" value="${canal.instance.filter.black.regex:}" />
+				<constructor-arg index="1" value="false" />
+			</bean>
+		</property>
+		
+		<!-- 最大事务解析大小,超过该大小后事务将被切分为多个事务投递 -->
+		<property name="transactionSize" value="${canal.instance.transaction.size:1024}" />
+			
+		<!-- 解析编码 -->
+		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
+	
+		<!-- 解析位点记录 -->
+		<property name="logPositionManager">
+			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
+				<property name="primary">
+					<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
+				</property>
+				<property name="failback">
+					<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
+						<property name="metaManager" ref="metaManager" />
+					</bean>
+				</property>
+			</bean>
+		</property>
+		
+		<!-- 解析数据库信息 -->
+		<property name="masterInfo">
+			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
+				<property name="address" value="${canal.instance.master.address}" />
+				<property name="username" value="${canal.instance.dbUsername:retl}" />
+				<property name="password" value="${canal.instance.dbPassword:retl}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
+			</bean>
+		</property>
+		
+		<!-- 解析起始位点 -->
+		<property name="masterPosition">
+			<bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
+				<property name="journalName" value="${canal.instance.master.journal.name}" />
+				<property name="position" value="${canal.instance.master.position}" />
+				<property name="timestamp" value="${canal.instance.master.timestamp}" />
+			</bean>
+		</property>
+		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
+		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
+		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
+		<property name="filterRows" value="${canal.instance.filter.rows:false}" />
+		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
+		<property name="needWait" value="${canal.instance.parser.needWait:true}"/>
+		<property name="directory" value="${canal.instance.parser.directory}"/>
+	</bean>
+</beans>

+ 19 - 13
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -8,7 +8,6 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
-import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.SinkFunction;
 import com.alibaba.otter.canal.parse.inbound.SinkFunction;
 import com.alibaba.otter.canal.parse.inbound.mysql.local.BinLogFileQueue;
 import com.alibaba.otter.canal.parse.inbound.mysql.local.BinLogFileQueue;
@@ -18,6 +17,7 @@ import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.LogPosition;
 import com.taobao.tddl.dbsync.binlog.LogPosition;
 import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
 
 
 /**
 /**
  * local bin log connection (not real connection)
  * local bin log connection (not real connection)
@@ -83,9 +83,9 @@ public class LocalBinLogConnection implements ErosaConnection {
             context.setLogPosition(new LogPosition(binlogfilename, binlogPosition));
             context.setLogPosition(new LogPosition(binlogfilename, binlogPosition));
             while (running) {
             while (running) {
                 boolean needContinue = true;
                 boolean needContinue = true;
-                LogEvent event;
-                while (fetcher.fetch()) {
-                    event = decoder.decode(fetcher, context);
+				LogEvent event = null;
+                L:while (fetcher.fetch()) {
+                    /*event = decoder.decode(fetcher, context);
                     if (event == null) {
                     if (event == null) {
                         throw new CanalParseException("parse failed");
                         throw new CanalParseException("parse failed");
                     }
                     }
@@ -93,15 +93,20 @@ public class LocalBinLogConnection implements ErosaConnection {
                     if (!func.sink(event)) {
                     if (!func.sink(event)) {
                         needContinue = false;
                         needContinue = false;
                         break;
                         break;
-                    }
-
-                    // do {
-                    // event = decoder.decode(fetcher, context);
-                    // if (event != null && !func.sink(event)) {
-                    // needContinue = false;
-                    // break;
-                    // }
-                    // } while (event != null);
+                    }*/
+
+					do {
+						if (event == null) {
+							event = new RotateLogEvent(context.getLogPosition().getFileName(), context.getLogPosition().getPosition());
+						} else {
+							event = decoder.decode(fetcher, context);
+						}
+												
+						if (event != null && !func.sink(event)) {
+							needContinue = false;
+							break L;
+						}
+					} while (event != null);
                 }
                 }
 
 
                 if (needContinue) {// 读取下一个
                 if (needContinue) {// 读取下一个
@@ -145,6 +150,7 @@ public class LocalBinLogConnection implements ErosaConnection {
 
 
         FileLogFetcher fetcher = new FileLogFetcher(bufferSize);
         FileLogFetcher fetcher = new FileLogFetcher(bufferSize);
         LogDecoder decoder = new LogDecoder();
         LogDecoder decoder = new LogDecoder();
+        decoder.handle(LogEvent.FORMAT_DESCRIPTION_EVENT);
         decoder.handle(LogEvent.QUERY_EVENT);
         decoder.handle(LogEvent.QUERY_EVENT);
         decoder.handle(LogEvent.XID_EVENT);
         decoder.handle(LogEvent.XID_EVENT);
         LogContext context = new LogContext();
         LogContext context = new LogContext();

+ 66 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java

@@ -1,10 +1,14 @@
 package com.alibaba.otter.canal.parse.inbound.mysql;
 package com.alibaba.otter.canal.parse.inbound.mysql;
 
 
+import java.io.IOException;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 
 
 import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
+import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
+import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
 import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
@@ -21,7 +25,9 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
     // 数据库信息
     // 数据库信息
     private AuthenticationInfo masterInfo;
     private AuthenticationInfo masterInfo;
     private EntryPosition      masterPosition;        // binlog信息
     private EntryPosition      masterPosition;        // binlog信息
-
+    private MysqlConnection    metaConnection;        // 查询meta信息的链接
+    private TableMetaCache     tableMetaCache;        // 对应meta
+    
     private String             directory;
     private String             directory;
     private boolean            needWait   = false;
     private boolean            needWait   = false;
     private int                bufferSize = 16 * 1024;
     private int                bufferSize = 16 * 1024;
@@ -33,17 +39,60 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
     @Override
     @Override
     protected ErosaConnection buildErosaConnection() {
     protected ErosaConnection buildErosaConnection() {
         return buildLocalBinLogConnection();
         return buildLocalBinLogConnection();
-    }
+    }   
+    
+    @Override
+	protected void preDump(ErosaConnection connection) {
+    	metaConnection = buildMysqlConnection();
+        try {
+            metaConnection.connect();
+        } catch (IOException e) {
+            throw new CanalParseException(e);
+        }
+        
+        tableMetaCache = new TableMetaCache(metaConnection);
+        ((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);
+	}
+
+	@Override
+	protected void afterDump(ErosaConnection connection) {
+		if (metaConnection != null) {
+            try {
+                metaConnection.disconnect();
+            } catch (IOException e) {
+                logger.error("ERROR # disconnect meta connection for address:{}", metaConnection.getConnector()
+                    .getAddress(), e);
+            }
+        }
+	}
 
 
-    public void start() throws CanalParseException {
+	public void start() throws CanalParseException {
         if (runningInfo == null) { // 第一次链接主库
         if (runningInfo == null) { // 第一次链接主库
             runningInfo = masterInfo;
             runningInfo = masterInfo;
         }
         }
 
 
         super.start();
         super.start();
     }
     }
+	
+    @Override
+	public void stop() {
+    	if (metaConnection != null) {
+            try {
+                metaConnection.disconnect();
+            } catch (IOException e) {
+                logger.error("ERROR # disconnect meta connection for address:{}", metaConnection.getConnector()
+                    .getAddress(), e);
+            }
+        }
+
+        if (tableMetaCache != null) {
+            tableMetaCache.clearTableMeta();
+        }
 
 
-    private ErosaConnection buildLocalBinLogConnection() {
+        super.stop();
+	}
+
+	private ErosaConnection buildLocalBinLogConnection() {
         LocalBinLogConnection connection = new LocalBinLogConnection();
         LocalBinLogConnection connection = new LocalBinLogConnection();
 
 
         connection.setBufferSize(this.bufferSize);
         connection.setBufferSize(this.bufferSize);
@@ -52,6 +101,19 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
 
 
         return connection;
         return connection;
     }
     }
+
+    private MysqlConnection buildMysqlConnection() {
+        MysqlConnection connection = new MysqlConnection(runningInfo.getAddress(),
+            runningInfo.getUsername(),
+            runningInfo.getPassword(),
+            connectionCharsetNumber,
+            runningInfo.getDefaultDatabaseName());
+        connection.getConnector().setReceiveBufferSize(64 * 1024);
+        connection.getConnector().setSendBufferSize(64 * 1024);
+        connection.getConnector().setSoTimeout(30 * 1000);
+        connection.setCharset(connectionCharset);
+        return connection;
+    }
     
     
     @Override
     @Override
     protected EntryPosition findStartPosition(ErosaConnection connection) {
     protected EntryPosition findStartPosition(ErosaConnection connection) {

+ 2 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -13,6 +13,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.CollectionUtils;
 
 
+import com.alibaba.otter.canal.common.utils.JsonUtils;
 import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.CanalHASwitchable;
 import com.alibaba.otter.canal.parse.CanalHASwitchable;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
@@ -412,7 +413,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                     }
                     }
                 }
                 }
                 // 其余情况
                 // 其余情况
-                logger.warn("prepare to find start position just last position");
+                logger.warn("prepare to find start position just last position\n {}",JsonUtils.marshalToString(logPosition));
                 return logPosition.getPostion();
                 return logPosition.getPostion();
             } else {
             } else {
                 // 针对切换的情况,考虑回退时间
                 // 针对切换的情况,考虑回退时间

+ 5 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java

@@ -9,6 +9,8 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.TimerTask;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.IOFileFilter;
 import org.apache.commons.io.filefilter.IOFileFilter;
@@ -188,7 +190,9 @@ public class BinLogFileQueue {
         files.addAll(FileUtils.listFiles(directory, new IOFileFilter() {
         files.addAll(FileUtils.listFiles(directory, new IOFileFilter() {
 
 
             public boolean accept(File file) {
             public boolean accept(File file) {
-                return file.getName().startsWith(baseName);
+				Pattern pattern = Pattern.compile("\\d+$");
+				Matcher matcher = pattern.matcher(file.getName());
+				return file.getName().startsWith(baseName) && matcher.find();
             }
             }
 
 
             public boolean accept(File dir, String name) {
             public boolean accept(File dir, String name) {

+ 2 - 0
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -131,6 +131,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
      */
      */
     @Override
     @Override
     public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
     public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
+    	checkStart(clientIdentity.getDestination());
+    	
         CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
         CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
         if (!canalInstance.getMetaManager().isStart()) {
         if (!canalInstance.getMetaManager().isStart()) {
             canalInstance.getMetaManager().start();
             canalInstance.getMetaManager().start();

+ 3 - 2
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -66,8 +66,7 @@ public class SessionHandler extends SimpleChannelHandler {
                             Short.valueOf(sub.getClientId()),
                             Short.valueOf(sub.getClientId()),
                             sub.getFilter());
                             sub.getFilter());
                         MDC.put("destination", clientIdentity.getDestination());
                         MDC.put("destination", clientIdentity.getDestination());
-                        embeddedServer.subscribe(clientIdentity);
-
+                        
                         // 尝试启动,如果已经启动,忽略
                         // 尝试启动,如果已经启动,忽略
                         if (!embeddedServer.isStart(clientIdentity.getDestination())) {
                         if (!embeddedServer.isStart(clientIdentity.getDestination())) {
                             ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
                             ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
@@ -75,6 +74,8 @@ public class SessionHandler extends SimpleChannelHandler {
                                 runningMonitor.start();
                                 runningMonitor.start();
                             }
                             }
                         }
                         }
+                        
+                        embeddedServer.subscribe(clientIdentity);
 
 
                         ctx.setAttachment(clientIdentity);// 设置状态数据
                         ctx.setAttachment(clientIdentity);// 设置状态数据
                         NettyUtils.ack(ctx.getChannel(), null);
                         NettyUtils.ack(ctx.getChannel(), null);