mcy 6 years ago
parent
commit
96b9484f70

+ 137 - 6
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -11,13 +11,13 @@ import org.elasticsearch.index.query.QueryBuilders;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.fastjson.JSON;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.TableItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
 import com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
@@ -35,7 +35,7 @@ public class ESSyncService {
 
     public void sync(Dml dml) {
         if (logger.isDebugEnabled()) {
-            logger.debug("DML: {}", JSON.toJSONString(dml));
+            logger.debug("DML: {}", dml.toString());
         }
         long begin = System.currentTimeMillis();
         String database = dml.getDatabase();
@@ -103,7 +103,55 @@ public class ESSyncService {
     }
 
     private void update(ESSyncConfig config, Dml dml) {
+        List<Map<String, Object>> dataList = dml.getData();
+        List<Map<String, Object>> oldList = dml.getOld();
+        if (dataList == null || dataList.isEmpty() || oldList == null || oldList.isEmpty()) {
+            return;
+        }
+        SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
+        int i = 0;
+        for (Map<String, Object> data : dataList) {
+            Map<String, Object> old = oldList.get(i);
+            if (data == null || data.isEmpty() || old == null || old.isEmpty()) {
+                continue;
+            }
+
+            if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
+                // ------单表 & 所有字段都为简单字段------
+                singleTableSimpleFiledUpdate(config, dml, data, old);
+            } else {
+                // ------是主表 查询sql来更新------
+                if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
+                    ESMapping mapping = config.getEsMapping();
+                    String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
+                    FieldItem idFieldItem = schemaItem.getSelectFields().get(idFieldName);
+                    boolean idFieldSimple = true;
+                    if (idFieldItem.isMethod() || idFieldItem.isBinaryOp()) {
+                        idFieldSimple = false;
+                    }
 
+                    boolean allUpdateFieldSimple = true;
+                    out: for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+                        for (ColumnItem columnItem : fieldItem.getColumnItems()) {
+                            if (old.containsKey(columnItem.getColumnName())) {
+                                if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
+                                    allUpdateFieldSimple = false;
+                                    break out;
+                                }
+                            }
+                        }
+                    }
+                    // 判断主键和所更新的字段是否全为简单字段
+                    if (idFieldSimple && allUpdateFieldSimple) {
+                        singleTableSimpleFiledUpdate(config, dml, data, old);
+                    } else {
+                        mainTableUpdate(config, dml, data, old);
+                    }
+                }
+            }
+
+            i++;
+        }
     }
 
     /**
@@ -123,8 +171,8 @@ public class ESSyncService {
                 continue;
             }
 
-            // ------是否单表 & 所有字段都为简单字段------
             if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
+                // ------单表 & 所有字段都为简单字段------
                 singleTableSimpleFiledInsert(config, dml, data);
             } else {
                 // ------是主表 查询sql来插入------
@@ -210,7 +258,7 @@ public class ESSyncService {
         sql = ESSyncUtil.appendCondition(sql, condition);
         DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
         if (logger.isTraceEnabled()) {
-            logger.trace("Single table insert ot es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
+            logger.trace("Main table insert ot es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
                 config.getDestination(),
                 dml.getTable(),
                 mapping.get_index(),
@@ -224,7 +272,7 @@ public class ESSyncService {
 
                     if (logger.isTraceEnabled()) {
                         logger.trace(
-                            "Single table insert ot es index by query sql, destination:{}, table: {}, index: {}, id: {}",
+                            "Main table insert ot es index by query sql, destination:{}, table: {}, index: {}, id: {}",
                             config.getDestination(),
                             dml.getTable(),
                             mapping.get_index(),
@@ -233,7 +281,7 @@ public class ESSyncService {
                     boolean result = esTemplate.insert(config, idVal, esFieldData);
                     if (!result) {
                         logger.error(
-                            "Single table insert to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
+                            "Main table insert to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
                             config.getDestination(),
                             dml.getTable(),
                             mapping.get_index(),
@@ -486,4 +534,87 @@ public class ESSyncService {
             return 0;
         });
     }
+
+    /**
+     * 单表简单字段update
+     *
+     * @param config es配置
+     * @param dml dml信息
+     * @param data 单行data数据
+     * @param old 单行old数据
+     */
+    private void singleTableSimpleFiledUpdate(ESSyncConfig config, Dml dml, Map<String, Object> data,
+                                              Map<String, Object> old) {
+        ESMapping mapping = config.getEsMapping();
+        Map<String, Object> esFieldData = new LinkedHashMap<>();
+
+        Object idVal = esTemplate.getESDataFromDmlData(mapping, data, old, esFieldData);
+
+        if (logger.isTraceEnabled()) {
+            logger.trace("Single table update ot es index, destination:{}, table: {}, index: {}, id: {}",
+                config.getDestination(),
+                dml.getTable(),
+                mapping.get_index(),
+                idVal);
+        }
+        boolean result = esTemplate.update(config, idVal, esFieldData);
+        if (!result) {
+            logger.error("Single table update to es index error, destination:{}, table: {}, index: {}, id: {}",
+                config.getDestination(),
+                dml.getTable(),
+                mapping.get_index(),
+                idVal);
+        }
+    }
+
+    /**
+     * 主表(单表)复杂字段update
+     *
+     * @param config es配置
+     * @param dml dml信息
+     * @param data 单行dml数据
+     */
+    private void mainTableUpdate(ESSyncConfig config, Dml dml, Map<String, Object> data, Map<String, Object> old) {
+        ESMapping mapping = config.getEsMapping();
+        String sql = mapping.getSql();
+        String condition = ESSyncUtil.pkConditionSql(mapping, data);
+        sql = ESSyncUtil.appendCondition(sql, condition);
+        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Main table update ot es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
+                config.getDestination(),
+                dml.getTable(),
+                mapping.get_index(),
+                sql.replace("\n", " "));
+        }
+        ESSyncUtil.sqlRS(ds, sql, rs -> {
+            try {
+                while (rs.next()) {
+                    Map<String, Object> esFieldData = new LinkedHashMap<>();
+                    Object idVal = esTemplate.getESDataFromRS(mapping, rs, old, esFieldData);
+
+                    if (logger.isTraceEnabled()) {
+                        logger.trace(
+                            "Main table update ot es index by query sql, destination:{}, table: {}, index: {}, id: {}",
+                            config.getDestination(),
+                            dml.getTable(),
+                            mapping.get_index(),
+                            idVal);
+                    }
+                    boolean result = esTemplate.update(config, idVal, esFieldData);
+                    if (!result) {
+                        logger.error(
+                            "Main table update to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
+                            config.getDestination(),
+                            dml.getTable(),
+                            mapping.get_index(),
+                            idVal);
+                    }
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            return 0;
+        });
+    }
 }

+ 52 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -34,6 +34,7 @@ import org.springframework.util.CollectionUtils;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
 
 public class ESTemplate {
@@ -143,7 +144,7 @@ public class ESTemplate {
     }
 
     private boolean updateByQuery(ESMapping mapping, QueryBuilder queryBuilder, Map<String, Object> esFieldData,
-                                 int counter) {
+                                  int counter) {
         if (CollectionUtils.isEmpty(esFieldData)) {
             return true;
         }
@@ -307,6 +308,28 @@ public class ESTemplate {
         return resultIdVal;
     }
 
+    public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet, Map<String, Object> dmlOld,
+                                  Map<String, Object> esFieldData) throws SQLException {
+        SchemaItem schemaItem = mapping.getSchemaItem();
+        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
+        Object resultIdVal = null;
+        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+            if (fieldItem.getFieldName().equals(idFieldName)) {
+                resultIdVal = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
+            }
+
+            for (ColumnItem columnItem : fieldItem.getColumnItems()) {
+                if (dmlOld.get(columnItem.getColumnName()) != null
+                    && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                    esFieldData.put(fieldItem.getFieldName(),
+                        getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName()));
+                    break;
+                }
+            }
+        }
+        return resultIdVal;
+    }
+
     public Object getValFromData(ESMapping mapping, Map<String, Object> dmlData, String fieldName, String columnName) {
         String esType = getEsType(mapping, fieldName);
         Object value = dmlData.get(columnName);
@@ -347,6 +370,34 @@ public class ESTemplate {
         return resultIdVal;
     }
 
+    /**
+     * 将dml的data, old转换为es的data
+     *
+     * @param mapping 配置mapping
+     * @param dmlData dml data
+     * @param esFieldData es data
+     * @return 返回 id 值
+     */
+    public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData, Map<String, Object> dmlOld,
+                                       Map<String, Object> esFieldData) {
+        SchemaItem schemaItem = mapping.getSchemaItem();
+        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
+        Object resultIdVal = null;
+        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+            String columnName = fieldItem.getColumnItems().iterator().next().getColumnName();
+
+            if (fieldItem.getFieldName().equals(idFieldName)) {
+                resultIdVal = getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName);
+            }
+
+            if (dmlOld.get(columnName) != null && !mapping.getSkips().contains(fieldItem.getFieldName())) {
+                esFieldData.put(fieldItem.getFieldName(),
+                    getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
+            }
+        }
+        return resultIdVal;
+    }
+
     /**
      * es 字段类型本地缓存
      */

+ 26 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncJoinOneTest.java

@@ -51,6 +51,32 @@ public class UserSyncJoinOneTest {
         Assert.assertEquals("Eric_", response.getSource().get("_name"));
     }
 
+    @Test
+    public void updateTest02() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("UPDATE");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric");
+        dml.setData(dataList);
+        List<Map<String, Object>> oldList = new ArrayList<>();
+        Map<String, Object> old = new LinkedHashMap<>();
+        oldList.add(old);
+        old.put("name", "Eric2");
+        dml.setOld(oldList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("Eric_", response.getSource().get("_name"));
+    }
+
     @After
     public void after() {
         esAdapter.destroy();

+ 29 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncSingleTest.java

@@ -54,6 +54,35 @@ public class UserSyncSingleTest {
         Assert.assertEquals("Eric", response.getSource().get("_name"));
     }
 
+    /**
+     * 单表更新
+     */
+    @Test
+    public void updateTest02() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("UPDATE");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric2");
+        dml.setData(dataList);
+        List<Map<String, Object>> oldList = new ArrayList<>();
+        Map<String, Object> old = new LinkedHashMap<>();
+        oldList.add(old);
+        old.put("name", "Eric");
+        dml.setOld(oldList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("Eric2", response.getSource().get("_name"));
+    }
+
     @Test
     public void ttt() {
         SearchResponse response = esAdapter.getTransportClient()

+ 2 - 4
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -1,16 +1,14 @@
 package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
-import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.common.errors.WakeupException;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
 
 /**
  * kafka对应的client适配器工作线程

+ 5 - 5
client-adapter/launcher/src/main/resources/application.yml

@@ -27,11 +27,11 @@ canal.conf:
   - instance: example
     groups:
     - outAdapters:
-      - name: es
-        hosts: 127.0.0.1:9300
-        properties:
-          cluster.name: elasticsearch
-#      - name: logger
+#      - name: es
+#        hosts: 127.0.0.1:9300
+#        properties:
+#          cluster.name: elasticsearch
+      - name: logger
 #      - name: hbase
 #        properties:
 #          hbase.zookeeper.quorum: ${hbase.zookeeper.quorum}

+ 1 - 1
client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java

@@ -27,7 +27,7 @@ public class LoggerAdapterExample implements OuterAdapter {
 
     @Override
     public void sync(Dml dml) {
-        logger.info("DML: ", dml.toString());
+        logger.info("DML: {}", dml.toString());
     }
 
     @Override