Browse Source

整理注释

mcy 6 years ago
parent
commit
80cf533b05

+ 0 - 4
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -12,8 +12,6 @@ import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
@@ -33,8 +31,6 @@ import com.alibaba.otter.canal.client.adapter.support.*;
 @SPI("es")
 public class ESAdapter implements OuterAdapter {
 
-    private static Logger   logger = LoggerFactory.getLogger(ESAdapter.class);
-
     private TransportClient transportClient;
 
     private ESSyncService   esSyncService;

+ 6 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java

@@ -3,6 +3,12 @@ package com.alibaba.otter.canal.client.adapter.es.config;
 import java.util.ArrayList;
 import java.util.List;
 
+/**
+ * ES 映射配置
+ *
+ * @author rewerma 2018-11-01
+ * @version 1.0.0
+ */
 public class ESSyncConfig {
 
     private String    dataSourceKey; // 数据源key

+ 6 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java

@@ -17,6 +17,12 @@ import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 
+/**
+ * ES 配置装载器
+ *
+ * @author rewerma 2018-11-01
+ * @version 1.0.0
+ */
 public class ESSyncConfigLoader {
 
     private static Logger                                   logger              = LoggerFactory

+ 8 - 2
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SchemaItem.java

@@ -7,10 +7,16 @@ import java.util.Map;
 
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 
+/**
+ * ES 映射配置视图
+ *
+ * @author rewerma 2018-11-01
+ * @version 1.0.0
+ */
 public class SchemaItem {
 
-    private Map<String, TableItem>                aliasTableItems = new LinkedHashMap<>();
-    private Map<String, FieldItem>                selectFields    = new LinkedHashMap<>();
+    private Map<String, TableItem>                aliasTableItems = new LinkedHashMap<>(); // 别名对应表名
+    private Map<String, FieldItem>                selectFields    = new LinkedHashMap<>(); // 查询字段
     private String                                sql;
 
     private volatile Map<String, List<TableItem>> tableItemAliases;

+ 13 - 8
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SqlParser.java

@@ -34,8 +34,8 @@ public class SqlParser {
     /**
      * 解析sql
      * 
-     * @param sql
-     * @return
+     * @param sql sql
+     * @return 视图对象
      */
     public static SchemaItem parse(String sql) {
         try {
@@ -67,8 +67,8 @@ public class SqlParser {
     /**
      * 归集字段
      * 
-     * @param sqlSelectQueryBlock
-     * @return
+     * @param sqlSelectQueryBlock sqlSelectQueryBlock
+     * @return 字段属性列表
      */
     private static List<FieldItem> collectSelectQueryFields(MySqlSelectQueryBlock sqlSelectQueryBlock) {
         return sqlSelectQueryBlock.getSelectList().stream().map(selectItem -> {
@@ -82,8 +82,8 @@ public class SqlParser {
     /**
      * 解析字段
      * 
-     * @param expr
-     * @param fieldItem
+     * @param expr sql expr
+     * @param fieldItem 字段属性
      */
     private static void visitColumn(SQLExpr expr, FieldItem fieldItem) {
         if (expr instanceof SQLIdentifierExpr) {
@@ -123,6 +123,11 @@ public class SqlParser {
 
     /**
      * 解析表
+     * 
+     * @param schemaItem 视图对象
+     * @param sqlTableSource sqlTableSource
+     * @param tableItems 表对象列表
+     * @param tableItemTmp 表对象(临时)
      */
     private static void visitSelectTable(SchemaItem schemaItem, SQLTableSource sqlTableSource,
                                          List<TableItem> tableItems, TableItem tableItemTmp) {
@@ -174,8 +179,8 @@ public class SqlParser {
     /**
      * 解析on条件
      * 
-     * @param expr
-     * @param tableItem
+     * @param expr sql expr
+     * @param tableItem 表对象
      */
     private static void visitOnCondition(SQLExpr expr, TableItem tableItem) {
         if (!(expr instanceof SQLBinaryOpExpr)) {

+ 6 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java

@@ -31,6 +31,12 @@ import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.EtlResult;
 import com.google.common.base.Joiner;
 
+/**
+ * ES ETL Service
+ *
+ * @author rewerma 2018-11-01
+ * @version 1.0.0
+ */
 public class ESEtlService {
 
     private static Logger   logger = LoggerFactory.getLogger(ESEtlService.class);

+ 15 - 11
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -6,8 +6,6 @@ import java.util.Map;
 
 import javax.sql.DataSource;
 
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,6 +21,12 @@ import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 
+/**
+ * ES 同步 Service
+ *
+ * @author rewerma 2018-11-01
+ * @version 1.0.0
+ */
 public class ESSyncService {
 
     private static Logger logger = LoggerFactory.getLogger(ESSyncService.class);
@@ -530,7 +534,7 @@ public class ESSyncService {
                                                TableItem tableItem, Map<String, Object> esFieldData) {
         ESMapping mapping = config.getEsMapping();
 
-        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
+        Map<String, Object> paramsTmp = new LinkedHashMap<>();
         for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
             for (FieldItem fieldItem : entry.getValue()) {
                 if (fieldItem.getColumnItems().size() == 1) {
@@ -544,7 +548,7 @@ public class ESSyncService {
                     if (fieldName.equals(mapping.get_id())) {
                         fieldName = "_id";
                     }
-                    queryBuilder.must(QueryBuilders.termsQuery(fieldName, value));
+                    paramsTmp.put(fieldName, value);
                 }
             }
         }
@@ -555,7 +559,7 @@ public class ESSyncService {
                 dml.getTable(),
                 mapping.get_index());
         }
-        boolean result = esTemplate.updateByQuery(mapping, queryBuilder, esFieldData);
+        boolean result = esTemplate.updateByQuery(config, paramsTmp, esFieldData);
         if (!result) {
             logger.error("Join table update es index by foreign key error, destination:{}, table: {}, index: {}",
                 config.getDestination(),
@@ -625,7 +629,7 @@ public class ESSyncService {
                         }
                     }
 
-                    BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
+                    Map<String, Object> paramsTmp = new LinkedHashMap<>();
                     for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
                         for (FieldItem fieldItem : entry.getValue()) {
                             if (fieldItem.getColumnItems().size() == 1) {
@@ -638,7 +642,7 @@ public class ESSyncService {
                                 if (fieldName.equals(mapping.get_id())) {
                                     fieldName = "_id";
                                 }
-                                queryBuilder.must(QueryBuilders.termsQuery(fieldName, value));
+                                paramsTmp.put(fieldName, value);
                             }
                         }
                     }
@@ -649,7 +653,7 @@ public class ESSyncService {
                             dml.getTable(),
                             mapping.get_index());
                     }
-                    boolean result = esTemplate.updateByQuery(mapping, queryBuilder, esFieldData);
+                    boolean result = esTemplate.updateByQuery(config, paramsTmp, esFieldData);
                     if (!result) {
                         logger.error(
                             "Join table update es index by query sql error, destination:{}, table: {}, index: {}",
@@ -737,7 +741,7 @@ public class ESSyncService {
                         }
                     }
 
-                    BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
+                    Map<String, Object> paramsTmp = new LinkedHashMap<>();
                     for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
                         for (FieldItem fieldItem : entry.getValue()) {
                             Object value = esTemplate
@@ -747,7 +751,7 @@ public class ESSyncService {
                             if (fieldName.equals(mapping.get_id())) {
                                 fieldName = "_id";
                             }
-                            queryBuilder.must(QueryBuilders.termsQuery(fieldName, value));
+                            paramsTmp.put(fieldName, value);
                         }
                     }
 
@@ -758,7 +762,7 @@ public class ESSyncService {
                             dml.getTable(),
                             mapping.get_index());
                     }
-                    boolean result = esTemplate.updateByQuery(mapping, queryBuilder, esFieldData);
+                    boolean result = esTemplate.updateByQuery(config, paramsTmp, esFieldData);
                     if (!result) {
                         logger.error(
                             "Join table update es index by query whole sql error, destination:{}, table: {}, index: {}",

+ 24 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java

@@ -21,6 +21,12 @@ import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.TableItem;
 
+/**
+ * ES 同步工具类
+ *
+ * @author rewerma 2018-11-01
+ * @version 1.0.0
+ */
 public class ESSyncUtil {
 
     private static Logger logger = LoggerFactory.getLogger(ESSyncUtil.class);
@@ -169,6 +175,24 @@ public class ESSyncUtil {
             location.put("lat", Double.valueOf(point[0].trim()));
             location.put("lon", Double.valueOf(point[1].trim()));
             return location;
+        } else if ("array".equals(esType)) {
+            if ("".equals(val.toString().trim())) {
+                res = new ArrayList<>();
+            } else {
+                String value = val.toString();
+                String separator = ",";
+                if (!value.contains(",")) {
+                    if (value.contains(";")) {
+                        separator = ";";
+                    } else if (value.contains("|")) {
+                        separator = "|";
+                    } else if (value.contains("-")) {
+                        separator = "-";
+                    }
+                }
+                String[] values = value.split(separator);
+                return Arrays.asList(values);
+            }
         } else if ("object".equals(esType)) {
             if ("".equals(val.toString().trim())) {
                 res = new HashMap<>();

+ 68 - 24
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -9,7 +9,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
+
+import javax.sql.DataSource;
 
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -18,6 +19,7 @@ import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
@@ -31,16 +33,24 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.CollectionUtils;
 
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
-
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+
+/**
+ * ES 操作模板
+ *
+ * @author rewerma 2018-11-01
+ * @version 1.0.0
+ */
 public class ESTemplate {
 
     private static final Logger logger         = LoggerFactory.getLogger(ESTemplate.class);
 
-    private static final int    MAX_BATCH_SIZE = 10000;
+    private static final int    MAX_BATCH_SIZE = 1000;
 
     private TransportClient     transportClient;
 
@@ -48,10 +58,6 @@ public class ESTemplate {
         this.transportClient = transportClient;
     }
 
-    public void setTransportClient(TransportClient transportClient) {
-        this.transportClient = transportClient;
-    }
-
     /**
      * 插入数据
      * 
@@ -96,18 +102,6 @@ public class ESTemplate {
         return commitBulkRequest(bulkRequestBuilder);
     }
 
-    /**
-     * 结合 addBulkRequest4Update 批量更新
-     * 
-     * @param consumer
-     * @return
-     */
-    public boolean updateBatch(Consumer<BulkRequestBuilder> consumer) {
-        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
-        consumer.accept(bulkRequestBuilder);
-        return commitBulkRequest(bulkRequestBuilder);
-    }
-
     public void append4Update(BulkRequestBuilder bulkRequestBuilder, ESMapping mapping, Object pkVal,
                               Map<String, Object> esFieldData) {
         if (mapping.get_id() != null) {
@@ -120,7 +114,7 @@ public class ESTemplate {
                 .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
                 .setSize(MAX_BATCH_SIZE)
                 .get();
-            for (SearchHit hit : response.getHits()) { // 理论上只有一条
+            for (SearchHit hit : response.getHits()) {
                 bulkRequestBuilder
                     .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
                         .setDoc(esFieldData));
@@ -131,13 +125,63 @@ public class ESTemplate {
     /**
      * update by query
      *
-     * @param mapping
-     * @param queryBuilder
+     * @param config
+     * @param paramsTmp
      * @param esFieldData
      * @return
      */
-    public boolean updateByQuery(ESMapping mapping, QueryBuilder queryBuilder, Map<String, Object> esFieldData) {
-        return updateByQuery(mapping, queryBuilder, esFieldData, 1);
+    public boolean updateByQuery(ESSyncConfig config, Map<String, Object> paramsTmp, Map<String, Object> esFieldData) {
+        if (paramsTmp.isEmpty()) {
+            return false;
+        }
+        ESMapping mapping = config.getEsMapping();
+        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
+        paramsTmp.forEach((fieldName, value) -> queryBuilder.must(QueryBuilders.termsQuery(fieldName, value)));
+
+        SearchResponse response = transportClient.prepareSearch(mapping.get_index())
+            .setTypes(mapping.get_type())
+            .setSize(0)
+            .setQuery(queryBuilder)
+            .get();
+        long count = response.getHits().getTotalHits();
+        // 如果更新量大于Max, 查询sql批量更新
+        if (count > MAX_BATCH_SIZE) {
+            BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
+
+            DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+            // 查询sql更新
+            StringBuilder sql = new StringBuilder("SELECT * FROM (" + mapping.getSql() + ") _v WHERE ");
+            paramsTmp.forEach(
+                (fieldName, value) -> sql.append("_v.").append(fieldName).append("=").append(value).append(" AND "));
+            int len = sql.length();
+            sql.delete(len - 4, len);
+            ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
+                int exeCount = 1;
+                try {
+                    BulkRequestBuilder bulkRequestBuilderTmp = bulkRequestBuilder;
+                    while (rs.next()) {
+                        Object idVal = getIdValFromRS(mapping, rs);
+                        append4Update(bulkRequestBuilderTmp, mapping, idVal, esFieldData);
+
+                        if (exeCount % mapping.getCommitBatch() == 0 && bulkRequestBuilderTmp.numberOfActions() > 0) {
+                            commitBulkRequest(bulkRequestBuilderTmp);
+                            bulkRequestBuilderTmp = transportClient.prepareBulk();
+                        }
+                        exeCount++;
+                    }
+
+                    if (bulkRequestBuilder.numberOfActions() > 0) {
+                        commitBulkRequest(bulkRequestBuilderTmp);
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                return 0;
+            });
+            return true;
+        } else {
+            return updateByQuery(mapping, queryBuilder, esFieldData, 1);
+        }
     }
 
     private boolean updateByQuery(ESMapping mapping, QueryBuilder queryBuilder, Map<String, Object> esFieldData,

+ 25 - 24
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -53,8 +53,8 @@ public class HbaseAdapter implements OuterAdapter {
                         hbaseMapping = MappingConfigLoader.load();
                         mappingConfigCache = new HashMap<>();
                         for (MappingConfig mappingConfig : hbaseMapping.values()) {
-                            mappingConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination())
-                                                   + "." + mappingConfig.getHbaseMapping().getDatabase() + "."
+                            mappingConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                                   + mappingConfig.getHbaseMapping().getDatabase() + "."
                                                    + mappingConfig.getHbaseMapping().getTable(),
                                 mappingConfig);
                         }
@@ -100,33 +100,34 @@ public class HbaseAdapter implements OuterAdapter {
                 return etlResult;
             }
         } else {
-            DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(task);
-            if (dataSource != null) {
-                StringBuilder resultMsg = new StringBuilder();
-                boolean resSucc = true;
-                // ds不为空说明传入的是datasourceKey
-                for (MappingConfig configTmp : hbaseMapping.values()) {
-                    // 取所有的datasourceKey为task的配置
-                    if (configTmp.getDataSourceKey().equals(task)) {
-                        EtlResult etlRes = HbaseEtlService.importData(dataSource, hbaseTemplate, configTmp, params);
-                        if (!etlRes.getSucceeded()) {
-                            resSucc = false;
-                            resultMsg.append(etlRes.getErrorMessage()).append("\n");
-                        } else {
-                            resultMsg.append(etlRes.getResultMessage()).append("\n");
-                        }
+            StringBuilder resultMsg = new StringBuilder();
+            boolean resSucc = true;
+            // 按destination匹配配置, 数据源取自各配置的dataSourceKey
+            for (MappingConfig configTmp : hbaseMapping.values()) {
+                // 取所有的destination为task的配置
+                if (configTmp.getDestination().equals(task)) {
+                    DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(configTmp.getDataSourceKey());
+                    if (dataSource == null) {
+                        continue;
                     }
-                }
-                if (resultMsg.length() > 0) {
-                    etlResult.setSucceeded(resSucc);
-                    if (resSucc) {
-                        etlResult.setResultMessage(resultMsg.toString());
+                    EtlResult etlRes = HbaseEtlService.importData(dataSource, hbaseTemplate, configTmp, params);
+                    if (!etlRes.getSucceeded()) {
+                        resSucc = false;
+                        resultMsg.append(etlRes.getErrorMessage()).append("\n");
                     } else {
-                        etlResult.setErrorMessage(resultMsg.toString());
+                        resultMsg.append(etlRes.getResultMessage()).append("\n");
                     }
-                    return etlResult;
                 }
             }
+            if (resultMsg.length() > 0) {
+                etlResult.setSucceeded(resSucc);
+                if (resSucc) {
+                    etlResult.setResultMessage(resultMsg.toString());
+                } else {
+                    etlResult.setErrorMessage(resultMsg.toString());
+                }
+                return etlResult;
+            }
         }
         etlResult.setSucceeded(false);
         etlResult.setErrorMessage("Task not found");

+ 19 - 19
client-adapter/launcher/src/main/resources/application.yml

@@ -10,25 +10,25 @@ spring:
     time-zone: GMT+8
     default-property-inclusion: non_null
 
-canal.conf:
-  canalServerHost: 127.0.0.1:11111
+#canal.conf:
+#  canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
 #  bootstrapServers: slave1:6667 #or rocketmq
 #  flatMessage: true
-  canalInstances:
-  - instance: example
-    groups:
-    - outAdapters:
+#  canalInstances:
+#  - instance: example
+#    groups:
+#    - outAdapters:
 #      - name: logger
 #      - name: hbase
 #        properties:
 #          hbase.zookeeper.quorum: 127.0.0.1
 #          hbase.zookeeper.property.clientPort: 2181
 #          zookeeper.znode.parent: /hbase
-      - name: es
-        hosts: 127.0.0.1:9300
-        properties:
-          cluster.name: elasticsearch
+#      - name: es
+#        hosts: 127.0.0.1:9300
+#        properties:
+#          cluster.name: elasticsearch
 #  mqTopics:
 #  - mqMode: kafka
 #    topic: example
@@ -44,12 +44,12 @@ canal.conf:
 #      outAdapters:
 #      - name: logger
 
-adapter.conf:
-  datasourceConfigs:
-    defaultDS:
-      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
-      username: root
-      password: 121212
-  adapterConfigs:
-  - hbase/mytest_person2.yml
-  - es/mytest_user.yml
+#adapter.conf:
+#  datasourceConfigs:
+#    defaultDS:
+#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
+#      username: root
+#      password: 121212
+#  adapterConfigs:
+#  - hbase/mytest_person2.yml
+#  - es/mytest_user.yml