浏览代码

fix bug #2023 and code optimize (#2167)

* 减少等待刷新状态时间

* fix bug #2023
rewerma 5 年之前
父节点
当前提交
31bb85bea3

+ 5 - 3
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/CanalInstanceServiceImpl.java

@@ -109,13 +109,15 @@ public class CanalInstanceServiceImpl implements CanalInstanceService {
             }));
         }
 
-        futures.forEach(f -> {
+        for (Future<Void> f :futures){
             try {
                 f.get(3, TimeUnit.SECONDS);
-            } catch (TimeoutException | InterruptedException | ExecutionException e) {
+            } catch (InterruptedException | ExecutionException e) {
                 // ignore
+            } catch (TimeoutException e){
+                break;
             }
-        });
+        }
         return pager;
     }
 

+ 5 - 3
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/NodeServerServiceImpl.java

@@ -154,13 +154,15 @@ public class NodeServerServiceImpl implements NodeServerService {
                 return !status;
             }));
         }
-        futures.forEach(f -> {
+        for (Future<Boolean> f :futures){
             try {
                 f.get(3, TimeUnit.SECONDS);
-            } catch (TimeoutException | InterruptedException | ExecutionException e) {
+            } catch (InterruptedException | ExecutionException e) {
                 // ignore
+            } catch (TimeoutException e){
+                break;
             }
-        });
+        }
 
         return pager;
     }

+ 14 - 11
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -203,7 +203,7 @@ public class ESSyncService {
 
             if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
                 // ------单表 & 所有字段都为简单字段------
-                singleTableSimpleFiledUpdate(config, dml, data, old);
+                singleTableSimpleFiledUpdate(config, schemaItem.getMainTable().getAlias(), dml, data, old);
             } else {
                 // ------主表 查询sql来更新------
                 if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
@@ -219,10 +219,12 @@ public class ESSyncService {
                     boolean allUpdateFieldSimple = true;
                     out: for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
                         for (ColumnItem columnItem : fieldItem.getColumnItems()) {
-                            if (old.containsKey(columnItem.getColumnName())) {
-                                if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
-                                    allUpdateFieldSimple = false;
-                                    break out;
+                            if (schemaItem.getMainTable().getAlias().equals(columnItem.getOwner())) { // 如果是主表的字段
+                                if (old.containsKey(columnItem.getColumnName())) {
+                                    if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
+                                        allUpdateFieldSimple = false;
+                                        break out;
+                                    }
                                 }
                             }
                         }
@@ -257,7 +259,7 @@ public class ESSyncService {
 
                     // 判断主键和所更新的字段是否全为简单字段
                     if (idFieldSimple && allUpdateFieldSimple && !fkChanged) {
-                        singleTableSimpleFiledUpdate(config, dml, data, old);
+                        singleTableSimpleFiledUpdate(config,schemaItem.getMainTable().getAlias(), dml, data, old);
                     } else {
                         mainTableUpdate(config, dml, data, old);
                     }
@@ -674,14 +676,14 @@ public class ESSyncService {
     private void wholeSqlOperation(ESSyncConfig config, Dml dml, Map<String, Object> data, Map<String, Object> old,
                                    TableItem tableItem) {
         ESMapping mapping = config.getEsMapping();
-        //防止最后出现groupby 导致sql解析异常
+        // 防止最后出现groupby 导致sql解析异常
         String[] sqlSplit = mapping.getSql().split("GROUP\\ BY(?!(.*)ON)");
         String sqlNoWhere = sqlSplit[0];
 
         String sqlGroupBy = "";
 
-        if(sqlSplit.length > 1){
-            sqlGroupBy =  "GROUP BY "+ sqlSplit[1];
+        if (sqlSplit.length > 1) {
+            sqlGroupBy = "GROUP BY " + sqlSplit[1];
         }
 
         StringBuilder sql = new StringBuilder(sqlNoWhere + " WHERE ");
@@ -781,16 +783,17 @@ public class ESSyncService {
      * 单表简单字段update
      *
      * @param config es配置
+     * @param owner 所属表
      * @param dml dml信息
      * @param data 单行data数据
      * @param old 单行old数据
      */
-    private void singleTableSimpleFiledUpdate(ESSyncConfig config, Dml dml, Map<String, Object> data,
+    private void singleTableSimpleFiledUpdate(ESSyncConfig config, String owner, Dml dml, Map<String, Object> data,
                                               Map<String, Object> old) {
         ESMapping mapping = config.getEsMapping();
         Map<String, Object> esFieldData = new LinkedHashMap<>();
 
-        Object idVal = esTemplate.getESDataFromDmlData(mapping, data, old, esFieldData);
+        Object idVal = esTemplate.getESDataFromDmlData(mapping, owner, data, old, esFieldData);
 
         if (logger.isTraceEnabled()) {
             logger.trace("Main table update to es index, destination:{}, table: {}, index: {}, id: {}",

+ 2 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESConnection.java

@@ -61,7 +61,8 @@ public class ESConnection {
         this.mode = mode;
         if (mode == ESClientMode.TRANSPORT) {
             Settings.Builder settingBuilder = Settings.builder();
-            properties.forEach(settingBuilder::put);
+            // properties.forEach(settingBuilder::put);
+            settingBuilder.put("cluster.name", properties.get("properties"));
             Settings settings = settingBuilder.build();
             transportClient = new PreBuiltTransportClient(settings);
             for (String host : hosts) {

+ 53 - 48
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -37,15 +37,15 @@ import com.alibaba.otter.canal.client.adapter.support.Util;
  */
 public class ESTemplate {
 
-    private static final Logger logger = LoggerFactory.getLogger(ESTemplate.class);
+    private static final Logger logger         = LoggerFactory.getLogger(ESTemplate.class);
 
-    private static final int MAX_BATCH_SIZE = 1000;
+    private static final int    MAX_BATCH_SIZE = 1000;
 
-    private ESConnection esConnection;
+    private ESConnection        esConnection;
 
-    private ESBulkRequest esBulkRequest;
+    private ESBulkRequest       esBulkRequest;
 
-    public ESTemplate(ESConnection esConnection) {
+    public ESTemplate(ESConnection esConnection){
         this.esConnection = esConnection;
         this.esBulkRequest = this.esConnection.new ESBulkRequest();
     }
@@ -61,8 +61,8 @@ public class ESTemplate {
     /**
      * 插入数据
      *
-     * @param mapping     配置对象
-     * @param pkVal       主键值
+     * @param mapping 配置对象
+     * @param pkVal 主键值
      * @param esFieldData 数据Map
      */
     public void insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
@@ -70,16 +70,16 @@ public class ESTemplate {
             String parentVal = (String) esFieldData.remove("$parent_routing");
             if (mapping.isUpsert()) {
                 ESUpdateRequest updateRequest = esConnection.new ESUpdateRequest(mapping.get_index(),
-                        mapping.get_type(),
-                        pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
+                    mapping.get_type(),
+                    pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
                 if (StringUtils.isNotEmpty(parentVal)) {
                     updateRequest.setRouting(parentVal);
                 }
                 getBulk().add(updateRequest);
             } else {
                 ESIndexRequest indexRequest = esConnection.new ESIndexRequest(mapping.get_index(),
-                        mapping.get_type(),
-                        pkVal.toString()).setSource(esFieldData);
+                    mapping.get_type(),
+                    pkVal.toString()).setSource(esFieldData);
                 if (StringUtils.isNotEmpty(parentVal)) {
                     indexRequest.setRouting(parentVal);
                 }
@@ -88,13 +88,13 @@ public class ESTemplate {
             commitBulk();
         } else {
             ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index(),
-                    mapping.get_type()).setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal)).size(10000);
+                mapping.get_type()).setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal)).size(10000);
             SearchResponse response = esSearchRequest.getResponse();
 
             for (SearchHit hit : response.getHits()) {
                 ESUpdateRequest esUpdateRequest = this.esConnection.new ESUpdateRequest(mapping.get_index(),
-                        mapping.get_type(),
-                        hit.getId()).setDoc(esFieldData);
+                    mapping.get_type(),
+                    hit.getId()).setDoc(esFieldData);
                 getBulk().add(esUpdateRequest);
                 commitBulk();
             }
@@ -105,8 +105,8 @@ public class ESTemplate {
     /**
      * 根据主键更新数据
      *
-     * @param mapping     配置对象
-     * @param pkVal       主键值
+     * @param mapping 配置对象
+     * @param pkVal 主键值
      * @param esFieldData 数据Map
      */
     public void update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
@@ -119,8 +119,8 @@ public class ESTemplate {
     /**
      * update by query
      *
-     * @param config      配置对象
-     * @param paramsTmp   sql查询条件
+     * @param config 配置对象
+     * @param paramsTmp sql查询条件
      * @param esFieldData 数据Map
      */
     public void updateByQuery(ESSyncConfig config, Map<String, Object> paramsTmp, Map<String, Object> esFieldData) {
@@ -164,25 +164,25 @@ public class ESTemplate {
     /**
      * 通过主键删除数据
      *
-     * @param mapping     配置对象
-     * @param pkVal       主键值
+     * @param mapping 配置对象
+     * @param pkVal 主键值
      * @param esFieldData 数据Map
      */
     public void delete(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
         if (mapping.get_id() != null) {
             ESDeleteRequest esDeleteRequest = this.esConnection.new ESDeleteRequest(mapping.get_index(),
-                    mapping.get_type(),
-                    pkVal.toString());
+                mapping.get_type(),
+                pkVal.toString());
             getBulk().add(esDeleteRequest);
             commitBulk();
         } else {
             ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index(),
-                    mapping.get_type()).setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal)).size(10000);
+                mapping.get_type()).setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal)).size(10000);
             SearchResponse response = esSearchRequest.getResponse();
             for (SearchHit hit : response.getHits()) {
                 ESUpdateRequest esUpdateRequest = this.esConnection.new ESUpdateRequest(mapping.get_index(),
-                        mapping.get_type(),
-                        hit.getId()).setDoc(esFieldData);
+                    mapping.get_type(),
+                    hit.getId()).setDoc(esFieldData);
                 getBulk().add(esUpdateRequest);
                 commitBulk();
             }
@@ -227,16 +227,16 @@ public class ESTemplate {
             String parentVal = (String) esFieldData.remove("$parent_routing");
             if (mapping.isUpsert()) {
                 ESUpdateRequest esUpdateRequest = this.esConnection.new ESUpdateRequest(mapping.get_index(),
-                        mapping.get_type(),
-                        pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
+                    mapping.get_type(),
+                    pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
                 if (StringUtils.isNotEmpty(parentVal)) {
                     esUpdateRequest.setRouting(parentVal);
                 }
                 getBulk().add(esUpdateRequest);
             } else {
                 ESUpdateRequest esUpdateRequest = this.esConnection.new ESUpdateRequest(mapping.get_index(),
-                        mapping.get_type(),
-                        pkVal.toString()).setDoc(esFieldData);
+                    mapping.get_type(),
+                    pkVal.toString()).setDoc(esFieldData);
                 if (StringUtils.isNotEmpty(parentVal)) {
                     esUpdateRequest.setRouting(parentVal);
                 }
@@ -244,12 +244,12 @@ public class ESTemplate {
             }
         } else {
             ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index(),
-                    mapping.get_type()).setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal)).size(10000);
+                mapping.get_type()).setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal)).size(10000);
             SearchResponse response = esSearchRequest.getResponse();
             for (SearchHit hit : response.getHits()) {
                 ESUpdateRequest esUpdateRequest = this.esConnection.new ESUpdateRequest(mapping.get_index(),
-                        mapping.get_type(),
-                        hit.getId()).setDoc(esFieldData);
+                    mapping.get_type(),
+                    hit.getId()).setDoc(esFieldData);
                 getBulk().add(esUpdateRequest);
             }
         }
@@ -289,7 +289,7 @@ public class ESTemplate {
             }
 
             if (!fieldItem.getFieldName().equals(mapping.get_id())
-                    && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                && !mapping.getSkips().contains(fieldItem.getFieldName())) {
                 esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
             }
         }
@@ -327,9 +327,9 @@ public class ESTemplate {
 
             for (ColumnItem columnItem : fieldItem.getColumnItems()) {
                 if (dmlOld.containsKey(columnItem.getColumnName())
-                        && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                    && !mapping.getSkips().contains(fieldItem.getFieldName())) {
                     esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()),
-                            getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName()));
+                        getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName()));
                     break;
                 }
             }
@@ -361,8 +361,8 @@ public class ESTemplate {
     /**
      * 将dml的data转换为es的data
      *
-     * @param mapping     配置mapping
-     * @param dmlData     dml data
+     * @param mapping 配置mapping
+     * @param dmlData dml data
      * @param esFieldData es data
      * @return 返回 id 值
      */
@@ -380,7 +380,7 @@ public class ESTemplate {
             }
 
             if (!fieldItem.getFieldName().equals(mapping.get_id())
-                    && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                && !mapping.getSkips().contains(fieldItem.getFieldName())) {
                 esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
             }
         }
@@ -393,18 +393,23 @@ public class ESTemplate {
     /**
      * 将dml的data, old转换为es的data
      *
-     * @param mapping     配置mapping
-     * @param dmlData     dml data
+     * @param mapping 配置mapping
+     * @param owner 所属表
+     * @param dmlData dml data
      * @param esFieldData es data
      * @return 返回 id 值
      */
-    public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData, Map<String, Object> dmlOld,
-                                       Map<String, Object> esFieldData) {
+    public Object getESDataFromDmlData(ESMapping mapping, String owner, Map<String, Object> dmlData,
+                                       Map<String, Object> dmlOld, Map<String, Object> esFieldData) {
         SchemaItem schemaItem = mapping.getSchemaItem();
         String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
         Object resultIdVal = null;
         for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
-            String columnName = fieldItem.getColumnItems().iterator().next().getColumnName();
+            ColumnItem columnItem = fieldItem.getColumnItems().iterator().next();
+            if (!columnItem.getOwner().equals(owner)) {
+                continue;
+            }
+            String columnName = columnItem.getColumnName();
 
             if (fieldItem.getFieldName().equals(idFieldName)) {
                 resultIdVal = getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName);
@@ -412,7 +417,7 @@ public class ESTemplate {
 
             if (dmlOld.containsKey(columnName) && !mapping.getSkips().contains(fieldItem.getFieldName())) {
                 esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()),
-                        getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
+                    getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
             }
         }
 
@@ -433,9 +438,9 @@ public class ESTemplate {
                     Object parentVal;
                     try {
                         parentVal = getValFromRS(mapping,
-                                resultSet,
-                                parentFieldItem.getFieldName(),
-                                parentFieldItem.getFieldName());
+                            resultSet,
+                            parentFieldItem.getFieldName(),
+                            parentFieldItem.getFieldName());
                     } catch (SQLException e) {
                         throw new RuntimeException(e);
                     }
@@ -480,7 +485,7 @@ public class ESTemplate {
     /**
      * 获取es mapping中的属性类型
      *
-     * @param mapping   mapping配置
+     * @param mapping mapping配置
      * @param fieldName 属性名
      * @return 类型
      */