|
@@ -0,0 +1,862 @@
|
|
|
+package com.alibaba.otter.canal.client.adapter.es.service;
|
|
|
+
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.LinkedHashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+import javax.sql.DataSource;
|
|
|
+
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.serializer.SerializerFeature;
|
|
|
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
|
|
|
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
|
|
|
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
|
|
|
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
|
|
|
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
|
|
|
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.TableItem;
|
|
|
+import com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil;
|
|
|
+import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
|
|
|
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
|
|
|
+import com.alibaba.otter.canal.client.adapter.support.Dml;
|
|
|
+
|
|
|
+/**
|
|
|
+ * ES 同步 Service
|
|
|
+ *
|
|
|
+ * @author rewerma 2018-11-01
|
|
|
+ * @version 1.0.0
|
|
|
+ */
|
|
|
+public class ESSyncService {
|
|
|
+
|
|
|
+ private static Logger logger = LoggerFactory.getLogger(ESSyncService.class);
|
|
|
+
|
|
|
+ private ESTemplate esTemplate;
|
|
|
+
|
|
|
+ public ESSyncService(ESTemplate esTemplate){
|
|
|
+ this.esTemplate = esTemplate;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void sync(Collection<ESSyncConfig> esSyncConfigs, Dml dml) {
|
|
|
+ long begin = System.currentTimeMillis();
|
|
|
+ if (esSyncConfigs != null) {
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("Destination: {}, database:{}, table:{}, type:{}, effect index count: {}",
|
|
|
+ dml.getDestination(),
|
|
|
+ dml.getDatabase(),
|
|
|
+ dml.getTable(),
|
|
|
+ dml.getType(),
|
|
|
+ esSyncConfigs.size());
|
|
|
+ }
|
|
|
+
|
|
|
+ for (ESSyncConfig config : esSyncConfigs) {
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("Prepared to sync index: {}, destination: {}",
|
|
|
+ config.getEsMapping().get_index(),
|
|
|
+ dml.getDestination());
|
|
|
+ }
|
|
|
+ this.sync(config, dml);
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("Sync completed: {}, destination: {}",
|
|
|
+ config.getEsMapping().get_index(),
|
|
|
+ dml.getDestination());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("Sync elapsed time: {} ms, effect index count:{}, destination: {}",
|
|
|
+ (System.currentTimeMillis() - begin),
|
|
|
+ esSyncConfigs.size(),
|
|
|
+ dml.getDestination());
|
|
|
+ }
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void sync(ESSyncConfig config, Dml dml) {
|
|
|
+ try {
|
|
|
+ // 如果是按时间戳定时更新则返回
|
|
|
+ if (config.getEsMapping().isSyncByTimestamp()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ long begin = System.currentTimeMillis();
|
|
|
+
|
|
|
+ String type = dml.getType();
|
|
|
+ if (type != null && type.equalsIgnoreCase("INSERT")) {
|
|
|
+ insert(config, dml);
|
|
|
+ } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
|
|
|
+ update(config, dml);
|
|
|
+ } else if (type != null && type.equalsIgnoreCase("DELETE")) {
|
|
|
+ delete(config, dml);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("Sync elapsed time: {} ms,destination: {}, es index: {}",
|
|
|
+ (System.currentTimeMillis() - begin),
|
|
|
+ dml.getDestination(),
|
|
|
+ config.getEsMapping().get_index());
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error("sync error, es index: {}, DML : {}", config.getEsMapping().get_index(), dml);
|
|
|
+ logger.error(e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 插入操作dml
|
|
|
+ *
|
|
|
+ * @param config es配置
|
|
|
+ * @param dml dml数据
|
|
|
+ */
|
|
|
+ private void insert(ESSyncConfig config, Dml dml) {
|
|
|
+ List<Map<String, Object>> dataList = dml.getData();
|
|
|
+ if (dataList == null || dataList.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
|
|
|
+ for (Map<String, Object> data : dataList) {
|
|
|
+ if (data == null || data.isEmpty()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
|
|
|
+ // ------单表 & 所有字段都为简单字段------
|
|
|
+ singleTableSimpleFiledInsert(config, dml, data);
|
|
|
+ } else {
|
|
|
+ // ------是主表 查询sql来插入------
|
|
|
+ if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
|
|
|
+ mainTableInsert(config, dml, data);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 从表的操作
|
|
|
+ for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
|
|
|
+ if (tableItem.isMain()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (!tableItem.getTableName().equals(dml.getTable())) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ // 关联条件出现在主表查询条件是否为简单字段
|
|
|
+ boolean allFieldsSimple = true;
|
|
|
+ for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
|
|
|
+ if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
|
|
|
+ allFieldsSimple = false;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 所有查询字段均为简单字段
|
|
|
+ if (allFieldsSimple) {
|
|
|
+ // 不是子查询
|
|
|
+ if (!tableItem.isSubQuery()) {
|
|
|
+ // ------关联表简单字段插入------
|
|
|
+ Map<String, Object> esFieldData = new LinkedHashMap<>();
|
|
|
+ for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
|
|
|
+ Object value = esTemplate.getValFromData(config.getEsMapping(),
|
|
|
+ data,
|
|
|
+ fieldItem.getFieldName(),
|
|
|
+ fieldItem.getColumn().getColumnName());
|
|
|
+ esFieldData.put(fieldItem.getFieldName(), value);
|
|
|
+ }
|
|
|
+
|
|
|
+ joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
|
|
|
+ } else {
|
|
|
+ // ------关联子表简单字段插入------
|
|
|
+ subTableSimpleFieldOperation(config, dml, data, null, tableItem);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // ------关联子表复杂字段插入 执行全sql更新es------
|
|
|
+ wholeSqlOperation(config, dml, data, null, tableItem);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 更新操作dml
|
|
|
+ *
|
|
|
+ * @param config es配置
|
|
|
+ * @param dml dml数据
|
|
|
+ */
|
|
|
+ private void update(ESSyncConfig config, Dml dml) {
|
|
|
+ List<Map<String, Object>> dataList = dml.getData();
|
|
|
+ List<Map<String, Object>> oldList = dml.getOld();
|
|
|
+ if (dataList == null || dataList.isEmpty() || oldList == null || oldList.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
|
|
|
+ int i = 0;
|
|
|
+ for (Map<String, Object> data : dataList) {
|
|
|
+ Map<String, Object> old = oldList.get(i);
|
|
|
+ if (data == null || data.isEmpty() || old == null || old.isEmpty()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
|
|
|
+ // ------单表 & 所有字段都为简单字段------
|
|
|
+ singleTableSimpleFiledUpdate(config, dml, data, old);
|
|
|
+ } else {
|
|
|
+ // ------主表 查询sql来更新------
|
|
|
+ if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
|
|
|
+ ESMapping mapping = config.getEsMapping();
|
|
|
+ String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
|
|
|
+ FieldItem idFieldItem = schemaItem.getSelectFields().get(idFieldName);
|
|
|
+
|
|
|
+ boolean idFieldSimple = true;
|
|
|
+ if (idFieldItem.isMethod() || idFieldItem.isBinaryOp()) {
|
|
|
+ idFieldSimple = false;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean allUpdateFieldSimple = true;
|
|
|
+ out: for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
|
|
|
+ for (ColumnItem columnItem : fieldItem.getColumnItems()) {
|
|
|
+ if (old.containsKey(columnItem.getColumnName())) {
|
|
|
+ if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
|
|
|
+ allUpdateFieldSimple = false;
|
|
|
+ break out;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 不支持主键更新!!
|
|
|
+
|
|
|
+ // 判断是否有外键更新
|
|
|
+ boolean fkChanged = false;
|
|
|
+ for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
|
|
|
+ if (tableItem.isMain()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ boolean changed = false;
|
|
|
+ for (List<FieldItem> fieldItems : tableItem.getRelationTableFields().values()) {
|
|
|
+ for (FieldItem fieldItem : fieldItems) {
|
|
|
+ if (old.containsKey(fieldItem.getColumn().getColumnName())) {
|
|
|
+ fkChanged = true;
|
|
|
+ changed = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 如果外键有修改,则更新所对应该表的所有查询条件数据
|
|
|
+ if (changed) {
|
|
|
+ for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
|
|
|
+ fieldItem.getColumnItems()
|
|
|
+ .forEach(columnItem -> old.put(columnItem.getColumnName(), null));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 判断主键和所更新的字段是否全为简单字段
|
|
|
+ if (idFieldSimple && allUpdateFieldSimple && !fkChanged) {
|
|
|
+ singleTableSimpleFiledUpdate(config, dml, data, old);
|
|
|
+ } else {
|
|
|
+ mainTableUpdate(config, dml, data, old);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 从表的操作
|
|
|
+ for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
|
|
|
+ if (tableItem.isMain()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (!tableItem.getTableName().equals(dml.getTable())) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 关联条件出现在主表查询条件是否为简单字段
|
|
|
+ boolean allFieldsSimple = true;
|
|
|
+ for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
|
|
|
+ if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
|
|
|
+ allFieldsSimple = false;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 所有查询字段均为简单字段
|
|
|
+ if (allFieldsSimple) {
|
|
|
+ // 不是子查询
|
|
|
+ if (!tableItem.isSubQuery()) {
|
|
|
+ // ------关联表简单字段更新------
|
|
|
+ Map<String, Object> esFieldData = new LinkedHashMap<>();
|
|
|
+ for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
|
|
|
+ if (old.containsKey(fieldItem.getColumn().getColumnName())) {
|
|
|
+ Object value = esTemplate.getValFromData(config.getEsMapping(),
|
|
|
+ data,
|
|
|
+ fieldItem.getFieldName(),
|
|
|
+ fieldItem.getColumn().getColumnName());
|
|
|
+ esFieldData.put(fieldItem.getFieldName(), value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
|
|
|
+ } else {
|
|
|
+ // ------关联子表简单字段更新------
|
|
|
+ subTableSimpleFieldOperation(config, dml, data, old, tableItem);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // ------关联子表复杂字段更新 执行全sql更新es------
|
|
|
+ wholeSqlOperation(config, dml, data, old, tableItem);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ i++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 删除操作dml
|
|
|
+ *
|
|
|
+ * @param config es配置
|
|
|
+ * @param dml dml数据
|
|
|
+ */
|
|
|
+ private void delete(ESSyncConfig config, Dml dml) {
|
|
|
+ List<Map<String, Object>> dataList = dml.getData();
|
|
|
+ if (dataList == null || dataList.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
|
|
|
+
|
|
|
+ for (Map<String, Object> data : dataList) {
|
|
|
+ if (data == null || data.isEmpty()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ ESMapping mapping = config.getEsMapping();
|
|
|
+
|
|
|
+ // ------是主表------
|
|
|
+ if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
|
|
|
+ FieldItem idFieldItem = schemaItem.getIdFieldItem(mapping);
|
|
|
+ // 主键为简单字段
|
|
|
+ if (!idFieldItem.isMethod() && !idFieldItem.isBinaryOp()) {
|
|
|
+ Object idVal = esTemplate.getValFromData(mapping,
|
|
|
+ data,
|
|
|
+ idFieldItem.getFieldName(),
|
|
|
+ idFieldItem.getColumn().getColumnName());
|
|
|
+
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, id: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ idVal);
|
|
|
+ }
|
|
|
+ boolean result = esTemplate.delete(mapping, idVal);
|
|
|
+ if (!result) {
|
|
|
+ logger.error("Main table delete es index error, destination:{}, table: {}, index: {}, id: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ idVal);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // ------主键带函数, 查询sql获取主键删除------
|
|
|
+ mainTableDelete(config, dml, data);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 从表的操作
|
|
|
+ for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
|
|
|
+ if (tableItem.isMain()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (!tableItem.getTableName().equals(dml.getTable())) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 关联条件出现在主表查询条件是否为简单字段
|
|
|
+ boolean allFieldsSimple = true;
|
|
|
+ for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
|
|
|
+ if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
|
|
|
+ allFieldsSimple = false;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 所有查询字段均为简单字段
|
|
|
+ if (allFieldsSimple) {
|
|
|
+ // 不是子查询
|
|
|
+ if (!tableItem.isSubQuery()) {
|
|
|
+ // ------关联表简单字段更新为null------
|
|
|
+ Map<String, Object> esFieldData = new LinkedHashMap<>();
|
|
|
+ for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
|
|
|
+ esFieldData.put(fieldItem.getFieldName(), null);
|
|
|
+ }
|
|
|
+ joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
|
|
|
+ } else {
|
|
|
+ // ------关联子表简单字段更新------
|
|
|
+ subTableSimpleFieldOperation(config, dml, data, null, tableItem);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // ------关联子表复杂字段更新 执行全sql更新es------
|
|
|
+ wholeSqlOperation(config, dml, data, null, tableItem);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 单表简单字段insert
|
|
|
+ *
|
|
|
+ * @param config es配置
|
|
|
+ * @param dml dml信息
|
|
|
+ * @param data 单行dml数据
|
|
|
+ */
|
|
|
+ private void singleTableSimpleFiledInsert(ESSyncConfig config, Dml dml, Map<String, Object> data) {
|
|
|
+ ESMapping mapping = config.getEsMapping();
|
|
|
+ Map<String, Object> esFieldData = new LinkedHashMap<>();
|
|
|
+ Object idVal = esTemplate.getESDataFromDmlData(mapping, data, esFieldData);
|
|
|
+
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("Single table insert ot es index, destination:{}, table: {}, index: {}, id: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ idVal);
|
|
|
+ }
|
|
|
+ boolean result = esTemplate.insert(mapping, idVal, esFieldData);
|
|
|
+ if (!result) {
|
|
|
+ logger.error("Single table insert to es index error, destination:{}, table: {}, index: {}, id: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ idVal);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 主表(单表)复杂字段insert
|
|
|
+ *
|
|
|
+ * @param config es配置
|
|
|
+ * @param dml dml信息
|
|
|
+ * @param data 单行dml数据
|
|
|
+ */
|
|
|
+ private void mainTableInsert(ESSyncConfig config, Dml dml, Map<String, Object> data) {
|
|
|
+ ESMapping mapping = config.getEsMapping();
|
|
|
+ String sql = mapping.getSql();
|
|
|
+ String condition = ESSyncUtil.pkConditionSql(mapping, data);
|
|
|
+ sql = ESSyncUtil.appendCondition(sql, condition);
|
|
|
+ DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("Main table insert ot es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ sql.replace("\n", " "));
|
|
|
+ }
|
|
|
+ ESSyncUtil.sqlRS(ds, sql, rs -> {
|
|
|
+ try {
|
|
|
+ while (rs.next()) {
|
|
|
+ Map<String, Object> esFieldData = new LinkedHashMap<>();
|
|
|
+ Object idVal = esTemplate.getESDataFromRS(mapping, rs, esFieldData);
|
|
|
+
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace(
|
|
|
+ "Main table insert ot es index by query sql, destination:{}, table: {}, index: {}, id: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ idVal);
|
|
|
+ }
|
|
|
+ boolean result = esTemplate.insert(mapping, idVal, esFieldData);
|
|
|
+ if (!result) {
|
|
|
+ logger.error(
|
|
|
+ "Main table insert to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ idVal);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private void mainTableDelete(ESSyncConfig config, Dml dml, Map<String, Object> data) {
|
|
|
+ ESMapping mapping = config.getEsMapping();
|
|
|
+ String sql = mapping.getSql();
|
|
|
+ String condition = ESSyncUtil.pkConditionSql(mapping, data);
|
|
|
+ sql = ESSyncUtil.appendCondition(sql, condition);
|
|
|
+ DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("Main table delete es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ sql.replace("\n", " "));
|
|
|
+ }
|
|
|
+ ESSyncUtil.sqlRS(ds, sql, rs -> {
|
|
|
+ try {
|
|
|
+ while (rs.next()) {
|
|
|
+ Object idVal = esTemplate.getIdValFromRS(mapping, rs);
|
|
|
+
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace(
|
|
|
+ "Main table delete ot es index by query sql, destination:{}, table: {}, index: {}, id: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ idVal);
|
|
|
+ }
|
|
|
+ boolean result = esTemplate.delete(mapping, idVal);
|
|
|
+ if (!result) {
|
|
|
+ logger.error(
|
|
|
+ "Main table delete to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ idVal);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 关联表主表简单字段operation
|
|
|
+ *
|
|
|
+ * @param config es配置
|
|
|
+ * @param dml dml信息
|
|
|
+ * @param data 单行dml数据
|
|
|
+ * @param tableItem 当前表配置
|
|
|
+ */
|
|
|
+ private void joinTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
|
|
|
+ TableItem tableItem, Map<String, Object> esFieldData) {
|
|
|
+ ESMapping mapping = config.getEsMapping();
|
|
|
+
|
|
|
+ Map<String, Object> paramsTmp = new LinkedHashMap<>();
|
|
|
+ for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
|
|
|
+ for (FieldItem fieldItem : entry.getValue()) {
|
|
|
+ if (fieldItem.getColumnItems().size() == 1) {
|
|
|
+ Object value = esTemplate.getValFromData(mapping,
|
|
|
+ data,
|
|
|
+ fieldItem.getFieldName(),
|
|
|
+ entry.getKey().getColumn().getColumnName());
|
|
|
+
|
|
|
+ String fieldName = fieldItem.getFieldName();
|
|
|
+ // 判断是否是主键
|
|
|
+ if (fieldName.equals(mapping.get_id())) {
|
|
|
+ fieldName = "_id";
|
|
|
+ }
|
|
|
+ paramsTmp.put(fieldName, value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.trace("Join table update es index by foreign key, destination:{}, table: {}, index: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index());
|
|
|
+ }
|
|
|
+ boolean result = esTemplate.updateByQuery(config, paramsTmp, esFieldData);
|
|
|
+ if (!result) {
|
|
|
+ logger.error("Join table update es index by foreign key error, destination:{}, table: {}, index: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 关联子查询, 主表简单字段operation
|
|
|
+ *
|
|
|
+ * @param config es配置
|
|
|
+ * @param dml dml信息
|
|
|
+ * @param data 单行dml数据
|
|
|
+ * @param old 单行old数据
|
|
|
+ * @param tableItem 当前表配置
|
|
|
+ */
|
|
|
+ private void subTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
|
|
|
+ Map<String, Object> old, TableItem tableItem) {
|
|
|
+ ESMapping mapping = config.getEsMapping();
|
|
|
+ StringBuilder sql = new StringBuilder(
|
|
|
+ "SELECT * FROM (" + tableItem.getSubQuerySql() + ") " + tableItem.getAlias() + " WHERE ");
|
|
|
+
|
|
|
+ for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
|
|
|
+ String columnName = fkFieldItem.getColumn().getColumnName();
|
|
|
+ Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
|
|
|
+ ESSyncUtil.appendCondition(sql, value, tableItem.getAlias(), columnName);
|
|
|
+ }
|
|
|
+ int len = sql.length();
|
|
|
+ sql.delete(len - 5, len);
|
|
|
+ DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ sql.toString().replace("\n", " "));
|
|
|
+ }
|
|
|
+ ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
|
|
|
+ try {
|
|
|
+ while (rs.next()) {
|
|
|
+ Map<String, Object> esFieldData = new LinkedHashMap<>();
|
|
|
+
|
|
|
+ for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
|
|
|
+ if (old != null) {
|
|
|
+ out: for (FieldItem fieldItem1 : tableItem.getSubQueryFields()) {
|
|
|
+ for (ColumnItem columnItem0 : fieldItem.getColumnItems()) {
|
|
|
+ if (fieldItem1.getFieldName().equals(columnItem0.getColumnName()))
|
|
|
+ for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
|
|
|
+ if (old.containsKey(columnItem.getColumnName())) {
|
|
|
+ Object val = esTemplate.getValFromRS(mapping,
|
|
|
+ rs,
|
|
|
+ fieldItem.getFieldName(),
|
|
|
+ fieldItem.getColumn().getColumnName());
|
|
|
+ esFieldData.put(fieldItem.getFieldName(), val);
|
|
|
+ break out;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ Object val = esTemplate.getValFromRS(mapping,
|
|
|
+ rs,
|
|
|
+ fieldItem.getFieldName(),
|
|
|
+ fieldItem.getColumn().getColumnName());
|
|
|
+ esFieldData.put(fieldItem.getFieldName(), val);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, Object> paramsTmp = new LinkedHashMap<>();
|
|
|
+ for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
|
|
|
+ for (FieldItem fieldItem : entry.getValue()) {
|
|
|
+ if (fieldItem.getColumnItems().size() == 1) {
|
|
|
+ Object value = esTemplate.getValFromRS(mapping,
|
|
|
+ rs,
|
|
|
+ fieldItem.getFieldName(),
|
|
|
+ entry.getKey().getColumn().getColumnName());
|
|
|
+ String fieldName = fieldItem.getFieldName();
|
|
|
+ // 判断是否是主键
|
|
|
+ if (fieldName.equals(mapping.get_id())) {
|
|
|
+ fieldName = "_id";
|
|
|
+ }
|
|
|
+ paramsTmp.put(fieldName, value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index());
|
|
|
+ }
|
|
|
+ boolean result = esTemplate.updateByQuery(config, paramsTmp, esFieldData);
|
|
|
+ if (!result) {
|
|
|
+ logger.error(
|
|
|
+ "Join table update es index by query sql error, destination:{}, table: {}, index: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 关联(子查询), 主表复杂字段operation, 全sql执行
|
|
|
+ *
|
|
|
+ * @param config es配置
|
|
|
+ * @param dml dml信息
|
|
|
+ * @param data 单行dml数据
|
|
|
+ * @param tableItem 当前表配置
|
|
|
+ */
|
|
|
+ private void wholeSqlOperation(ESSyncConfig config, Dml dml, Map<String, Object> data, Map<String, Object> old,
|
|
|
+ TableItem tableItem) {
|
|
|
+ ESMapping mapping = config.getEsMapping();
|
|
|
+ StringBuilder sql = new StringBuilder(mapping.getSql() + " WHERE ");
|
|
|
+
|
|
|
+ for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
|
|
|
+ String columnName = fkFieldItem.getColumn().getColumnName();
|
|
|
+ Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
|
|
|
+ ESSyncUtil.appendCondition(sql, value, tableItem.getAlias(), columnName);
|
|
|
+ }
|
|
|
+ int len = sql.length();
|
|
|
+ sql.delete(len - 5, len);
|
|
|
+ DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("Join table update es index by query whole sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ sql.toString().replace("\n", " "));
|
|
|
+ }
|
|
|
+ ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
|
|
|
+ try {
|
|
|
+ while (rs.next()) {
|
|
|
+ Map<String, Object> esFieldData = new LinkedHashMap<>();
|
|
|
+ for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
|
|
|
+ if (old != null) {
|
|
|
+ // 从表子查询
|
|
|
+ out: for (FieldItem fieldItem1 : tableItem.getSubQueryFields()) {
|
|
|
+ for (ColumnItem columnItem0 : fieldItem.getColumnItems()) {
|
|
|
+ if (fieldItem1.getFieldName().equals(columnItem0.getColumnName()))
|
|
|
+ for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
|
|
|
+ if (old.containsKey(columnItem.getColumnName())) {
|
|
|
+ Object val = esTemplate.getValFromRS(mapping,
|
|
|
+ rs,
|
|
|
+ fieldItem.getFieldName(),
|
|
|
+ fieldItem.getFieldName());
|
|
|
+ esFieldData.put(fieldItem.getFieldName(), val);
|
|
|
+ break out;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 从表非子查询
|
|
|
+ for (FieldItem fieldItem1 : tableItem.getRelationSelectFieldItems()) {
|
|
|
+ if (fieldItem1.equals(fieldItem)) {
|
|
|
+ for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
|
|
|
+ if (old.containsKey(columnItem.getColumnName())) {
|
|
|
+ Object val = esTemplate.getValFromRS(mapping,
|
|
|
+ rs,
|
|
|
+ fieldItem.getFieldName(),
|
|
|
+ fieldItem.getFieldName());
|
|
|
+ esFieldData.put(fieldItem.getFieldName(), val);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ Object val = esTemplate
|
|
|
+ .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
|
|
|
+ esFieldData.put(fieldItem.getFieldName(), val);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, Object> paramsTmp = new LinkedHashMap<>();
|
|
|
+ for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
|
|
|
+ for (FieldItem fieldItem : entry.getValue()) {
|
|
|
+ Object value = esTemplate
|
|
|
+ .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
|
|
|
+ String fieldName = fieldItem.getFieldName();
|
|
|
+ // 判断是否是主键
|
|
|
+ if (fieldName.equals(mapping.get_id())) {
|
|
|
+ fieldName = "_id";
|
|
|
+ }
|
|
|
+ paramsTmp.put(fieldName, value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.trace(
|
|
|
+ "Join table update es index by query whole sql, destination:{}, table: {}, index: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index());
|
|
|
+ }
|
|
|
+ boolean result = esTemplate.updateByQuery(config, paramsTmp, esFieldData);
|
|
|
+ if (!result) {
|
|
|
+ logger.error(
|
|
|
+ "Join table update es index by query whole sql error, destination:{}, table: {}, index: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 单表简单字段update
|
|
|
+ *
|
|
|
+ * @param config es配置
|
|
|
+ * @param dml dml信息
|
|
|
+ * @param data 单行data数据
|
|
|
+ * @param old 单行old数据
|
|
|
+ */
|
|
|
+ private void singleTableSimpleFiledUpdate(ESSyncConfig config, Dml dml, Map<String, Object> data,
|
|
|
+ Map<String, Object> old) {
|
|
|
+ ESMapping mapping = config.getEsMapping();
|
|
|
+ Map<String, Object> esFieldData = new LinkedHashMap<>();
|
|
|
+
|
|
|
+ Object idVal = esTemplate.getESDataFromDmlData(mapping, data, old, esFieldData);
|
|
|
+
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("Main table update ot es index, destination:{}, table: {}, index: {}, id: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ idVal);
|
|
|
+ }
|
|
|
+ boolean result = esTemplate.update(mapping, idVal, esFieldData);
|
|
|
+ if (!result) {
|
|
|
+ logger.error("Main table update to es index error, destination:{}, table: {}, index: {}, id: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ idVal);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 主表(单表)复杂字段update
|
|
|
+ *
|
|
|
+ * @param config es配置
|
|
|
+ * @param dml dml信息
|
|
|
+ * @param data 单行dml数据
|
|
|
+ */
|
|
|
+ private void mainTableUpdate(ESSyncConfig config, Dml dml, Map<String, Object> data, Map<String, Object> old) {
|
|
|
+ ESMapping mapping = config.getEsMapping();
|
|
|
+ String sql = mapping.getSql();
|
|
|
+ String condition = ESSyncUtil.pkConditionSql(mapping, data);
|
|
|
+ sql = ESSyncUtil.appendCondition(sql, condition);
|
|
|
+ DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("Main table update ot es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ sql.replace("\n", " "));
|
|
|
+ }
|
|
|
+ ESSyncUtil.sqlRS(ds, sql, rs -> {
|
|
|
+ try {
|
|
|
+ while (rs.next()) {
|
|
|
+ Map<String, Object> esFieldData = new LinkedHashMap<>();
|
|
|
+ Object idVal = esTemplate.getESDataFromRS(mapping, rs, old, esFieldData);
|
|
|
+
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace(
|
|
|
+ "Main table update ot es index by query sql, destination:{}, table: {}, index: {}, id: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ idVal);
|
|
|
+ }
|
|
|
+ boolean result = esTemplate.update(mapping, idVal, esFieldData);
|
|
|
+ if (!result) {
|
|
|
+ logger.error(
|
|
|
+ "Main table update to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
|
|
|
+ config.getDestination(),
|
|
|
+ dml.getTable(),
|
|
|
+ mapping.get_index(),
|
|
|
+ idVal);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+ });
|
|
|
+ }
|
|
|
+}
|