Ver código fonte

fix bugs (#1712)

* kafka 模式默认开启事务异步发送

* fix adapter etl rest sql inject bug

* fix #1704
rewerma 6 anos atrás
pai
commit
2b738f4e04

+ 8 - 6
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -2,9 +2,7 @@ package com.alibaba.otter.canal.client.adapter.es.support;
 
 
 import java.sql.ResultSet;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentMap;
 
 
@@ -136,11 +134,15 @@ public class ESTemplate {
         // 查询sql批量更新
         // 查询sql批量更新
         DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
         DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
         StringBuilder sql = new StringBuilder("SELECT * FROM (" + mapping.getSql() + ") _v WHERE ");
         StringBuilder sql = new StringBuilder("SELECT * FROM (" + mapping.getSql() + ") _v WHERE ");
-        paramsTmp.forEach(
-            (fieldName, value) -> sql.append("_v.").append(fieldName).append("=").append(value).append(" AND "));
+        List<Object> values = new ArrayList<>();
+        paramsTmp.forEach((fieldName, value) -> {
+            sql.append("_v.").append(fieldName).append("=? AND ");
+            values.add(value);
+        });
+        //TODO 直接外部包裹sql会导致全表扫描性能低, 待优化拼接内部where条件
         int len = sql.length();
         int len = sql.length();
         sql.delete(len - 4, len);
         sql.delete(len - 4, len);
-        Integer syncCount = (Integer) Util.sqlRS(ds, sql.toString(), rs -> {
+        Integer syncCount = (Integer) Util.sqlRS(ds, sql.toString(), values, rs -> {
             int count = 0;
             int count = 0;
             try {
             try {
                 while (rs.next()) {
                 while (rs.next()) {