Browse Source

修改外键

mcy 6 years ago
parent
commit
5c8a5998d9

+ 21 - 12
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java

@@ -58,12 +58,13 @@ public class ESSyncConfig {
         private String       pk;
         private String       parent;
         private String       sql;
-        private List<String> skips       = new ArrayList<>();
-        private boolean      alwaysSql   = false;
-        private int          commitBatch = 1000;
+        private List<String> skips           = new ArrayList<>();
+        private int          commitBatch     = 1000;
         private String       etlCondition;
+        private boolean      syncByTimestamp = false;            // 是否按时间戳定时同步
+        private Long         syncInterval;                       // 同步时间间隔
 
-        private SchemaItem   schemaItem;                     // sql解析结果模型
+        private SchemaItem   schemaItem;                         // sql解析结果模型
 
         public String get_index() {
             return _index;
@@ -121,14 +122,6 @@ public class ESSyncConfig {
             this.sql = sql;
         }
 
-        public boolean isAlwaysSql() {
-            return alwaysSql;
-        }
-
-        public void setAlwaysSql(boolean alwaysSql) {
-            this.alwaysSql = alwaysSql;
-        }
-
         public int getCommitBatch() {
             return commitBatch;
         }
@@ -145,6 +138,22 @@ public class ESSyncConfig {
             this.etlCondition = etlCondition;
         }
 
+        public Long getSyncInterval() { // interval between timestamp-based syncs; null unless configured (sample yml notes ms)
+            return syncInterval;
+        }
+
+        public void setSyncInterval(Long syncInterval) { // stores the interval as given; no validation is performed
+            this.syncInterval = syncInterval;
+        }
+
+        public boolean isSyncByTimestamp() { // whether this mapping is synced periodically by timestamp; defaults to false
+            return syncByTimestamp;
+        }
+
+        public void setSyncByTimestamp(boolean syncByTimestamp) { // switches timestamp-based periodic sync on or off
+            this.syncByTimestamp = syncByTimestamp;
+        }
+
         public SchemaItem getSchemaItem() {
             return schemaItem;
         }

+ 267 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java

@@ -0,0 +1,267 @@
+package com.alibaba.otter.canal.client.adapter.es.service;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.sql.DataSource;
+
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.rest.RestStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
+import com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil;
+import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.google.common.base.Joiner;
+
+public class ESEtlService {
+
+    private static Logger   logger = LoggerFactory.getLogger(ESEtlService.class);
+
+    private TransportClient transportClient;
+    private ESTemplate      esTemplate;
+    private ESSyncConfig    config;
+
+    public ESEtlService(TransportClient transportClient, ESSyncConfig config){ // config may be null; importData checks for that
+        this.transportClient = transportClient; // client used to build and execute the bulk requests
+        this.esTemplate = new ESTemplate(transportClient); // template helper built on the same client
+        this.config = config; // sync configuration that supplies the ES mapping and the data source key
+    }
+
+    /**
+     * Runs a full ETL import of the configured SQL result into the mapped ES index.
+     * <p>
+     * The mapping's etl condition (if any) is appended to the SQL after its {@code {0}}, {@code {1}}, ...
+     * placeholders have been replaced with {@code params}. In bulk mode the rows are counted first and, from
+     * 10000 rows on, the import is split over a fixed pool of worker threads using LIMIT pages.
+     *
+     * @param params values substituted into the etl condition placeholders; may be null
+     * @param bulk true to count the rows first and parallelise large imports, false to import directly
+     * @return the etl result; it is marked succeeded only when no error message was collected
+     * @throws RuntimeException if the data source is not registered or its jdbc url does not match the
+     *         expected pattern
+     */
+    public EtlResult importData(List<String> params, boolean bulk) {
+        EtlResult etlResult = new EtlResult();
+        AtomicLong impCount = new AtomicLong();
+        // Worker threads append to this list concurrently, so it has to be thread-safe.
+        List<String> errMsg = Collections.synchronizedList(new ArrayList<String>());
+        String esIndex = "";
+        if (config == null) {
+            logger.warn("esSycnCofnig is null, etl go end !");
+            etlResult.setErrorMessage("esSycnCofnig is null, etl go end !");
+            return etlResult;
+        }
+
+        ESMapping mapping = config.getEsMapping();
+
+        esIndex = mapping.get_index();
+        DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+        if (dataSource == null) {
+            // Fail with a descriptive message instead of a bare NullPointerException below.
+            throw new RuntimeException("Not found the data source: " + config.getDataSourceKey());
+        }
+        Pattern pattern = Pattern.compile(".*:(.*)://.*/(.*)\\?.*$");
+        Matcher matcher = pattern.matcher(dataSource.getUrl());
+        if (!matcher.find()) {
+            throw new RuntimeException("Not found the schema of jdbc-url: " + config.getDataSourceKey());
+        }
+        String schema = matcher.group(2);
+
+        logger.info("etl from db: {},  to es index: {}", schema, esIndex);
+        long start = System.currentTimeMillis();
+        try {
+            String sql = mapping.getSql();
+
+            // Append the etl condition with its placeholders filled in.
+            // NOTE(review): params are spliced into the SQL text verbatim; callers must pass trusted values only.
+            if (mapping.getEtlCondition() != null && params != null) {
+                String etlCondition = mapping.getEtlCondition();
+                int size = params.size();
+                for (int i = 0; i < size; i++) {
+                    etlCondition = etlCondition.replace("{" + i + "}", params.get(i));
+                }
+
+                sql += " " + etlCondition;
+            }
+
+            if (logger.isDebugEnabled()) {
+                // Log the statement that is actually executed, including the appended condition.
+                logger.debug("etl sql : {}", sql);
+            }
+
+            // Becomes false as soon as any executeSqlImport call reports a failure.
+            boolean allSucceeded = true;
+            if (bulk) {
+                // Count the rows first to decide whether the import is worth parallelising.
+                String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
+                long cnt = (Long) ESSyncUtil.sqlRS(dataSource, countSql, rs -> {
+                    Long count = null;
+                    try {
+                        if (rs.next()) {
+                            count = ((Number) rs.getObject(1)).longValue();
+                        }
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
+                    }
+                    return count == null ? 0L : count;
+                });
+
+                // Use several threads once there are at least 10000 rows.
+                if (cnt >= 10000) {
+                    int threadCount = 3; // TODO read from configuration, default 3
+                    long perThreadCnt = cnt / threadCount;
+                    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+                    try {
+                        List<Future<Boolean>> futures = new ArrayList<>(threadCount);
+                        for (int i = 0; i < threadCount; i++) {
+                            long offset = i * perThreadCnt;
+                            // The last page takes everything that is left; cnt is a safe upper bound for it.
+                            long size = (i == threadCount - 1) ? cnt : perThreadCnt;
+                            // NOTE(review): no ORDER BY is added here; if the configured SQL has no stable
+                            // ordering the LIMIT pages may overlap or miss rows - confirm.
+                            String sqlFinal = sql + " LIMIT " + offset + "," + size;
+                            Future<Boolean> future = executor
+                                .submit(() -> executeSqlImport(dataSource, sqlFinal, mapping, impCount, errMsg));
+                            futures.add(future);
+                        }
+
+                        for (Future<Boolean> future : futures) {
+                            if (!future.get()) {
+                                allSucceeded = false;
+                            }
+                        }
+                    } finally {
+                        // Always release the pool threads, even when waiting for a worker fails.
+                        executor.shutdown();
+                    }
+                } else {
+                    allSucceeded = executeSqlImport(dataSource, sql, mapping, impCount, errMsg);
+                }
+            } else {
+                logger.info("自动ETL,无需统计记录总条数,直接进行ETL, index: {}", esIndex);
+                allSucceeded = executeSqlImport(dataSource, sql, mapping, impCount, errMsg);
+            }
+
+            logger.info("数据全量导入完成,一共导入 {} 条数据, 耗时: {}", impCount.get(), System.currentTimeMillis() - start);
+            etlResult.setResultMessage("导入ES索引 " + esIndex + " 数据:" + impCount.get() + " 条");
+
+            if (!allSucceeded && errMsg.isEmpty()) {
+                // An import reported failure without leaving a message; never report that as success.
+                errMsg.add(esIndex + " etl failed!");
+            }
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Preserve the interrupt status for the caller.
+                Thread.currentThread().interrupt();
+            }
+            logger.error(e.getMessage(), e);
+            errMsg.add(esIndex + " etl failed! ==>" + e.getMessage());
+        }
+        if (errMsg.isEmpty()) {
+            etlResult.setSucceeded(true);
+        } else {
+            etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
+        }
+        return etlResult;
+    }
+
+    /**
+     * Walks through the items of a bulk response and reacts to the failed ones: a NOT_FOUND failure is only
+     * logged as a warning, any other failure is logged as an error and aborts the import.
+     *
+     * @param bulkResponse response of an executed bulk request
+     * @param hasParent whether the mapping declares a parent (currently not used by this method)
+     * @throws RuntimeException on the first failed item whose status is not NOT_FOUND
+     */
+    private void processFailBulkResponse(BulkResponse bulkResponse, boolean hasParent) {
+        for (BulkItemResponse item : bulkResponse.getItems()) {
+            if (item.isFailed()) {
+                boolean notFound = item.getFailure().getStatus() == RestStatus.NOT_FOUND;
+                if (notFound) {
+                    logger.warn(item.getFailureMessage());
+                    continue;
+                }
+                logger.error("全量导入数据有误 {}", item.getFailureMessage());
+                throw new RuntimeException("全量数据 etl 异常: " + item.getFailureMessage());
+            }
+        }
+    }
+
+    /**
+     * Executes one SQL statement and indexes every returned row into ES, flushing a bulk request each time
+     * the mapping's commit batch size is reached and once more for the remaining rows.
+     * <p>
+     * This method may run on several worker threads at once, so additions to {@code errMsg} are synchronized
+     * on the list. Every failure is recorded in {@code errMsg} exactly once, including failures raised before
+     * or outside the row-processing callback.
+     *
+     * @param ds data source the statement is run against
+     * @param sql statement to execute
+     * @param mapping ES mapping describing the index, type, id, skipped fields and batch size
+     * @param impCount shared counter, incremented once per processed row
+     * @param errMsg shared list collecting error messages
+     * @return true if the import finished without an exception, false otherwise
+     */
+    private boolean executeSqlImport(DataSource ds, String sql, ESMapping mapping, AtomicLong impCount,
+                                     List<String> errMsg) {
+        // Set once the row-processing callback has recorded its own failure, so that the outer handler
+        // does not report the same error a second time.
+        boolean[] failureRecorded = new boolean[1];
+        try {
+            ESSyncUtil.sqlRS(ds, sql, rs -> {
+                int count = 0;
+                try {
+                    BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
+
+                    long batchBegin = System.currentTimeMillis();
+                    while (rs.next()) {
+                        Map<String, Object> esFieldData = new LinkedHashMap<>();
+                        for (FieldItem fieldItem : mapping.getSchemaItem().getSelectFields().values()) {
+
+                            // The id field is passed as the document id, not as part of the source.
+                            if (fieldItem.getFieldName().equals(mapping.get_id())) {
+                                continue;
+                            }
+
+                            String fieldName = fieldItem.getFieldName();
+                            if (mapping.getSkips().contains(fieldName)) {
+                                continue;
+                            }
+
+                            Object val = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
+                            esFieldData.put(fieldName, val);
+                        }
+                        Object idVal = null;
+                        if (mapping.get_id() != null) {
+                            idVal = rs.getObject(mapping.get_id());
+                        }
+
+                        // Rows of mappings that declare a parent are not indexed by this import.
+                        if (mapping.getParent() == null) {
+                            if (idVal != null) {
+                                bulkRequestBuilder.add(transportClient
+                                    .prepareIndex(mapping.get_index(), mapping.get_type(), idVal.toString())
+                                    .setSource(esFieldData));
+                            } else {
+                                // TODO delete the data that corresponds to pk
+                                bulkRequestBuilder
+                                    .add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type())
+                                        .setSource(esFieldData));
+                            }
+                        }
+
+                        if (bulkRequestBuilder.numberOfActions() % mapping.getCommitBatch() == 0
+                            && bulkRequestBuilder.numberOfActions() > 0) {
+                            flushBulk(bulkRequestBuilder,
+                                mapping,
+                                batchBegin,
+                                "全量数据批量导入批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}");
+                            batchBegin = System.currentTimeMillis();
+                            bulkRequestBuilder = transportClient.prepareBulk();
+                        }
+                        count++;
+                        impCount.incrementAndGet();
+                    }
+
+                    // Flush whatever is left after the last full batch.
+                    if (bulkRequestBuilder.numberOfActions() > 0) {
+                        flushBulk(bulkRequestBuilder,
+                            mapping,
+                            batchBegin,
+                            "全量数据批量导入最后批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}");
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                    synchronized (errMsg) {
+                        errMsg.add(mapping.get_index() + " etl failed! ==>" + e.getMessage());
+                    }
+                    failureRecorded[0] = true;
+                    throw new RuntimeException(e);
+                }
+                return count;
+            });
+
+            return true;
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            if (!failureRecorded[0]) {
+                // The failure happened outside the row callback (e.g. the statement itself failed);
+                // record it so the import is not reported as successful.
+                synchronized (errMsg) {
+                    errMsg.add(mapping.get_index() + " etl failed! ==>" + e.getMessage());
+                }
+            }
+            return false;
+        }
+    }
+
+    /**
+     * Executes the given bulk request, checks the response for failures and writes a debug timing line.
+     *
+     * @param bulkRequestBuilder bulk request to execute; must contain at least one action
+     * @param mapping ES mapping the request belongs to
+     * @param batchBegin time in millis at which collecting this batch started
+     * @param logFormat debug log format taking batch time, es time, batch size and index
+     */
+    private void flushBulk(BulkRequestBuilder bulkRequestBuilder, ESMapping mapping, long batchBegin,
+                           String logFormat) {
+        long esBatchBegin = System.currentTimeMillis();
+        BulkResponse rp = bulkRequestBuilder.execute().actionGet();
+        if (rp.hasFailures()) {
+            this.processFailBulkResponse(rp, Objects.nonNull(mapping.getParent()));
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug(logFormat,
+                (System.currentTimeMillis() - batchBegin),
+                (System.currentTimeMillis() - esBatchBegin),
+                bulkRequestBuilder.numberOfActions(),
+                mapping.get_index());
+        }
+    }
+}

+ 130 - 90
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -75,6 +75,11 @@ public class ESSyncService {
 
     public void sync(ESSyncConfig config, Dml dml) {
         try {
+            // 如果是按时间戳定时更新则返回
+            if (config.getEsMapping().isSyncByTimestamp()) {
+                return;
+            }
+
             long begin = System.currentTimeMillis();
 
             String type = dml.getType();
@@ -98,91 +103,6 @@ public class ESSyncService {
         }
     }
 
-    private void delete(ESSyncConfig config, Dml dml) {
-        List<Map<String, Object>> dataList = dml.getData();
-        if (dataList == null || dataList.isEmpty()) {
-            return;
-        }
-        SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
-
-        for (Map<String, Object> data : dataList) {
-            if (data == null || data.isEmpty()) {
-                continue;
-            }
-
-            ESMapping mapping = config.getEsMapping();
-
-            // ------是主表------
-            if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
-                FieldItem idFieldItem = schemaItem.getIdFieldItem(mapping);
-                // 主键为简单字段
-                if (!idFieldItem.isMethod() && !idFieldItem.isBinaryOp()) {
-                    Object idVal = esTemplate.getValFromData(mapping,
-                        data,
-                        idFieldItem.getFieldName(),
-                        idFieldItem.getColumn().getColumnName());
-
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, id: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index(),
-                            idVal);
-                    }
-                    boolean result = esTemplate.delete(mapping, idVal);
-                    if (!result) {
-                        logger.error("Main table delete es index error, destination:{}, table: {}, index: {}, id: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index(),
-                            idVal);
-                    }
-                } else {
-                    // ------主键带函数, 查询sql获取主键删除------
-                    mainTableDelete(config, dml, data);
-                }
-            }
-
-            // 从表的操作
-            for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
-                if (tableItem.isMain()) {
-                    continue;
-                }
-                if (!tableItem.getTableName().equals(dml.getTable())) {
-                    continue;
-                }
-
-                // 关联条件出现在主表查询条件是否为简单字段
-                boolean allFieldsSimple = true;
-                for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
-                    if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
-                        allFieldsSimple = false;
-                        break;
-                    }
-                }
-
-                // 所有查询字段均为简单字段
-                if (allFieldsSimple) {
-                    // 不是子查询
-                    if (!tableItem.isSubQuery()) {
-                        // ------关联表简单字段更新为null------
-                        Map<String, Object> esFieldData = new LinkedHashMap<>();
-                        for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
-                            esFieldData.put(fieldItem.getFieldName(), null);
-                        }
-                        joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
-                    } else {
-                        // ------关联子表简单字段更新------
-                        subTableSimpleFieldOperation(config, dml, data, null, tableItem);
-                    }
-                } else {
-                    // ------关联子表复杂字段更新 执行全sql更新es------
-                     jonTableWholeSqlOperation(config, dml, data, null, tableItem);
-                }
-            }
-        }
-    }
-
     /**
      * 插入操作dml
      * 
@@ -246,7 +166,7 @@ public class ESSyncService {
                         }
                     } else {
                         // ------关联子表复杂字段插入 执行全sql更新es------
-                        jonTableWholeSqlOperation(config, dml, data, null, tableItem);
+                        wholeSqlOperation(config, dml, data, null, tableItem);
                     }
                 }
             }
@@ -282,6 +202,7 @@ public class ESSyncService {
                     ESMapping mapping = config.getEsMapping();
                     String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
                     FieldItem idFieldItem = schemaItem.getSelectFields().get(idFieldName);
+
                     boolean idFieldSimple = true;
                     if (idFieldItem.isMethod() || idFieldItem.isBinaryOp()) {
                         idFieldSimple = false;
@@ -298,8 +219,36 @@ public class ESSyncService {
                             }
                         }
                     }
+
+                    // 不支持主键更新!!
+
+                    // 判断是否有外键更新
+                    boolean fkChanged = false;
+                    for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
+                        if (tableItem.isMain()) {
+                            continue;
+                        }
+                        boolean changed = false;
+                        for (List<FieldItem> fieldItems : tableItem.getRelationTableFields().values()) {
+                            for (FieldItem fieldItem : fieldItems) {
+                                if (old.containsKey(fieldItem.getColumn().getColumnName())) {
+                                    fkChanged = true;
+                                    changed = true;
+                                    break;
+                                }
+                            }
+                        }
+                        // 如果外键有修改,则更新所对应该表的所有查询条件数据
+                        if (changed) {
+                            for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
+                                fieldItem.getColumnItems()
+                                    .forEach(columnItem -> old.put(columnItem.getColumnName(), null));
+                            }
+                        }
+                    }
+
                     // 判断主键和所更新的字段是否全为简单字段
-                    if (idFieldSimple && allUpdateFieldSimple) {
+                    if (idFieldSimple && allUpdateFieldSimple && !fkChanged) {
                         singleTableSimpleFiledUpdate(config, dml, data, old);
                     } else {
                         mainTableUpdate(config, dml, data, old);
@@ -346,7 +295,7 @@ public class ESSyncService {
                         }
                     } else {
                         // ------关联子表复杂字段更新 执行全sql更新es------
-                        jonTableWholeSqlOperation(config, dml, data, old, tableItem);
+                        wholeSqlOperation(config, dml, data, old, tableItem);
                     }
                 }
             }
@@ -355,6 +304,97 @@ public class ESSyncService {
         }
     }
 
+    /**
+     * Handles a delete dml: deletes the ES document for main-table rows and delegates to the join-table operations otherwise.
+     *
+     * @param config es sync configuration (supplies the ES mapping and its parsed schema item)
+     * @param dml dml event; rows that are null or empty are skipped
+     */
+    private void delete(ESSyncConfig config, Dml dml) {
+        List<Map<String, Object>> dataList = dml.getData();
+        if (dataList == null || dataList.isEmpty()) {
+            return;
+        }
+        SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
+
+        for (Map<String, Object> data : dataList) {
+            if (data == null || data.isEmpty()) {
+                continue;
+            }
+
+            ESMapping mapping = config.getEsMapping();
+
+            // ------ the dml targets the main table ------
+            if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
+                FieldItem idFieldItem = schemaItem.getIdFieldItem(mapping);
+                // the id is a plain column (neither a function call nor a binary expression)
+                if (!idFieldItem.isMethod() && !idFieldItem.isBinaryOp()) {
+                    Object idVal = esTemplate.getValFromData(mapping,
+                        data,
+                        idFieldItem.getFieldName(),
+                        idFieldItem.getColumn().getColumnName());
+
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, id: {}",
+                            config.getDestination(),
+                            dml.getTable(),
+                            mapping.get_index(),
+                            idVal);
+                    }
+                    boolean result = esTemplate.delete(mapping, idVal);
+                    if (!result) {
+                        logger.error("Main table delete es index error, destination:{}, table: {}, index: {}, id: {}",
+                            config.getDestination(),
+                            dml.getTable(),
+                            mapping.get_index(),
+                            idVal);
+                    }
+                } else {
+                    // ------ the id is computed: delegate to mainTableDelete (presumably resolves the id via sql, then deletes) ------
+                    mainTableDelete(config, dml, data);
+                }
+            }
+
+            // handling for the joined (non-main) tables
+            for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
+                if (tableItem.isMain()) {
+                    continue;
+                }
+                if (!tableItem.getTableName().equals(dml.getTable())) { // NOTE(review): case-sensitive here, the main-table check above ignores case - confirm
+                    continue;
+                }
+
+                // check whether every select field related to this table is a plain column
+                boolean allFieldsSimple = true;
+                for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
+                    if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
+                        allFieldsSimple = false;
+                        break;
+                    }
+                }
+
+                // all related select fields are plain columns
+                if (allFieldsSimple) {
+                    // the table is not a sub-query
+                    if (!tableItem.isSubQuery()) {
+                        // ------ joined table: set its simple fields to null ------
+                        Map<String, Object> esFieldData = new LinkedHashMap<>();
+                        for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
+                            esFieldData.put(fieldItem.getFieldName(), null);
+                        }
+                        joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
+                    } else {
+                        // ------ joined sub-query table: simple field update ------
+                        subTableSimpleFieldOperation(config, dml, data, null, tableItem);
+                    }
+                } else {
+                    // ------ joined table with complex fields: run the whole sql to update es ------
+                    wholeSqlOperation(config, dml, data, null, tableItem);
+                }
+            }
+        }
+    }
+
     /**
      * 单表简单字段insert
      *
@@ -633,8 +673,8 @@ public class ESSyncService {
      * @param data 单行dml数据
      * @param tableItem 当前表配置
      */
-    private void jonTableWholeSqlOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
-                                           Map<String, Object> old, TableItem tableItem) {
+    private void wholeSqlOperation(ESSyncConfig config, Dml dml, Map<String, Object> data, Map<String, Object> old,
+                                   TableItem tableItem) {
         ESMapping mapping = config.getEsMapping();
         StringBuilder sql = new StringBuilder(mapping.getSql() + " WHERE ");
 

+ 1 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -330,7 +330,7 @@ public class ESTemplate {
             }
 
             for (ColumnItem columnItem : fieldItem.getColumnItems()) {
-                if (dmlOld.get(columnItem.getColumnName()) != null
+                if (dmlOld.containsKey(columnItem.getColumnName())
                     && !mapping.getSkips().contains(fieldItem.getFieldName())) {
                     esFieldData.put(fieldItem.getFieldName(),
                         getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName()));

+ 3 - 1
client-adapter/elasticsearch/src/main/resources/es/mytest_user.yml

@@ -10,5 +10,7 @@ esMapping:
         left join role b on b.id=a.role_id
         left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
         group by user_id) c on c.user_id=a.id"
+#  syncByTimestamp: false
+#  syncInterval: 2000 # ms
+  etlCondition: "where a.c_time>='{0}'"
   commitBatch: 3000
-  etlCondition: "where a.c_time>='{0}'"