Browse Source

ES子表含group的where条件拼接 (#2190)

* ES子表含group的where条件拼接

* modify
rewerma 5 years ago
parent
commit
5214257cd4

+ 48 - 7
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SqlParser.java

@@ -10,11 +10,7 @@ import java.util.stream.Collectors;
 import com.alibaba.fastsql.sql.SQLUtils;
 import com.alibaba.fastsql.sql.ast.SQLExpr;
 import com.alibaba.fastsql.sql.ast.expr.*;
-import com.alibaba.fastsql.sql.ast.statement.SQLExprTableSource;
-import com.alibaba.fastsql.sql.ast.statement.SQLJoinTableSource;
-import com.alibaba.fastsql.sql.ast.statement.SQLSelectStatement;
-import com.alibaba.fastsql.sql.ast.statement.SQLSubqueryTableSource;
-import com.alibaba.fastsql.sql.ast.statement.SQLTableSource;
+import com.alibaba.fastsql.sql.ast.statement.*;
 import com.alibaba.fastsql.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
 import com.alibaba.fastsql.sql.dialect.mysql.parser.MySqlStatementParser;
 import com.alibaba.fastsql.sql.parser.ParserException;
@@ -125,8 +121,8 @@ public class SqlParser {
         } else if (expr instanceof SQLCaseExpr) {
             SQLCaseExpr sqlCaseExpr = (SQLCaseExpr) expr;
             fieldItem.setMethod(true);
-            sqlCaseExpr.getItems().forEach(item-> visitColumn(item.getConditionExpr(), fieldItem));
-        }else if(expr instanceof SQLCharExpr) {
+            sqlCaseExpr.getItems().forEach(item -> visitColumn(item.getConditionExpr(), fieldItem));
+        } else if (expr instanceof SQLCharExpr) {
             SQLCharExpr sqlCharExpr = (SQLCharExpr) expr;
             String owner = null;
             String columnName = null;
@@ -232,4 +228,49 @@ public class SqlParser {
             throw new UnsupportedOperationException("Unsupported for complex of on-condition");
         }
     }
+
+    public static MySqlSelectQueryBlock parseSQLSelectQueryBlock(String sql) {
+        if (sql == null || "".equals(sql)) {
+            return null;
+        }
+        SQLStatementParser parser = new MySqlStatementParser(sql);
+        SQLSelectStatement statement = (SQLSelectStatement) parser.parseStatement();
+        return (MySqlSelectQueryBlock) statement.getSelect().getQuery();
+    }
+
+    public static String parse4SQLSelectItem(MySqlSelectQueryBlock sqlSelectQueryBlock) {
+        List<SQLSelectItem> selectItems = sqlSelectQueryBlock.getSelectList();
+        StringBuilder subSql = new StringBuilder();
+        int i = 0;
+        for (SQLSelectItem sqlSelectItem : selectItems) {
+            if (i != 0) {
+                subSql.append(",");
+            } else {
+                i++;
+            }
+            subSql.append(SQLUtils.toMySqlString(sqlSelectItem));
+        }
+        return subSql.toString();
+    }
+
+    public static String parse4FromTableSource(MySqlSelectQueryBlock sqlSelectQueryBlock) {
+        SQLTableSource sqlTableSource = sqlSelectQueryBlock.getFrom();
+        return SQLUtils.toMySqlString(sqlTableSource);
+    }
+
+    public static String parse4WhereItem(MySqlSelectQueryBlock sqlSelectQueryBlock) {
+        SQLExpr sqlExpr = sqlSelectQueryBlock.getWhere();
+        if (sqlExpr != null) {
+            return SQLUtils.toMySqlString(sqlExpr);
+        }
+        return null;
+    }
+
+    public static String parse4GroupBy(MySqlSelectQueryBlock sqlSelectQueryBlock) {
+        SQLSelectGroupByClause expr = sqlSelectQueryBlock.getGroupBy();
+        if (expr != null) {
+            return SQLUtils.toMySqlString(expr);
+        }
+        return null;
+    }
 }

+ 27 - 7
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -4,6 +4,8 @@ import java.util.*;
 
 import javax.sql.DataSource;
 
+import com.alibaba.fastsql.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
+import com.alibaba.otter.canal.client.adapter.es.config.SqlParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -259,7 +261,7 @@ public class ESSyncService {
 
                     // 判断主键和所更新的字段是否全为简单字段
                     if (idFieldSimple && allUpdateFieldSimple && !fkChanged) {
-                        singleTableSimpleFiledUpdate(config,schemaItem.getMainTable().getAlias(), dml, data, old);
+                        singleTableSimpleFiledUpdate(config, schemaItem.getMainTable().getAlias(), dml, data, old);
                     } else {
                         mainTableUpdate(config, dml, data, old);
                     }
@@ -583,16 +585,34 @@ public class ESSyncService {
     private void subTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
                                               Map<String, Object> old, TableItem tableItem) {
         ESMapping mapping = config.getEsMapping();
-        StringBuilder sql = new StringBuilder(
-            "SELECT * FROM (" + tableItem.getSubQuerySql() + ") " + tableItem.getAlias() + " WHERE ");
 
+        MySqlSelectQueryBlock queryBlock = SqlParser.parseSQLSelectQueryBlock(tableItem.getSubQuerySql());
+        StringBuilder sql = new StringBuilder();
+        sql.append("SELECT ")
+            .append(SqlParser.parse4SQLSelectItem(queryBlock))
+            .append(" FROM ")
+            .append(SqlParser.parse4FromTableSource(queryBlock));
+
+        String whereSql = SqlParser.parse4WhereItem(queryBlock);
+        if (whereSql != null) {
+            sql.append(" WHERE ").append(whereSql);
+        } else {
+            sql.append(" WHERE 1=1 ");
+        }
+
+        List<Object> values = new ArrayList<>();
         for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
             String columnName = fkFieldItem.getColumn().getColumnName();
             Object value = esTemplate.getValFromData(mapping, data, fkFieldItem.getFieldName(), columnName);
-            ESSyncUtil.appendCondition(sql, value, tableItem.getAlias(), columnName);
+            sql.append(" AND ").append(columnName).append("=? ");
+            values.add(value);
         }
-        int len = sql.length();
-        sql.delete(len - 5, len);
+
+        String groupSql = SqlParser.parse4GroupBy(queryBlock);
+        if (groupSql != null) {
+            sql.append(groupSql);
+        }
+
         DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
         if (logger.isTraceEnabled()) {
             logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
@@ -601,7 +621,7 @@ public class ESSyncService {
                 mapping.get_index(),
                 sql.toString().replace("\n", " "));
         }
-        Util.sqlRS(ds, sql.toString(), rs -> {
+        Util.sqlRS(ds, sql.toString(), values, rs -> {
             try {
                 while (rs.next()) {
                     Map<String, Object> esFieldData = new LinkedHashMap<>();