
Merge remote-tracking branch 'upstream/master'

winger · 6 years ago · commit b04446d24d

+ 49 - 23
README.md

@@ -1,14 +1,6 @@
 <div class="blog_content">
     <div class="iteye-blog-content-contain">
-
-<h3>Latest updates</h3>
-<ol>
-<li>A canal QQ discussion group has been set up, group number: 161559791. Everyone is welcome to join for technical discussion.</li>
-<li>A canal consumer-side project is open source: Otter (a distributed database synchronization system), at <a href="https://github.com/alibaba/otter">https://github.com/alibaba/otter</a></li>
-<li>Canal is available on Alibaba Cloud as the commercial <a href="https://www.aliyun.com/product/dts?spm=a2c4g.11186623.cloudEssentials.80.srdwr7">Data Transmission Service (DTS)</a>, ready to use on activation, with none of the costly deployment and maintenance overhead. DTS is adapted to Alibaba Cloud products such as RDS and DRDS, and solves subscription high availability in scenarios such as binlog recycling, master/standby switchover, and VPC network switchover. It also includes targeted performance optimizations for RDS. For stability, performance, and cost reasons, Alibaba Cloud users are strongly encouraged to use DTS. <a href="https://help.aliyun.com/document_detail/26592.html?spm=a2c4g.11174283.6.539.t1Y91E">DTS product documentation</a></li>
-DTS supports real-time subscription to the binlogs of Alibaba Cloud RDS & DRDS; a free first-month trial is now available, limited in time and quantity. <a href="https://common-buy.aliyun.com/?commodityCode=dtspre&request=%7b%22dts_function%22%3a%22data_subscribe%22%7d">Try it now >>></a>
-</ol>
-
+    	
 <h1>Background</h1>
 <p style="font-size: 14px;">   Early on, because Alibaba B2B was deployed across dual data centers in Hangzhou and the US, there was a business need for cross-datacenter synchronization. Early database synchronization mostly obtained incremental changes via triggers, but starting in 2010, Alibaba companies gradually moved to parsing database logs to obtain incremental changes for synchronization. This gave rise to the incremental subscription &amp; consumption business and opened a new era.</p>
 <p style="font-size: 14px;">   ps. The internal version already supports log parsing for mysql and some oracle versions; the current canal open-source release supports mysql 5.7 and below (internally at Alibaba: mysql 5.7.13, 5.6.10, mysql 5.5.18 and 5.1.40/48).</p>
@@ -46,24 +38,54 @@ DTS supports real-time subscription to the binlogs of Alibaba Cloud RDS & DRDS;
 <li>canal parses the binary log objects (raw byte stream)</li>
 </ol>
 
+<h1>Major version update notes</h1>
+
+For the canal 1.1.x series, see the release docs: <a href="https://github.com/alibaba/canal/releases">release notes</a>
+
+1. Overall performance testing & optimization, a 150% improvement. #726 See: 【[Performance](https://github.com/alibaba/canal/wiki/Performance)】
+2. Native prometheus monitoring support #765 【[Prometheus QuickStart](https://github.com/alibaba/canal/wiki/Prometheus-QuickStart)】
+3. Native kafka message delivery support #695 【[Canal Kafka QuickStart](https://github.com/alibaba/canal/wiki/Canal-Kafka-QuickStart)】 (a consumer sketch follows this list)
+4. Native support for subscribing to aliyun rds binlogs (handles automatic master/standby switchover and offline parsing of oss binlogs). See: 【[Aliyun RDS QuickStart](https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart)】
+5. Native docker image support #801 See: 【[Docker QuickStart](https://github.com/alibaba/canal/wiki/Docker-QuickStart)】
+
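As a companion to item 3 above: with `flatMessage: true` set in kafka.yml, each Kafka record value is one FlatMessage serialized as JSON (see the `JSON.toJSONString` calls in the CanalKafkaProducer diff further down). The following is a minimal consumer sketch; the broker address and group id are assumptions for illustration, and the topic name matches the `exp3` example from kafka.yml in this commit.

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FlatMessageConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // broker address and group id are assumptions for this sketch
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "canal-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // topic name must match the one configured in kafka.yml
        consumer.subscribe(Collections.singletonList("exp3"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100L);
                for (ConsumerRecord<String, String> record : records) {
                    // with flatMessage: true, each value is one FlatMessage as JSON
                    System.out.println(record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```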
 <h1>Documentation</h1>
 
 See the wiki page: <a href="https://github.com/alibaba/canal/wiki">wiki documentation</a>
 
 <h3><a name="table-of-contents" class="anchor" href="#table-of-contents"><span class="mini-icon mini-icon-link"></span></a>Wiki document list</h3>
 <ul>
-<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/Home">Home</a></li>
-<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/Introduction">Introduction</a></li>
-<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/QuickStart">QuickStart</a></li>
-<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/ClientExample">ClientExample</a></li>
-<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/AdminGuide">AdminGuide</a></li>
-<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/ClientAPI">ClientAPI</a></li>
-<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/DevGuide">DevGuide</a></li>
-<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/BinlogChange%28mysql5.6%29">BinlogChange(Mysql5.6)</a></li>
+<li><a class="internal present" href="/alibaba/canal/wiki/Home">Home</a></li>
+<li><a class="internal present" href="/alibaba/canal/wiki/Introduction">Introduction</a></li>
+<li>
+<a class="internal present" href="/alibaba/canal/wiki/QuickStart">QuickStart</a>
+<ul>
+<li><a class="internal present" href="/alibaba/canal/wiki/Docker-QuickStart">Docker QuickStart</a></li>
+<li><a class="internal present" href="/alibaba/canal/wiki/Canal-Kafka-QuickStart">Canal Kafka QuickStart</a></li>
+<li><a class="internal present" href="/alibaba/canal/wiki/aliyun-RDS-QuickStart">Aliyun RDS QuickStart</a></li>
+<li><a class="internal present" href="/alibaba/canal/wiki/Prometheus-QuickStart">Prometheus QuickStart</a></li>
+</ul>
+</li>
+<li><a class="internal present" href="/alibaba/canal/wiki/AdminGuide">AdminGuide</a></li>
+<li><a class="internal present" href="/alibaba/canal/wiki/ClientExample">ClientExample</a></li>
+<li><a class="internal present" href="/alibaba/canal/wiki/ClientAPI">ClientAPI</a></li>
+<li><a class="internal present" href="/alibaba/canal/wiki/Performance">Performance</a></li>
+<li><a class="internal present" href="/alibaba/canal/wiki/DevGuide">DevGuide</a></li>
+<li><a class="internal present" href="/alibaba/canal/wiki/BinlogChange%28mysql5.6%29">BinlogChange(Mysql5.6)</a></li>
+<li><a class="internal present" href="/alibaba/canal/wiki/BinlogChange%28MariaDB%29">BinlogChange(MariaDB)</a></li>
+<li><a class="internal present" href="/alibaba/canal/wiki/TableMetaTSDB">TableMetaTSDB</a></li>
 <li><a href="http://alibaba.github.com/canal/release.html">ReleaseNotes</a></li>
 <li><a href="https://github.com/alibaba/canal/releases">Download</a></li>
+<li><a class="internal present" href="/alibaba/canal/wiki/FAQ">FAQ</a></li>
 </ul>
 
+<h1>Multi-language support</h1>
+
+1. canal's overall interaction protocol is built on protobuf 3.0, so in principle it can support the vast majority of multi-language scenarios; PRs for additional clients are welcome (a minimal client sketch follows this list)
+    * canal java client: <a href="https://github.com/alibaba/canal/wiki/ClientExample"> https://github.com/alibaba/canal/wiki/ClientExample </a>
+    * canal c# client open-source project: <a href="https://github.com/CanalSharp/CanalSharp"> https://github.com/CanalSharp/CanalSharp </a>
+    * canal go client: under development
+2. As an incremental-capture tool for the MySQL binlog, canal can also deliver data to MQ systems such as Kafka/RocketMQ, leveraging the multi-language capabilities of the MQ
+
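To make item 1 concrete, here is a minimal Java client sketch in the spirit of the ClientExample wiki page linked above. The server address, port 11111, and destination name "example" are the usual quickstart defaults, assumed here rather than taken from this commit.

```java
import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class SimpleCanalClient {

    public static void main(String[] args) {
        // connect to a single canal server instance; address and destination are assumptions
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // subscribe to all schemas and tables
            while (true) {
                Message message = connector.getWithoutAck(100); // fetch up to 100 entries
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000L); // nothing yet, back off briefly
                } else {
                    // process message.getEntries() here ...
                    connector.ack(batchId); // confirm consumption of this batch
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            connector.disconnect();
        }
    }
}
```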
 <h1>Related resources</h1>
 
 * Slides from the ADC Alibaba Technology Carnival talk (hosted on Google Docs, may require a VPN in some regions): <a href="https://docs.google.com/presentation/d/1MkszUPYRDkfVPz9IqOT1LLT5d9tuwde_WC8GZvjaDRg/edit?usp=sharing">ppt download</a>
@@ -91,9 +113,13 @@ See the wiki page: <a href="https://github.com/alibaba/canal/wiki">wiki documentation</a>
 <li>Report issues: <a href="https://github.com/alibaba/canal/issues">issues</a></li>
 </ol>
 
-<pre>
-[Hiring] The Alibaba middleware team is hiring senior JAVA engineers
-The positions are mainly technical (not in a business unit); the Alibaba middleware stack is very helpful for anyone who wants to build technical depth
-Work location: Hangzhou or Beijing. ps. Alibaba compensation has always been good; if interested, reach out via QQ or Weibo.
-Details: https://job.alibaba.com/zhaopin/position_detail.htm?positionId=32666
-</pre>
+<h3>Latest updates</h3>
+<ol>
+<li>canal has released the major version update 1.1.0; for the release notes see: <a href="https://github.com/alibaba/canal/releases/tag/canal-1.1.0">https://github.com/alibaba/canal/releases/tag/canal-1.1.0</a></li>
+<li>canal c# client open-source project: <a href="https://github.com/CanalSharp/CanalSharp"> https://github.com/CanalSharp/CanalSharp </a>, recommended!</li>
+<li>A canal QQ discussion group has been set up, group number: 161559791. Everyone is welcome to join for technical discussion.</li>
+<li>A canal consumer-side project is open source: Otter (a distributed database synchronization system), at <a href="https://github.com/alibaba/otter">https://github.com/alibaba/otter</a></li>
+<li>Canal is available on Alibaba Cloud as the commercial <a href="https://www.aliyun.com/product/dts?spm=a2c4g.11186623.cloudEssentials.80.srdwr7">Data Transmission Service (DTS)</a>, ready to use on activation, with none of the costly deployment and maintenance overhead. DTS is adapted to Alibaba Cloud products such as RDS and DRDS, and solves subscription high availability in scenarios such as binlog recycling, master/standby switchover, and VPC network switchover. It also includes targeted performance optimizations for RDS. For stability, performance, and cost reasons, Alibaba Cloud users are strongly encouraged to use DTS. <a href="https://help.aliyun.com/document_detail/26592.html?spm=a2c4g.11174283.6.539.t1Y91E">DTS product documentation</a>
+DTS supports real-time subscription to the binlogs of Alibaba Cloud RDS & DRDS; a free first-month trial is now available, limited in time and quantity. <a href="https://common-buy.aliyun.com/?commodityCode=dtspre&request=%7b%22dts_function%22%3a%22data_subscribe%22%7d">Try it now >>></a></li>
+</ol>

+ 7 - 3
deployer/src/main/resources/kafka.yml

@@ -11,8 +11,12 @@ canalGetTimeout: 100
 flatMessage: true
 
 canalDestinations:
-  - canalDestination: example
-    topic: example
-    partition:
+- canalDestination: example
+  topic: exp3
+#  # number of partitions for the corresponding topic
+#  partitionsNum: 3
+#  partitionHash:
+#    # database.table: unique primary key column
+#    mytest.person: id
 
 

+ 4 - 4
deployer/src/main/resources/spring/tsdb/sql/create_table.sql

@@ -26,10 +26,10 @@ CREATE TABLE IF NOT EXISTS `meta_history` (
   `binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlog node id',
   `binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'timestamp when the binlog was applied',
   `use_schema` varchar(1024) DEFAULT NULL COMMENT 'schema in use when the sql was executed',
-  `schema` varchar(1024) DEFAULT NULL COMMENT 'corresponding schema',
-  `table` varchar(1024) DEFAULT NULL COMMENT 'corresponding table',
-  `sql` longtext DEFAULT NULL COMMENT 'executed sql',
-  `type` varchar(256) DEFAULT NULL COMMENT 'sql type',
+  `sql_schema` varchar(1024) DEFAULT NULL COMMENT 'corresponding schema',
+  `sql_table` varchar(1024) DEFAULT NULL COMMENT 'corresponding table',
+  `sql_text` longtext DEFAULT NULL COMMENT 'executed sql',
+  `sql_type` varchar(256) DEFAULT NULL COMMENT 'sql type',
   `extra` text DEFAULT NULL COMMENT 'extra extended information',
   PRIMARY KEY (`id`),
   UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`),

+ 22 - 11
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -1,14 +1,8 @@
 package com.alibaba.otter.canal.protocol;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.collect.Table;
+import java.util.*;
+
 import com.google.protobuf.ByteString;
 
 /**
@@ -31,7 +25,7 @@ public class FlatMessage implements Serializable {
     private List<Map<String, String>> data;
     private List<Map<String, String>> old;
 
-    public FlatMessage() {
+    public FlatMessage(){
     }
 
     public FlatMessage(long id){
@@ -126,6 +120,12 @@ public class FlatMessage implements Serializable {
         this.old = old;
     }
 
+    /**
+     * Convert a Message into a list of FlatMessages
+     *
+     * @param message the raw message
+     * @return list of FlatMessages
+     */
     public static List<FlatMessage> messageConverter(Message message) {
         try {
             if (message == null) {
@@ -231,11 +231,22 @@ public class FlatMessage implements Serializable {
         }
     }
 
+    /**
+     * Split a FlatMessage by hashing the configured field values
+     *
+     * @param flatMessage the flatMessage
+     * @param partitionsNum number of partitions
+     * @param pkHashConfig hash mapping (database.table -> primary key column)
+     * @return array of split flatMessages
+     */
     public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum,
-                                                 Table<String, String, String> pkHashConfig) {
+                                                 Map<String, String> pkHashConfig) {
+        if (partitionsNum == null) {
+            partitionsNum = 1;
+        }
         FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
 
-        String pk = pkHashConfig.get(flatMessage.getDatabase(), flatMessage.getTable());
+        String pk = pkHashConfig.get(flatMessage.getDatabase() + "." + flatMessage.getTable());
         if (pk == null || flatMessage.getIsDdl()) {
             partitionMessages[0] = flatMessage;
         } else {
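A usage sketch for the new messagePartition signature above, wiring it to the partitionHash block from kafka.yml in this commit ("mytest.person: id"). The setter names and the row contents are illustrative assumptions; only the constructor, getters, and messagePartition signature are taken from the diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.alibaba.otter.canal.protocol.FlatMessage;

public class PartitionDemo {

    public static void main(String[] args) {
        // mirrors the partitionHash section of kafka.yml: "mytest.person: id"
        Map<String, String> pkHashConfig = new HashMap<String, String>();
        pkHashConfig.put("mytest.person", "id");

        FlatMessage flatMessage = new FlatMessage(1L);
        flatMessage.setDatabase("mytest");   // assumed POJO setters
        flatMessage.setTable("person");
        flatMessage.setIsDdl(false);
        Map<String, String> row = new HashMap<String, String>();
        row.put("id", "42");
        row.put("name", "alice");
        flatMessage.setData(Collections.singletonList(row));

        // rows hash on the value of the configured "id" column across 3 partitions;
        // DDL statements or unconfigured tables fall through to slot 0
        FlatMessage[] parts = FlatMessage.messagePartition(flatMessage, 3, pkHashConfig);
        for (int i = 0; i < parts.length; i++) {
            if (parts[i] != null) {
                System.out.println("partition " + i + " received a message part");
            }
        }
    }
}
```

This is the same path CanalKafkaProducer takes below: each non-null slot is sent as a ProducerRecord to the Kafka partition with the matching index.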

+ 40 - 8
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -67,15 +67,18 @@ public class CanalKafkaProducer {
         }
     }
 
-    public void send(KafkaProperties.Topic topic, Message message, Callback callback) {
+    public void send(KafkaProperties.CanalDestination canalDestination, Message message, Callback callback) {
         try {
             // producer.beginTransaction();
             if (!kafkaProperties.getFlatMessage()) {
                 ProducerRecord<String, Message> record;
-                if (topic.getPartition() != null) {
-                    record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
+                if (canalDestination.getPartition() != null) {
+                    record = new ProducerRecord<String, Message>(canalDestination.getTopic(),
+                        canalDestination.getPartition(),
+                        null,
+                        message);
                 } else {
-                    record = new ProducerRecord<String, Message>(topic.getTopic(), message);
+                    record = new ProducerRecord<String, Message>(canalDestination.getTopic(), 0, null, message);
                 }
 
                 producer.send(record);
@@ -84,9 +87,38 @@ public class CanalKafkaProducer {
                 List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
                 if (flatMessages != null) {
                     for (FlatMessage flatMessage : flatMessages) {
-                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic.getTopic(),
-                            JSON.toJSONString(flatMessage));
-                        producer2.send(record);
+                        if (canalDestination.getPartition() != null) {
+                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination
+                                .getTopic(), canalDestination.getPartition(), null, JSON.toJSONString(flatMessage));
+                            producer2.send(record);
+                        } else {
+                            if (canalDestination.getPartitionHash() != null
+                                && !canalDestination.getPartitionHash().isEmpty()) {
+                                FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
+                                    canalDestination.getPartitionsNum(),
+                                    canalDestination.getPartitionHash());
+                                int length = partitionFlatMessage.length;
+                                for (int i = 0; i < length; i++) {
+                                    FlatMessage flatMessagePart = partitionFlatMessage[i];
+                                    if (flatMessagePart != null) {
+                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                                canalDestination.getTopic(),
+                                                i,
+                                                null,
+                                                JSON.toJSONString(flatMessagePart));
+                                        producer2.send(record);
+                                    }
+                                }
+                            } else {
+                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                    canalDestination.getTopic(),
+                                    0,
+                                    null,
+                                    JSON.toJSONString(flatMessage));
+                                producer2.send(record);
+                            }
+                        }
+
                     }
                 }
             }
@@ -94,7 +126,7 @@ public class CanalKafkaProducer {
             // producer.commitTransaction();
             callback.commit();
             if (logger.isDebugEnabled()) {
-                logger.debug("send message to kafka topic: {}", topic.getTopic());
+                logger.debug("send message to kafka topic: {}", canalDestination.getTopic());
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);

+ 1 - 5
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaStarter.java

@@ -12,7 +12,6 @@ import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 
 import com.alibaba.otter.canal.kafka.KafkaProperties.CanalDestination;
-import com.alibaba.otter.canal.kafka.KafkaProperties.Topic;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.server.CanalServerStarter;
@@ -132,10 +131,7 @@ public class CanalKafkaStarter implements CanalServerStarter {
                     try {
                         int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                         if (batchId != -1 && size != 0) {
-                            Topic topic = new Topic();
-                            topic.setTopic(destination.getTopic());
-                            topic.setPartition(destination.getPartition());
-                            canalKafkaProducer.send(topic, message, new CanalKafkaProducer.Callback() {
+                            canalKafkaProducer.send(destination, message, new CanalKafkaProducer.Callback() {
 
                                 @Override
                                 public void commit() {

+ 14 - 46
server/src/main/java/com/alibaba/otter/canal/kafka/KafkaProperties.java

@@ -1,9 +1,8 @@
 package com.alibaba.otter.canal.kafka;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 
 /**
  * kafka configuration items
@@ -27,10 +26,11 @@ public class KafkaProperties {
 
     public static class CanalDestination {
 
-        private String     canalDestination;
-        private String     topic;
-        private Integer    partition;
-        private Set<Topic> topics = new HashSet<Topic>();
+        private String              canalDestination;
+        private String              topic;
+        private Integer             partition;
+        private Integer             partitionsNum;
+        private Map<String, String> partitionHash;
 
         public String getCanalDestination() {
             return canalDestination;
@@ -56,52 +56,20 @@ public class KafkaProperties {
             this.partition = partition;
         }
 
-        public Set<Topic> getTopics() {
-            return topics;
+        public Integer getPartitionsNum() {
+            return partitionsNum;
         }
 
-        public void setTopics(Set<Topic> topics) {
-            this.topics = topics;
+        public void setPartitionsNum(Integer partitionsNum) {
+            this.partitionsNum = partitionsNum;
         }
-    }
-
-    public static class Topic {
-
-        private String  topic;
-        private Integer partition;
-
-        public String getTopic() {
-            return topic;
-        }
-
-        public void setTopic(String topic) {
-            this.topic = topic;
-        }
-
-        public Integer getPartition() {
-            return partition;
-        }
-
-        public void setPartition(Integer partition) {
-            this.partition = partition;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            Topic topic1 = (Topic) o;
 
-            if (topic != null ? !topic.equals(topic1.topic) : topic1.topic != null) return false;
-            return partition != null ? partition.equals(topic1.partition) : topic1.partition == null;
+        public Map<String, String> getPartitionHash() {
+            return partitionHash;
         }
 
-        @Override
-        public int hashCode() {
-            int result = topic != null ? topic.hashCode() : 0;
-            result = 31 * result + (partition != null ? partition.hashCode() : 0);
-            return result;
+        public void setPartitionHash(Map<String, String> partitionHash) {
+            this.partitionHash = partitionHash;
         }
     }