|
@@ -1,9 +1,6 @@
|
|
package com.alibaba.otter.canal.client.adapter.es.service;
|
|
package com.alibaba.otter.canal.client.adapter.es.service;
|
|
|
|
|
|
-import java.util.Collection;
|
|
|
|
-import java.util.LinkedHashMap;
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.Map;
|
|
|
|
|
|
+import java.util.*;
|
|
|
|
|
|
import javax.sql.DataSource;
|
|
import javax.sql.DataSource;
|
|
|
|
|
|
@@ -677,7 +674,17 @@ public class ESSyncService {
|
|
private void wholeSqlOperation(ESSyncConfig config, Dml dml, Map<String, Object> data, Map<String, Object> old,
|
|
private void wholeSqlOperation(ESSyncConfig config, Dml dml, Map<String, Object> data, Map<String, Object> old,
|
|
TableItem tableItem) {
|
|
TableItem tableItem) {
|
|
ESMapping mapping = config.getEsMapping();
|
|
ESMapping mapping = config.getEsMapping();
|
|
- StringBuilder sql = new StringBuilder(mapping.getSql() + " WHERE ");
|
|
|
|
|
|
+ //防止最后出现groupby 导致sql解析异常
|
|
|
|
+ String[] sqlSplit = mapping.getSql().split("GROUP\\ BY(?!(.*)ON)");
|
|
|
|
+ String sqlNoWhere = sqlSplit[0];
|
|
|
|
+
|
|
|
|
+ String sqlGroupBy = "";
|
|
|
|
+
|
|
|
|
+ if(sqlSplit.length > 1){
|
|
|
|
+ sqlGroupBy = "GROUP BY "+ sqlSplit[1];
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ StringBuilder sql = new StringBuilder(sqlNoWhere + " WHERE ");
|
|
|
|
|
|
for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
|
|
for (FieldItem fkFieldItem : tableItem.getRelationTableFields().keySet()) {
|
|
String columnName = fkFieldItem.getColumn().getColumnName();
|
|
String columnName = fkFieldItem.getColumn().getColumnName();
|
|
@@ -686,6 +693,8 @@ public class ESSyncService {
|
|
}
|
|
}
|
|
int len = sql.length();
|
|
int len = sql.length();
|
|
sql.delete(len - 5, len);
|
|
sql.delete(len - 5, len);
|
|
|
|
+ sql.append(sqlGroupBy);
|
|
|
|
+
|
|
DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
|
|
DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
|
|
if (logger.isTraceEnabled()) {
|
|
if (logger.isTraceEnabled()) {
|
|
logger.trace("Join table update es index by query whole sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
logger.trace("Join table update es index by query whole sql, destination:{}, table: {}, index: {}, sql: {}",
|