|
@@ -322,3 +322,224 @@ bin/startup.sh
|
|
|
```
|
|
|
#### 验证
|
|
|
修改mysql mytest.user表的数据, 将会自动同步到Oracle的MYTEST.TB_USER表下面, 并会打出DML的log
|
|
|
+
|
|
|
+
|
|
|
+## 五、ElasticSearch适配器
|
|
|
+### 5.1 修改启动器配置: application.yml
|
|
|
+```
|
|
|
+server:
|
|
|
+ port: 8081
|
|
|
+logging:
|
|
|
+ level:
|
|
|
+ com.alibaba.otter.canal.client.adapter.es: DEBUG
|
|
|
+spring:
|
|
|
+ jackson:
|
|
|
+ date-format: yyyy-MM-dd HH:mm:ss
|
|
|
+ time-zone: GMT+8
|
|
|
+ default-property-inclusion: non_null
|
|
|
+
|
|
|
+canal.conf:
|
|
|
+ canalServerHost: 127.0.0.1:11111
|
|
|
+ flatMessage: true
|
|
|
+ srcDataSources:
|
|
|
+ defaultDS:
|
|
|
+ url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
|
|
|
+ username: root
|
|
|
+ password: 121212
|
|
|
+ canalInstances:
|
|
|
+ - instance: example
|
|
|
+ adapterGroups:
|
|
|
+ - outAdapters:
|
|
|
+ - name: es
|
|
|
+ hosts: 127.0.0.1:9300 # es 集群地址, 逗号分隔
|
|
|
+ properties:
|
|
|
+ cluster.name: elasticsearch # es cluster name
|
|
|
+```
|
|
|
+adapter将会自动加载 conf/es 下的所有.yml结尾的配置文件
|
|
|
+### 5.2 适配器表映射文件
|
|
|
+修改 conf/es/mytest_user.yml文件:
|
|
|
+```
|
|
|
+dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
|
|
|
+destination: example # cannal的instance或者MQ的topic
|
|
|
+esMapping:
|
|
|
+ _index: mytest_user # es 的索引名称
|
|
|
+ _type: _doc # es 的doc名称
|
|
|
+ _id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
|
|
|
+# pk: id # 如果不需要_id, 则需要指定一个属性为主键属性
|
|
|
+ # sql映射
|
|
|
+ sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
|
|
|
+ a.c_time as _c_time, c.labels as _labels from user a
|
|
|
+ left join role b on b.id=a.role_id
|
|
|
+ left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
|
|
|
+ group by user_id) c on c.user_id=a.id"
|
|
|
+# objFields:
|
|
|
+# _labels: array:; # 数组或者对象属性, array:; 代表以;字段里面是以;分隔的
|
|
|
+# _obj: obj:{"test":"123"}
|
|
|
+ etlCondition: "where a.c_time>='{0}'" # etl 的条件参数
|
|
|
+ commitBatch: 3000 # 提交批大小
|
|
|
+```
|
|
|
+sql映射说明:
|
|
|
+
|
|
|
+sql支持多表关联自由组合, 但是有一定的限制:
|
|
|
+1. 主表不能为子查询语句
|
|
|
+2. 只能使用left outer join即最左表一定要是主表
|
|
|
+3. 关联从表如果是子查询不能有多张表
|
|
|
+4. 主sql中不能有where查询条件(从表子查询中可以有where条件但是不推荐, 可能会造成数据同步的不一致, 比如修改了where条件中的字段内容)
|
|
|
+5. 关联条件只允许主外键的'='操作不能出现其他常量判断比如: on a.role_id=b.id and b.statues=1
|
|
|
+6. 关联条件必须要有一个字段出现在主查询语句中比如: on a.role_id=b.id 其中的 a.role_id 或者 b.id 必须出现在主select语句中
|
|
|
+
|
|
|
+
|
|
|
+Elastic Search的mapping 属性与sql的查询值将一一对应(不支持 select *), 比如: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将
|
|
|
+映射到mapping的_email field, 这里以别名(如果有别名)作为最终的映射字段. 这里的_id可以填写到配置文件的 _id: _id映射.
|
|
|
+
|
|
|
+#### 5.2.1.单表映射索引示例sql:
|
|
|
+```
|
|
|
+select a.id as _id, a.name, a.role_id, a.c_time from user a
|
|
|
+```
|
|
|
+该sql对应的es mapping示例:
|
|
|
+```
|
|
|
+{
|
|
|
+ "mytest_user": {
|
|
|
+ "mappings": {
|
|
|
+ "_doc": {
|
|
|
+ "properties": {
|
|
|
+ "name": {
|
|
|
+ "type": "text"
|
|
|
+ },
|
|
|
+ "role_id": {
|
|
|
+ "type": "long"
|
|
|
+ },
|
|
|
+ "c_time": {
|
|
|
+ "type": "date"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+#### 5.2.2.单表映射索引示例sql带函数或运算操作:
|
|
|
+```
|
|
|
+select a.id as _id, concat(a.name,'_test') as name, a.role_id+10000 as role_id, a.c_time from user a
|
|
|
+```
|
|
|
+函数字段后必须跟上别名, 该sql对应的es mapping示例:
|
|
|
+```
|
|
|
+{
|
|
|
+ "mytest_user": {
|
|
|
+ "mappings": {
|
|
|
+ "_doc": {
|
|
|
+ "properties": {
|
|
|
+ "name": {
|
|
|
+ "type": "text"
|
|
|
+ },
|
|
|
+ "role_id": {
|
|
|
+ "type": "long"
|
|
|
+ },
|
|
|
+ "c_time": {
|
|
|
+ "type": "date"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+#### 5.2.3.多表映射(一对一, 多对一)索引示例sql:
|
|
|
+```
|
|
|
+select a.id as _id, a.name, a.role_id, b.role_name, a.c_time from user a
|
|
|
+left join role b on b.id = a.role_id
|
|
|
+```
|
|
|
+注:这里join操作只能是left outer join, 第一张表必须为主表!!
|
|
|
+
|
|
|
+该sql对应的es mapping示例:
|
|
|
+```
|
|
|
+{
|
|
|
+ "mytest_user": {
|
|
|
+ "mappings": {
|
|
|
+ "_doc": {
|
|
|
+ "properties": {
|
|
|
+ "name": {
|
|
|
+ "type": "text"
|
|
|
+ },
|
|
|
+ "role_id": {
|
|
|
+ "type": "long"
|
|
|
+ },
|
|
|
+ "role_name": {
|
|
|
+ "type": "text"
|
|
|
+ },
|
|
|
+ "c_time": {
|
|
|
+ "type": "date"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+#### 5.2.4.多表映射(一对多)索引示例sql:
|
|
|
+```
|
|
|
+select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a
|
|
|
+left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
|
|
|
+ group by user_id) c on c.user_id=a.id
|
|
|
+```
|
|
|
+注:left join 后的子查询只允许一张表, 即子查询中不能再包含子查询或者关联!!
|
|
|
+
|
|
|
+该sql对应的es mapping示例:
|
|
|
+```
|
|
|
+{
|
|
|
+ "mytest_user": {
|
|
|
+ "mappings": {
|
|
|
+ "_doc": {
|
|
|
+ "properties": {
|
|
|
+ "name": {
|
|
|
+ "type": "text"
|
|
|
+ },
|
|
|
+ "role_id": {
|
|
|
+ "type": "long"
|
|
|
+ },
|
|
|
+ "c_time": {
|
|
|
+ "type": "date"
|
|
|
+ },
|
|
|
+ "labels": {
|
|
|
+ "type": "text"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+#### 5.2.5.其它类型的sql示例:
|
|
|
+- geo type
|
|
|
+```
|
|
|
+select ... concat(IFNULL(a.latitude, 0), ',', IFNULL(a.longitude, 0)) AS location, ...
|
|
|
+```
|
|
|
+- 复合主键
|
|
|
+```
|
|
|
+select concat(a.id,'_',b.type) as _id, ... from user a left join role b on b.id=a.role_id
|
|
|
+```
|
|
|
+- 数组字段
|
|
|
+```
|
|
|
+select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a
|
|
|
+left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
|
|
|
+ group by user_id) c on c.user_id=a.id
|
|
|
+```
|
|
|
+配置中使用:
|
|
|
+```
|
|
|
+objFields:
|
|
|
+ labels: array:;
|
|
|
+```
|
|
|
+
|
|
|
+### 5.3 启动ES数据同步
|
|
|
+#### 启动canal-adapter启动器
|
|
|
+```
|
|
|
+bin/startup.sh
|
|
|
+```
|
|
|
+#### 验证
|
|
|
+1. 新增mysql mytest.user表的数据, 将会自动同步到es的mytest_user索引下面, 并会打出DML的log
|
|
|
+2. 修改mysql mytest.role表的role_name, 将会自动同步es的mytest_user索引中的role_name数据
|
|
|
+3. 新增或者修改mysql mytest.label表的label, 将会自动同步es的mytest_user索引中的labels数据
|