Bladeren bron

Merge pull request #9 from alibaba/master

merge
rewerma 6 jaren geleden
bovenliggende
commit
fa8ffb8e66

+ 43 - 17
README.md

@@ -1,14 +1,6 @@
 <div class="blog_content">
     <div class="iteye-blog-content-contain">
-
-<h3>最新更新</h3>
-<ol>
-<li>canal QQ讨论群已经建立,群号:161559791 ,欢迎加入进行技术讨论。</li>
-<li>canal消费端项目开源: Otter(分布式数据库同步系统),地址:<a href="https://github.com/alibaba/otter">https://github.com/alibaba/otter</a></li>
-<li>Canal已在阿里云推出商业化版本 <a href="https://www.aliyun.com/product/dts?spm=a2c4g.11186623.cloudEssentials.80.srdwr7">数据传输服务DTS</a>, 开通即用,免去部署维护的昂贵使用成本。DTS针对阿里云RDS、DRDS等产品进行了适配,解决了Binlog日志回收,主备切换、VPC网络切换等场景下的订阅高可用问题。同时,针对RDS进行了针对性的性能优化。出于稳定性、性能及成本的考虑,强烈推荐阿里云用户使用DTS产品。<a href="https://help.aliyun.com/document_detail/26592.html?spm=a2c4g.11174283.6.539.t1Y91E">DTS产品使用文档</a></li>
-DTS支持阿里云RDS&DRDS的Binlog日志实时订阅,现推出首月免费体验,限时限量,<a href="https://common-buy.aliyun.com/?commodityCode=dtspre&request=%7b%22dts_function%22%3a%22data_subscribe%22%7d">立即体验>>></a>
-</ol>
-
+    	
 <h1>背景</h1>
 <p style="font-size: 14px;">   早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&amp;消费的业务,从此开启了一段新纪元。</p>
 <p style="font-size: 14px;">   ps. 目前内部版本已经支持mysql和oracle部分版本的日志解析,当前的canal开源版本支持5.7及以下的版本(阿里内部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)</p>
@@ -46,6 +38,16 @@ DTS支持阿里云RDS&DRDS的Binlog日志实时订阅,现推出首月免费体
 <li>canal解析binary log对象(原始为byte流)</li>
 </ol>
 
+<h1>重要版本更新说明</h1>
+
+canal 1.1.x系列,参考release文档:<a href="https://github.com/alibaba/canal/releases">版本发布信息</a>
+
+1. 整体性能测试&优化,提升了150%. #726 参考: 【[Performance](https://github.com/alibaba/canal/wiki/Performance)】
+2. 原生支持prometheus监控 #765 【[Prometheus QuickStart](https://github.com/alibaba/canal/wiki/Prometheus-QuickStart)】
+3. 原生支持kafka消息投递 #695 【[Canal Kafka QuickStart](https://github.com/alibaba/canal/wiki/Canal-Kafka-QuickStart)】
+4. 原生支持aliyun rds的binlog订阅 (解决自动主备切换/oss binlog离线解析) 参考: 【[Aliyun RDS QuickStart](https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart)】
+5. 原生支持docker镜像 #801 参考:  【[Docker QuickStart](https://github.com/alibaba/canal/wiki/Docker-QuickStart)】
+
 <h1>相关文档</h1>
 
 See the wiki page for : <a href="https://github.com/alibaba/canal/wiki" >wiki文档</a>
@@ -54,16 +56,36 @@ See the wiki page for : <a href="https://github.com/alibaba/canal/wiki" >wiki文
 <ul>
 <li><a class="internal present" href="https://github.com/alibaba/canal/wiki/Home">Home</a></li>
 <li><a class="internal present" href="https://github.com/alibaba/canal/wiki/Introduction">Introduction</a></li>
-<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/QuickStart">QuickStart</a></li>
-<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/ClientExample">ClientExample</a></li>
+<li>
+<a class="internal present" href="https://github.com/alibaba/canal/wiki/QuickStart">QuickStart</a>
+<ul>
+<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/Docker-QuickStart">Docker QuickStart</a></li>
+<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/Canal-Kafka-QuickStart">Canal Kafka QuickStart</a></li>
+<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart">Aliyun RDS QuickStart</a></li>
+<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/Prometheus-QuickStart">Prometheus QuickStart</a></li>
+</ul>
+</li>
 <li><a class="internal present" href="https://github.com/alibaba/canal/wiki/AdminGuide">AdminGuide</a></li>
+<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/ClientExample">ClientExample</a></li>
 <li><a class="internal present" href="https://github.com/alibaba/canal/wiki/ClientAPI">ClientAPI</a></li>
+<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/Performance">Performance</a></li>
 <li><a class="internal present" href="https://github.com/alibaba/canal/wiki/DevGuide">DevGuide</a></li>
 <li><a class="internal present" href="https://github.com/alibaba/canal/wiki/BinlogChange%28mysql5.6%29">BinlogChange(Mysql5.6)</a></li>
+<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/BinlogChange%28MariaDB%29">BinlogChange(MariaDB)</a></li>
+<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/TableMetaTSDB">TableMetaTSDB</a></li>
 <li><a href="http://alibaba.github.com/canal/release.html">ReleaseNotes</a></li>
 <li><a href="https://github.com/alibaba/canal/releases">Download</a></li>
+<li><a class="internal present" href="/alibaba/canal/wiki/FAQ">FAQ</a></li>
 </ul>
 
+<h1>多语言业务</h1>
+
+1. canal整体交互协议设计上使用了protobuf3.0,理论上可以支持绝大部分的多语言场景,欢迎大家提交多客户端的PR
+    * canal java客户端: <a href="https://github.com/alibaba/canal/wiki/ClientExample"> https://github.com/alibaba/canal/wiki/ClientExample </a>
+    * canal c#客户端开源项目地址:<a href="https://github.com/CanalSharp/CanalSharp"> https://github.com/CanalSharp/CanalSharp </a>
+    * canal go客户端,开发进行中
+2. canal作为MySQL binlog的增量获取工具,可以将数据投递到MQ系统中,比如Kafka/RocketMQ,可以借助于MQ的多语言能力 
+
 <h1>相关资料</h1>
 
 * ADC阿里技术嘉年华分享ppt (放在google docs上,可能需要翻墙): <a href="https://docs.google.com/presentation/d/1MkszUPYRDkfVPz9IqOT1LLT5d9tuwde_WC8GZvjaDRg/edit?usp=sharing">ppt下载</href>  
@@ -91,9 +113,13 @@ See the wiki page for : <a href="https://github.com/alibaba/canal/wiki" >wiki文
 <li>报告issue:<a href="https://github.com/alibaba/canal/issues">issues</a></li>
 </ol>
 
-<pre>
-【招聘】阿里巴巴中间件团队招聘JAVA高级工程师
-岗位主要为技术型内容(非业务部门),阿里中间件整个体系对于未来想在技术上有所沉淀的同学还是非常有帮助的
-工作地点:杭州、北京均可. ps. 阿里待遇向来都是不错的,有意者可以QQ、微博私聊. 
-具体招聘内容:https://job.alibaba.com/zhaopin/position_detail.htm?positionId=32666
-</pre>
+<h3>最新更新</h3>
+<ol>
+<li>canal发布重大版本更新1.1.0,具体releaseNode参考:<a href="https://github.com/alibaba/canal/releases/tag/canal-1.1.0">https://github.com/alibaba/canal/releases/tag/canal-1.1.0</a></li>
+<li>canal c#客户端开源项目地址:<a href="https://github.com/CanalSharp/CanalSharp"> https://github.com/CanalSharp/CanalSharp </a>,推荐! </li>
+<li>canal QQ讨论群已经建立,群号:161559791 ,欢迎加入进行技术讨论。</li>
+<li>canal消费端项目开源: Otter(分布式数据库同步系统),地址:<a href="https://github.com/alibaba/otter">https://github.com/alibaba/otter</a></li>
+
+<li>Canal已在阿里云推出商业化版本 <a href="https://www.aliyun.com/product/dts?spm=a2c4g.11186623.cloudEssentials.80.srdwr7">数据传输服务DTS</a>, 开通即用,免去部署维护的昂贵使用成本。DTS针对阿里云RDS、DRDS等产品进行了适配,解决了Binlog日志回收,主备切换、VPC网络切换等场景下的订阅高可用问题。同时,针对RDS进行了针对性的性能优化。出于稳定性、性能及成本的考虑,强烈推荐阿里云用户使用DTS产品。<a href="https://help.aliyun.com/document_detail/26592.html?spm=a2c4g.11174283.6.539.t1Y91E">DTS产品使用文档</a></li>
+DTS支持阿里云RDS&DRDS的Binlog日志实时订阅,现推出首月免费体验,限时限量,<a href="https://common-buy.aliyun.com/?commodityCode=dtspre&request=%7b%22dts_function%22%3a%22data_subscribe%22%7d">立即体验>>></a>
+</ol>

+ 4 - 4
deployer/src/main/resources/spring/tsdb/sql/create_table.sql

@@ -26,10 +26,10 @@ CREATE TABLE IF NOT EXISTS `meta_history` (
   `binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlog节点id',
   `binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'binlog应用的时间戳',
   `use_schema` varchar(1024) DEFAULT NULL COMMENT '执行sql时对应的schema',
-  `schema` varchar(1024) DEFAULT NULL COMMENT '对应的schema',
-  `table` varchar(1024) DEFAULT NULL COMMENT '对应的table',
-  `sql` longtext DEFAULT NULL COMMENT '执行的sql',
-  `type` varchar(256) DEFAULT NULL COMMENT 'sql类型',
+  `sql_schema` varchar(1024) DEFAULT NULL COMMENT '对应的schema',
+  `sql_table` varchar(1024) DEFAULT NULL COMMENT '对应的table',
+  `sql_text` longtext DEFAULT NULL COMMENT '执行的sql',
+  `sql_type` varchar(256) DEFAULT NULL COMMENT 'sql类型',
   `extra` text DEFAULT NULL COMMENT '额外的扩展信息',
   PRIMARY KEY (`id`),
   UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`),

+ 18 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.parse.inbound.mysql;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.LockSupport;
 
@@ -36,6 +37,7 @@ import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
 
+
 /**
  * 针对解析器提供一个多阶段协同的处理
  * 
@@ -65,6 +67,9 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
     private volatile CanalParseException exception;
     private AtomicLong                   eventsPublishBlockingTime;
     private GTIDSet                      gtidSet;
+    private WorkerPool<MessageEvent>     workerPool;
+    private BatchEventProcessor<MessageEvent> simpleParserStage;
+    private BatchEventProcessor<MessageEvent> sinkStoreStage;
 
     public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
                                       EventTransactionBuffer transactionBuffer, String destination){
@@ -91,7 +96,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         SequenceBarrier sequenceBarrier = disruptorMsgBuffer.newBarrier();
         ExceptionHandler exceptionHandler = new SimpleFatalExceptionHandler();
         // stage 2
-        BatchEventProcessor<MessageEvent> simpleParserStage = new BatchEventProcessor<MessageEvent>(disruptorMsgBuffer,
+        simpleParserStage = new BatchEventProcessor<MessageEvent>(disruptorMsgBuffer,
             sequenceBarrier,
             new SimpleParserStage());
         simpleParserStage.setExceptionHandler(exceptionHandler);
@@ -103,7 +108,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         for (int i = 0; i < parserThreadCount; i++) {
             workHandlers[i] = new DmlParserStage();
         }
-        WorkerPool<MessageEvent> workerPool = new WorkerPool<MessageEvent>(disruptorMsgBuffer,
+        workerPool = new WorkerPool<MessageEvent>(disruptorMsgBuffer,
             dmlParserSequenceBarrier,
             exceptionHandler,
             workHandlers);
@@ -112,7 +117,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         // stage 4
         SequenceBarrier sinkSequenceBarrier = disruptorMsgBuffer.newBarrier(sequence);
-        BatchEventProcessor<MessageEvent> sinkStoreStage = new BatchEventProcessor<MessageEvent>(disruptorMsgBuffer,
+        sinkStoreStage = new BatchEventProcessor<MessageEvent>(disruptorMsgBuffer,
             sinkSequenceBarrier,
             new SinkStoreStage());
         sinkStoreStage.setExceptionHandler(exceptionHandler);
@@ -126,14 +131,24 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
     @Override
     public void stop() {
+        // fix bug #968,对于pool与
+        workerPool.halt();
+        simpleParserStage.halt();
+        sinkStoreStage.halt();
         try {
             parserExecutor.shutdownNow();
+            while (!parserExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
+                parserExecutor.shutdownNow();
+            }
         } catch (Throwable e) {
             // ignore
         }
 
         try {
             stageExecutor.shutdownNow();
+            while (!stageExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
+                stageExecutor.shutdownNow();
+            }
         } catch (Throwable e) {
             // ignore
         }