瀏覽代碼

Merge pull request #11 from alibaba/master

merge
rewerma 6 年之前
父節點
當前提交
75f380252e

+ 3 - 3
README.md

@@ -82,8 +82,8 @@ See the wiki page for : <a href="https://github.com/alibaba/canal/wiki" >wiki文
 
 1. canal整体交互协议设计上使用了protobuf3.0,理论上可以支持绝大部分的多语言场景,欢迎大家提交多客户端的PR
     * canal java客户端: <a href="https://github.com/alibaba/canal/wiki/ClientExample"> https://github.com/alibaba/canal/wiki/ClientExample </a>
-    * canal c#客户端开源项目地址:<a href="https://github.com/CanalSharp/CanalSharp"> https://github.com/CanalSharp/CanalSharp </a>
-    * canal go客户端,开发进行中
+    * canal c#客户端开源项目地址:<a href="https://github.com/CanalClient/CanalSharp"> https://github.com/CanalSharp/CanalSharp </a>
+    * canal go客户端开源项目地址:<a href="https://github.com/CanalClient/canal-go"> https://github.com/CanalClient/canal-go </a>
 2. canal作为MySQL binlog的增量获取工具,可以将数据投递到MQ系统中,比如Kafka/RocketMQ,可以借助于MQ的多语言能力 
 
 <h1>相关资料</h1>
@@ -116,7 +116,7 @@ See the wiki page for : <a href="https://github.com/alibaba/canal/wiki" >wiki文
 <h3>最新更新</h3>
 <ol>
 <li>canal发布重大版本更新1.1.0,具体releaseNode参考:<a href="https://github.com/alibaba/canal/releases/tag/canal-1.1.0">https://github.com/alibaba/canal/releases/tag/canal-1.1.0</a></li>
-<li>canal c#客户端开源项目地址:<a href="https://github.com/CanalSharp/CanalSharp"> https://github.com/CanalSharp/CanalSharp </a>,推荐! </li>
+<li>canal c#客户端开源项目地址:<a href="https://github.com/CanalClient/CanalSharp"> https://github.com/CanalClient/CanalSharp </a>,推荐! </li>
 <li>canal QQ讨论群已经建立,群号:161559791 ,欢迎加入进行技术讨论。</li>
 <li>canal消费端项目开源: Otter(分布式数据库同步系统),地址:<a href="https://github.com/alibaba/otter">https://github.com/alibaba/otter</a></li>
 

+ 14 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java

@@ -17,6 +17,9 @@ public class Dml implements Serializable {
     private String                    database;
     private String                    table;
     private String                    type;
+    // binlog executeTime
+    private Long                      es;
+    // dml build timeStamp
     private Long                      ts;
     private String                    sql;
     private List<Map<String, Object>> data;
@@ -78,11 +81,20 @@ public class Dml implements Serializable {
         this.old = old;
     }
 
+    public Long getEs() {
+        return es;
+    }
+
+    public void setEs(Long es) {
+        this.es = es;
+    }
+
     public void clear() {
         database = null;
         table = null;
         type = null;
         ts = null;
+        es = null;
         data = null;
         old = null;
         sql = null;
@@ -90,7 +102,7 @@ public class Dml implements Serializable {
 
     @Override
     public String toString() {
-        return "Dml{" + "database='" + database + '\'' + ", table='" + table + '\'' + ", type='" + type + '\''
-               + ", ts=" + ts + ", sql='" + sql + '\'' + ", data=" + data + ", old=" + old + '}';
+        return "Dml [database=" + database + ", table=" + table + ", type=" + type + ", es=" + es + ", ts=" + ts
+               + ", sql=" + sql + ", data=" + data + ", old=" + old + "]";
     }
 }

+ 2 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -45,6 +45,7 @@ public class MessageUtil {
             dml.setDatabase(entry.getHeader().getSchemaName());
             dml.setTable(entry.getHeader().getTableName());
             dml.setType(eventType.toString());
+            dml.setEs(entry.getHeader().getExecuteTime());
             dml.setTs(System.currentTimeMillis());
             dml.setSql(rowChange.getSql());
             List<Map<String, Object>> data = new ArrayList<>();
@@ -118,6 +119,7 @@ public class MessageUtil {
         dml.setTable(flatMessage.getTable());
         dml.setType(flatMessage.getType());
         dml.setTs(flatMessage.getTs());
+        dml.setEs(flatMessage.getEs());
         dml.setSql(flatMessage.getSql());
         if (flatMessage.getSqlType() == null || flatMessage.getMysqlType() == null) {
             throw new RuntimeException("SqlType or mysqlType is null");

+ 3 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -189,6 +189,7 @@ public final class LogDecoder {
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 event.fillTable(context);
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.UPDATE_ROWS_EVENT_V1: {
@@ -196,6 +197,7 @@ public final class LogDecoder {
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 event.fillTable(context);
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.DELETE_ROWS_EVENT_V1: {
@@ -203,6 +205,7 @@ public final class LogDecoder {
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 event.fillTable(context);
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.ROTATE_EVENT: {

+ 45 - 41
dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java

@@ -8,6 +8,15 @@ import java.sql.Statement;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RowsQueryLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
+
 public class DirectLogFetcherTest extends BaseLogFetcherTest {
 
     @Test
@@ -15,54 +24,49 @@ public class DirectLogFetcherTest extends BaseLogFetcherTest {
         DirectLogFetcher fecther = new DirectLogFetcher();
         try {
             Class.forName("com.mysql.jdbc.Driver");
-            Connection connection = DriverManager.getConnection("jdbc:mysql://100.81.154.142:3306", "root", "hello");
+            Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306", "root", "hello");
             Statement statement = connection.createStatement();
             statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'");
             statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");
 
-            fecther.open(connection, "mysql-bin.000006", 120L, 2);
+            fecther.open(connection, "mysql-bin.000007", 89797036L, 2);
 
-            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.UNKNOWN_EVENT);
+            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             LogContext context = new LogContext();
             while (fecther.fetch()) {
-                decoder.decode(fecther, context);
-                continue;
-                // if (event == null) {
-                // continue;
-                // }
-                //
-                // int eventType = event.getHeader().getType();
-                // switch (eventType) {
-                // case LogEvent.ROTATE_EVENT:
-                // binlogFileName = ((RotateLogEvent) event).getFilename();
-                // break;
-                // case LogEvent.WRITE_ROWS_EVENT_V1:
-                // case LogEvent.WRITE_ROWS_EVENT:
-                // parseRowsEvent((WriteRowsLogEvent) event);
-                // break;
-                // case LogEvent.UPDATE_ROWS_EVENT_V1:
-                // case LogEvent.UPDATE_ROWS_EVENT:
-                // parseRowsEvent((UpdateRowsLogEvent) event);
-                // break;
-                // case LogEvent.DELETE_ROWS_EVENT_V1:
-                // case LogEvent.DELETE_ROWS_EVENT:
-                // parseRowsEvent((DeleteRowsLogEvent) event);
-                // break;
-                // case LogEvent.QUERY_EVENT:
-                // parseQueryEvent((QueryLogEvent) event);
-                // break;
-                // case LogEvent.ROWS_QUERY_LOG_EVENT:
-                // parseRowsQueryEvent((RowsQueryLogEvent) event);
-                // break;
-                // case LogEvent.ANNOTATE_ROWS_EVENT:
-                // parseAnnotateRowsEvent((AnnotateRowsEvent) event);
-                // break;
-                // case LogEvent.XID_EVENT:
-                // parseXidEvent((XidLogEvent) event);
-                // break;
-                // default:
-                // break;
-                // }
+                LogEvent event = decoder.decode(fecther, context);
+                int eventType = event.getHeader().getType();
+                switch (eventType) {
+                    case LogEvent.ROTATE_EVENT:
+                        binlogFileName = ((RotateLogEvent) event).getFilename();
+                        break;
+                    case LogEvent.WRITE_ROWS_EVENT_V1:
+                    case LogEvent.WRITE_ROWS_EVENT:
+                        parseRowsEvent((WriteRowsLogEvent) event);
+                        break;
+                    case LogEvent.UPDATE_ROWS_EVENT_V1:
+                    case LogEvent.UPDATE_ROWS_EVENT:
+                        parseRowsEvent((UpdateRowsLogEvent) event);
+                        break;
+                    case LogEvent.DELETE_ROWS_EVENT_V1:
+                    case LogEvent.DELETE_ROWS_EVENT:
+                        parseRowsEvent((DeleteRowsLogEvent) event);
+                        break;
+                    case LogEvent.QUERY_EVENT:
+                        parseQueryEvent((QueryLogEvent) event);
+                        break;
+                    case LogEvent.ROWS_QUERY_LOG_EVENT:
+                        parseRowsQueryEvent((RowsQueryLogEvent) event);
+                        break;
+                    case LogEvent.ANNOTATE_ROWS_EVENT:
+                        parseAnnotateRowsEvent((AnnotateRowsEvent) event);
+                        break;
+                    case LogEvent.XID_EVENT:
+                        parseXidEvent((XidLogEvent) event);
+                        break;
+                    default:
+                        break;
+                }
             }
         } catch (Exception e) {
             e.printStackTrace();

+ 6 - 2
deployer/src/main/resources/example/instance.properties

@@ -31,8 +31,12 @@ canal.instance.tsdb.enable=true
 
 # username/password
 canal.instance.dbUsername=canal
-canal.instance.dbPassword=canal
-canal.instance.connectionCharset=UTF-8
+canal.instance.dbPassword=cZozNf1mzW6EQLGO2q9u99619xbZLO0fbua3EX08r4BWNXb8lAt1aHrTEOBttd6UY8Vnuc0easlVXZDdLtt8BQ==
+canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
+canal.instance.connectionCharset = UTF-8
+canal.instance.defaultDatabaseName =test
+# enable druid Decrypt database password
+canal.instance.enableDruid=true
 
 # table regex
 canal.instance.filter.regex=.*\\..*

+ 6 - 2
deployer/src/main/resources/spring/default-instance.xml

@@ -126,18 +126,22 @@
 		
 		<!-- 解析数据库信息 -->
 		<property name="masterInfo">
-			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
+			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
 				<property name="address" value="${canal.instance.master.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
+				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
+				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
 			</bean>
 		</property>
 		<property name="standbyInfo">
-			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
+			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
 				<property name="address" value="${canal.instance.standby.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
+				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
+				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
 			</bean>
 		</property>

+ 7 - 3
deployer/src/main/resources/spring/file-instance.xml

@@ -111,22 +111,26 @@
 		
 		<!-- 解析数据库信息 -->
 		<property name="masterInfo">
-			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
+			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
 				<property name="address" value="${canal.instance.master.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
+				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
+				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
 			</bean>
 		</property>
 		<property name="standbyInfo">
-			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
+			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
 				<property name="address" value="${canal.instance.standby.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
+				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
+				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
 			</bean>
 		</property>
-		
+
 		<!-- 解析起始位点 -->
 		<property name="masterPosition">
 			<bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">

+ 12 - 4
deployer/src/main/resources/spring/group-instance.xml

@@ -108,18 +108,22 @@
 		
 		<!-- 解析数据库信息 -->
 		<property name="masterInfo">
-			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
+			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
 				<property name="address" value="${canal.instance.master1.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
+				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
+				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
 			</bean>
 		</property>
 		<property name="standbyInfo">
-			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
+			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
 				<property name="address" value="${canal.instance.standby1.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
+				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
+				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
 			</bean>
 		</property>
@@ -206,18 +210,22 @@
 		
 		<!-- 解析数据库信息 -->
 		<property name="masterInfo">
-			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
+			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
 				<property name="address" value="${canal.instance.master2.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
+				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
+				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
 			</bean>
 		</property>
 		<property name="standbyInfo">
-			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
+			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
 				<property name="address" value="${canal.instance.standby2.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
+				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
+				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
 			</bean>
 		</property>

+ 7 - 3
deployer/src/main/resources/spring/memory-instance.xml

@@ -96,21 +96,25 @@
 		
 		<!-- failover切换时回退的时间 -->
 		<property name="fallbackIntervalInSeconds" value="${canal.instance.fallbackIntervalInSeconds:60}" />
-		
+
 		<!-- 解析数据库信息 -->
 		<property name="masterInfo">
-			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
+			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
 				<property name="address" value="${canal.instance.master.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
+				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
+				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
 			</bean>
 		</property>
 		<property name="standbyInfo">
-			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
+			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
 				<property name="address" value="${canal.instance.standby.address}" />
 				<property name="username" value="${canal.instance.dbUsername:retl}" />
 				<property name="password" value="${canal.instance.dbPassword:retl}" />
+				<property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
+				<property name="enableDruid" value="${canal.instance.enableDruid:false}" />
 				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
 			</bean>
 		</property>

+ 3 - 1
docker/image/admin/app.sh

@@ -87,7 +87,9 @@ function start_canal() {
         exit 1;
     else
         if [ "$destination" != "" ] && [ "$destination" != "example" ] ; then
-            mv /home/admin/canal-server/conf/example /home/admin/canal-server/conf/$destination
+            if [ -d /home/admin/canal-server/conf/example ]; then
+                mv /home/admin/canal-server/conf/example /home/admin/canal-server/conf/$destination
+            fi
         fi 
     fi
     su admin -c 'cd /home/admin/canal-server/bin/ && sh restart.sh 1>>/tmp/start.log 2>&1'

+ 4 - 1
instance/spring/src/test/resources/retl/instance.properties

@@ -27,9 +27,12 @@ canal.instance.master2.timestamp =
 
 # username/password
 canal.instance.dbUsername = xxxxx
-canal.instance.dbPassword = xxxxx
+canal.instance.dbPassword=cZozNf1mzW6EQLGO2q9u99619xbZLO0fbua3EX08r4BWNXb8lAt1aHrTEOBttd6UY8Vnuc0easlVXZDdLtt8BQ==
+canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
 canal.instance.defaultDatabaseName =
 canal.instance.connectionCharset = UTF-8
+# enable druid Decrypt database password
+canal.instance.enableDruid=true
 
 # table regex
 canal.instance.filter.regex = .*\\..*

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -317,10 +317,10 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                     eventSink.interrupt();
                     transactionBuffer.reset();// 重置一下缓冲队列,重新记录数据
                     binlogParser.reset();// 重新置位
-                    if (multiStageCoprocessor != null) {
+                    if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) {
                         // 处理 RejectedExecutionException
                         try {
-                            multiStageCoprocessor.reset();
+                            multiStageCoprocessor.stop();
                         } catch (Throwable t) {
                             logger.debug("multi processor rejected:", t);
                         }

+ 0 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/MultiStageCoprocessor.java

@@ -25,6 +25,4 @@ public interface MultiStageCoprocessor extends CanalLifeCycle {
     public boolean publish(LogBuffer buffer);
 
     public boolean publish(LogEvent event);
-
-    public void reset();
 }

+ 23 - 25
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -37,7 +37,6 @@ import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
 
-
 /**
  * 针对解析器提供一个多阶段协同的处理
  * 
@@ -53,21 +52,21 @@ import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
  */
 public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implements MultiStageCoprocessor {
 
-    private static final int             maxFullTimes = 10;
-    private LogEventConvert              logEventConvert;
-    private EventTransactionBuffer       transactionBuffer;
-    private ErosaConnection              connection;
-
-    private int                          parserThreadCount;
-    private int                          ringBufferSize;
-    private RingBuffer<MessageEvent>     disruptorMsgBuffer;
-    private ExecutorService              parserExecutor;
-    private ExecutorService              stageExecutor;
-    private String                       destination;
-    private volatile CanalParseException exception;
-    private AtomicLong                   eventsPublishBlockingTime;
-    private GTIDSet                      gtidSet;
-    private WorkerPool<MessageEvent>     workerPool;
+    private static final int                  maxFullTimes = 10;
+    private LogEventConvert                   logEventConvert;
+    private EventTransactionBuffer            transactionBuffer;
+    private ErosaConnection                   connection;
+
+    private int                               parserThreadCount;
+    private int                               ringBufferSize;
+    private RingBuffer<MessageEvent>          disruptorMsgBuffer;
+    private ExecutorService                   parserExecutor;
+    private ExecutorService                   stageExecutor;
+    private String                            destination;
+    private volatile CanalParseException      exception;
+    private AtomicLong                        eventsPublishBlockingTime;
+    private GTIDSet                           gtidSet;
+    private WorkerPool<MessageEvent>          workerPool;
     private BatchEventProcessor<MessageEvent> simpleParserStage;
     private BatchEventProcessor<MessageEvent> sinkStoreStage;
 
@@ -138,6 +137,10 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         try {
             parserExecutor.shutdownNow();
             while (!parserExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
+                if (parserExecutor.isShutdown() || parserExecutor.isTerminated()) {
+                    break;
+                }
+
                 parserExecutor.shutdownNow();
             }
         } catch (Throwable e) {
@@ -147,6 +150,10 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         try {
             stageExecutor.shutdownNow();
             while (!stageExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
+                if (stageExecutor.isShutdown() || stageExecutor.isTerminated()) {
+                    break;
+                }
+
                 stageExecutor.shutdownNow();
             }
         } catch (Throwable e) {
@@ -227,15 +234,6 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
     }
 
-    @Override
-    public void reset() {
-        if (isStart()) {
-            stop();
-        }
-
-        start();
-    }
-
     private class SimpleParserStage implements EventHandler<MessageEvent>, LifecycleAware {
 
         private LogDecoder decoder;

+ 29 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/support/AuthenticationInfo.java

@@ -2,21 +2,32 @@ package com.alibaba.otter.canal.parse.support;
 
 import java.net.InetSocketAddress;
 
+import com.alibaba.druid.filter.config.ConfigTools;
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 
 /**
  * 数据库认证信息
- * 
+ *
  * @author jianghang 2012-7-11 上午11:22:19
  * @version 1.0.0
  */
 public class AuthenticationInfo {
 
+
+
     private InetSocketAddress address;            // 主库信息
     private String            username;           // 帐号
     private String            password;           // 密码
-    private String            defaultDatabaseName; // 默认链接的数据库
+    private String            defaultDatabaseName;// 默认链接的数据库
+    private String            pwdPublicKey;       //公钥
+    private boolean           enableDruid;        //是否使用druid加密解密数据库密码
+
+    public void initPwd() throws Exception{
+        if (enableDruid) {
+            this.password = ConfigTools.decrypt(pwdPublicKey, password);
+        }
+    }
 
     public AuthenticationInfo(){
         super();
@@ -65,6 +76,22 @@ public class AuthenticationInfo {
         this.defaultDatabaseName = defaultDatabaseName;
     }
 
+    public String getPwdPublicKey() {
+        return pwdPublicKey;
+    }
+
+    public void setPwdPublicKey(String pwdPublicKey) {
+        this.pwdPublicKey = pwdPublicKey;
+    }
+
+    public boolean isEnableDruid() {
+        return enableDruid;
+    }
+
+    public void setEnableDruid(boolean enableDruid) {
+        this.enableDruid = enableDruid;
+    }
+
     @Override
     public String toString() {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.DEFAULT_STYLE);

+ 1 - 1
pom.xml

@@ -247,7 +247,7 @@
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_540</version>
+                <version>2.0.0_preview_617</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>

+ 25 - 7
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -1,7 +1,12 @@
 package com.alibaba.otter.canal.protocol;
 
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import com.google.protobuf.ByteString;
 
@@ -18,6 +23,9 @@ public class FlatMessage implements Serializable {
     private String                    table;
     private Boolean                   isDdl;
     private String                    type;
+    // binlog executeTime
+    private Long                      es;
+    // dml build timeStamp
     private Long                      ts;
     private String                    sql;
     private Map<String, Integer>      sqlType;
@@ -120,6 +128,14 @@ public class FlatMessage implements Serializable {
         this.old = old;
     }
 
+    public Long getEs() {
+        return es;
+    }
+
+    public void setEs(Long es) {
+        this.es = es;
+    }
+
     /**
      * 将Message转换为FlatMessage
      * 
@@ -147,9 +163,8 @@ public class FlatMessage implements Serializable {
                 try {
                     rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                 } catch (Exception e) {
-                    throw new RuntimeException(
-                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
-                        e);
+                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+                                               + entry.toString(), e);
                 }
 
                 CanalEntry.EventType eventType = rowChange.getEventType();
@@ -160,6 +175,7 @@ public class FlatMessage implements Serializable {
                 flatMessage.setTable(entry.getHeader().getTableName());
                 flatMessage.setIsDdl(rowChange.getIsDdl());
                 flatMessage.setType(eventType.toString());
+                flatMessage.setEs(entry.getHeader().getExecuteTime());
                 flatMessage.setTs(System.currentTimeMillis());
                 flatMessage.setSql(rowChange.getSql());
 
@@ -273,6 +289,8 @@ public class FlatMessage implements Serializable {
                         flatMessageTmp.setSql(flatMessage.getSql());
                         flatMessageTmp.setSqlType(flatMessage.getSqlType());
                         flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
+                        flatMessageTmp.setEs(flatMessage.getEs());
+                        flatMessageTmp.setTs(flatMessage.getTs());
                     }
                     List<Map<String, String>> data = flatMessageTmp.getData();
                     if (data == null) {
@@ -297,8 +315,8 @@ public class FlatMessage implements Serializable {
 
     @Override
     public String toString() {
-        return "FlatMessage{" + "id=" + id + ", database='" + database + '\'' + ", table='" + table + '\'' + ", isDdl="
-               + isDdl + ", type='" + type + '\'' + ", ts=" + ts + ", sql='" + sql + '\'' + ", sqlType=" + sqlType
-               + ", mysqlType=" + mysqlType + ", data=" + data + ", old=" + old + '}';
+        return "FlatMessage [id=" + id + ", database=" + database + ", table=" + table + ", isDdl=" + isDdl + ", type="
+               + type + ", es=" + es + ", ts=" + ts + ", sql=" + sql + ", sqlType=" + sqlType + ", mysqlType="
+               + mysqlType + ", data=" + data + ", old=" + old + "]";
     }
 }

+ 9 - 6
store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

@@ -7,6 +7,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.commons.lang.StringUtils;
+
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
@@ -37,14 +39,14 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
 
     private static final long INIT_SEQUENCE = -1;
     private int               bufferSize    = 16 * 1024;
-    private int               bufferMemUnit = 1024;                         // memsize的单位,默认为1kb大小
+    private int               bufferMemUnit = 1024;                                      // memsize的单位,默认为1kb大小
     private int               indexMask;
     private Event[]           entries;
 
     // 记录下put/get/ack操作的三个下标
-    private AtomicLong        putSequence   = new AtomicLong(INIT_SEQUENCE); // 代表当前put操作最后一次写操作发生的位置
-    private AtomicLong        getSequence   = new AtomicLong(INIT_SEQUENCE); // 代表当前get操作读取的最后一条的位置
-    private AtomicLong        ackSequence   = new AtomicLong(INIT_SEQUENCE); // 代表当前ack操作的最后一条的位置
+    private AtomicLong        putSequence   = new AtomicLong(INIT_SEQUENCE);             // 代表当前put操作最后一次写操作发生的位置
+    private AtomicLong        getSequence   = new AtomicLong(INIT_SEQUENCE);             // 代表当前get操作读取的最后一条的位置
+    private AtomicLong        ackSequence   = new AtomicLong(INIT_SEQUENCE);             // 代表当前ack操作的最后一条的位置
 
     // 记录下put/get/ack操作的三个memsize大小
     private AtomicLong        putMemSize    = new AtomicLong(0);
@@ -66,7 +68,7 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
     private Condition         notFull       = lock.newCondition();
     private Condition         notEmpty      = lock.newCondition();
 
-    private BatchMode         batchMode     = BatchMode.ITEMSIZE;           // 默认为内存大小模式
+    private BatchMode         batchMode     = BatchMode.ITEMSIZE;                        // 默认为内存大小模式
     private boolean           ddlIsolation  = false;
 
     public MemoryEventStoreWithBuffer(){
@@ -335,7 +337,8 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
 
         for (int i = entrys.size() - 1; i >= 0; i--) {
             Event event = entrys.get(i);
-            if (CanalEntry.EntryType.TRANSACTIONBEGIN == event.getEntryType()
+            // GTID模式,ack的位点必须是事务结尾,因为下一次订阅的时候mysql会发送这个gtid之后的next,如果在事务头就记录了会丢这最后一个事务
+            if ((CanalEntry.EntryType.TRANSACTIONBEGIN == event.getEntryType() && StringUtils.isEmpty(event.getGtid()))
                 || CanalEntry.EntryType.TRANSACTIONEND == event.getEntryType() || isDdl(event.getEventType())) {
                 // 将事务头/尾设置可被为ack的点
                 range.setAck(CanalEventUtils.createPosition(event));