Browse Source

更新完成

mcy 6 years ago
parent
commit
81f403a9f2

+ 123 - 20
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -15,9 +15,9 @@ import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.TableItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.TableItem;
-import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
 import com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil;
 import com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
@@ -120,7 +120,7 @@ public class ESSyncService {
                 // ------单表 & 所有字段都为简单字段------
                 // ------单表 & 所有字段都为简单字段------
                 singleTableSimpleFiledUpdate(config, dml, data, old);
                 singleTableSimpleFiledUpdate(config, dml, data, old);
             } else {
             } else {
-                // ------主表 查询sql来更新------
+                // ------主表 查询sql来更新------
                 if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
                 if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
                     ESMapping mapping = config.getEsMapping();
                     ESMapping mapping = config.getEsMapping();
                     String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
                     String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
@@ -148,6 +148,50 @@ public class ESSyncService {
                         mainTableUpdate(config, dml, data, old);
                         mainTableUpdate(config, dml, data, old);
                     }
                     }
                 }
                 }
+
+                // 从表的操作
+                for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
+                    if (tableItem.isMain()) {
+                        continue;
+                    }
+                    if (!tableItem.getTableName().equals(dml.getTable())) {
+                        continue;
+                    }
+
+                    // 关联条件出现在主表查询条件是否为简单字段
+                    boolean allFieldsSimple = true;
+                    for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
+                        if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
+                            allFieldsSimple = false;
+                            break;
+                        }
+                    }
+
+                    // 所有查询字段均为简单字段
+                    if (allFieldsSimple) {
+                        // 不是子查询
+                        if (!tableItem.isSubQuery()) {
+                            // ------关联表简单字段更新------
+                            Map<String, Object> esFieldData = new LinkedHashMap<>();
+                            for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
+                                if (old.containsKey(fieldItem.getColumn().getColumnName())) {
+                                    Object value = esTemplate.getValFromData(config.getEsMapping(),
+                                        data,
+                                        fieldItem.getFieldName(),
+                                        fieldItem.getColumn().getColumnName());
+                                    esFieldData.put(fieldItem.getFieldName(), value);
+                                }
+                            }
+                            joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
+                        } else {
+                            // ------关联子表简单字段更新------
+                            subTableSimpleFieldOperation(config, dml, data, old, tableItem);
+                        }
+                    } else {
+                        // ------关联子表复杂字段更新 执行全sql更新es------
+                        jonTableWholeSqlOperation(config, dml, data, old, tableItem);
+                    }
+                }
             }
             }
 
 
             i++;
             i++;
@@ -201,14 +245,23 @@ public class ESSyncService {
                         // 不是子查询
                         // 不是子查询
                         if (!tableItem.isSubQuery()) {
                         if (!tableItem.isSubQuery()) {
                             // ------关联表简单字段插入------
                             // ------关联表简单字段插入------
-                            joinTableSimpleFieldOperation(config, dml, data, tableItem);
+                            Map<String, Object> esFieldData = new LinkedHashMap<>();
+                            for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
+                                Object value = esTemplate.getValFromData(config.getEsMapping(),
+                                    data,
+                                    fieldItem.getFieldName(),
+                                    fieldItem.getColumn().getColumnName());
+                                esFieldData.put(fieldItem.getFieldName(), value);
+                            }
+
+                            joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
                         } else {
                         } else {
                             // ------关联子表简单字段插入------
                             // ------关联子表简单字段插入------
-                            subTableSimpleFieldOperation(config, dml, data, tableItem);
+                            subTableSimpleFieldOperation(config, dml, data, null, tableItem);
                         }
                         }
                     } else {
                     } else {
                         // ------关联子表复杂字段插入 执行全sql更新es------
                         // ------关联子表复杂字段插入 执行全sql更新es------
-                        jonTableWholeSqlOperation(config, dml, data, tableItem);
+                        jonTableWholeSqlOperation(config, dml, data, null, tableItem);
                     }
                     }
                 }
                 }
             }
             }
@@ -304,14 +357,8 @@ public class ESSyncService {
      * @param tableItem 当前表配置
      * @param tableItem 当前表配置
      */
      */
     private void joinTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
     private void joinTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
-                                               TableItem tableItem) {
+                                               TableItem tableItem, Map<String, Object> esFieldData) {
         ESMapping mapping = config.getEsMapping();
         ESMapping mapping = config.getEsMapping();
-        Map<String, Object> esFieldData = new LinkedHashMap<>();
-        for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
-            Object value = esTemplate
-                .getValFromData(mapping, data, fieldItem.getFieldName(), fieldItem.getColumn().getColumnName());
-            esFieldData.put(fieldItem.getFieldName(), value);
-        }
 
 
         BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
         BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
         for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
         for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields().entrySet()) {
@@ -353,10 +400,11 @@ public class ESSyncService {
      * @param config es配置
      * @param config es配置
      * @param dml dml信息
      * @param dml dml信息
      * @param data 单行dml数据
      * @param data 单行dml数据
+     * @param old 单行old数据
      * @param tableItem 当前表配置
      * @param tableItem 当前表配置
      */
      */
     private void subTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
     private void subTableSimpleFieldOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
-                                              TableItem tableItem) {
+                                              Map<String, Object> old, TableItem tableItem) {
         ESMapping mapping = config.getEsMapping();
         ESMapping mapping = config.getEsMapping();
         StringBuilder sql = new StringBuilder(
         StringBuilder sql = new StringBuilder(
             "SELECT * FROM (" + tableItem.getSubQuerySql() + ") " + tableItem.getAlias() + " WHERE ");
             "SELECT * FROM (" + tableItem.getSubQuerySql() + ") " + tableItem.getAlias() + " WHERE ");
@@ -396,10 +444,31 @@ public class ESSyncService {
             try {
             try {
                 while (rs.next()) {
                 while (rs.next()) {
                     Map<String, Object> esFieldData = new LinkedHashMap<>();
                     Map<String, Object> esFieldData = new LinkedHashMap<>();
+
                     for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                     for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
-                        Object val = esTemplate
-                            .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getColumn().getColumnName());
-                        esFieldData.put(fieldItem.getFieldName(), val);
+                        if (old != null) {
+                            out: for (FieldItem fieldItem1 : tableItem.getSubQueryFields()) {
+                                for (ColumnItem columnItem0 : fieldItem.getColumnItems()) {
+                                    if (fieldItem1.getFieldName().equals(columnItem0.getColumnName()))
+                                        for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
+                                            if (old.containsKey(columnItem.getColumnName())) {
+                                                Object val = esTemplate.getValFromRS(mapping,
+                                                    rs,
+                                                    fieldItem.getFieldName(),
+                                                    fieldItem.getColumn().getColumnName());
+                                                esFieldData.put(fieldItem.getFieldName(), val);
+                                                break out;
+                                            }
+                                        }
+                                }
+                            }
+                        } else {
+                            Object val = esTemplate.getValFromRS(mapping,
+                                rs,
+                                fieldItem.getFieldName(),
+                                fieldItem.getColumn().getColumnName());
+                            esFieldData.put(fieldItem.getFieldName(), val);
+                        }
                     }
                     }
 
 
                     BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
                     BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
@@ -451,7 +520,7 @@ public class ESSyncService {
      * @param tableItem 当前表配置
      * @param tableItem 当前表配置
      */
      */
     private void jonTableWholeSqlOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
     private void jonTableWholeSqlOperation(ESSyncConfig config, Dml dml, Map<String, Object> data,
-                                           TableItem tableItem) {
+                                           Map<String, Object> old, TableItem tableItem) {
         ESMapping mapping = config.getEsMapping();
         ESMapping mapping = config.getEsMapping();
         StringBuilder sql = new StringBuilder(mapping.getSql() + " WHERE ");
         StringBuilder sql = new StringBuilder(mapping.getSql() + " WHERE ");
 
 
@@ -491,9 +560,43 @@ public class ESSyncService {
                 while (rs.next()) {
                 while (rs.next()) {
                     Map<String, Object> esFieldData = new LinkedHashMap<>();
                     Map<String, Object> esFieldData = new LinkedHashMap<>();
                     for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                     for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
-                        Object val = esTemplate
-                            .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
-                        esFieldData.put(fieldItem.getFieldName(), val);
+                        if (old != null) {
+                            // 从表子查询
+                            out: for (FieldItem fieldItem1 : tableItem.getSubQueryFields()) {
+                                for (ColumnItem columnItem0 : fieldItem.getColumnItems()) {
+                                    if (fieldItem1.getFieldName().equals(columnItem0.getColumnName()))
+                                        for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
+                                            if (old.containsKey(columnItem.getColumnName())) {
+                                                Object val = esTemplate.getValFromRS(mapping,
+                                                    rs,
+                                                    fieldItem.getFieldName(),
+                                                    fieldItem.getFieldName());
+                                                esFieldData.put(fieldItem.getFieldName(), val);
+                                                break out;
+                                            }
+                                        }
+                                }
+                            }
+                            // 从表非子查询
+                            for (FieldItem fieldItem1 : tableItem.getRelationSelectFieldItems()) {
+                                if (fieldItem1.equals(fieldItem)) {
+                                    for (ColumnItem columnItem : fieldItem1.getColumnItems()) {
+                                        if (old.containsKey(columnItem.getColumnName())) {
+                                            Object val = esTemplate.getValFromRS(mapping,
+                                                rs,
+                                                fieldItem.getFieldName(),
+                                                fieldItem.getFieldName());
+                                            esFieldData.put(fieldItem.getFieldName(), val);
+                                            break;
+                                        }
+                                    }
+                                }
+                            }
+                        } else {
+                            Object val = esTemplate
+                                .getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
+                            esFieldData.put(fieldItem.getFieldName(), val);
+                        }
                     }
                     }
 
 
                     BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
                     BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();

+ 27 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOne2Test.java

@@ -49,6 +49,33 @@ public class RoleSyncJoinOne2Test {
         Assert.assertEquals("admin2_", response.getSource().get("_role_name"));
         Assert.assertEquals("admin2_", response.getSource().get("_role_name"));
     }
     }
 
 
+    @Test
+    public void updateTest02() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("UPDATE");
+        dml.setDatabase("mytest");
+        dml.setTable("role");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("role_name", "admin3");
+        dml.setData(dataList);
+
+        List<Map<String, Object>> oldList = new ArrayList<>();
+        Map<String, Object> old = new LinkedHashMap<>();
+        oldList.add(old);
+        old.put("role_name", "admin");
+        dml.setOld(oldList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("admin3_", response.getSource().get("_role_name"));
+    }
+
     @After
     @After
     public void after() {
     public void after() {
         esAdapter.destroy();
         esAdapter.destroy();

+ 27 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOneTest.java

@@ -49,6 +49,33 @@ public class RoleSyncJoinOneTest {
         Assert.assertEquals("admin", response.getSource().get("_role_name"));
         Assert.assertEquals("admin", response.getSource().get("_role_name"));
     }
     }
 
 
+    @Test
+    public void updateTest02() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("UPDATE");
+        dml.setDatabase("mytest");
+        dml.setTable("role");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("role_name", "admin2");
+        dml.setData(dataList);
+
+        List<Map<String, Object>> oldList = new ArrayList<>();
+        Map<String, Object> old = new LinkedHashMap<>();
+        oldList.add(old);
+        old.put("role_name", "admin");
+        dml.setOld(oldList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("admin2", response.getSource().get("_role_name"));
+    }
+
     @After
     @After
     public void after() {
     public void after() {
         esAdapter.destroy();
         esAdapter.destroy();

+ 28 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinSub2Test.java

@@ -50,6 +50,34 @@ public class RoleSyncJoinSub2Test {
         Assert.assertEquals("a;b_", response.getSource().get("_labels"));
         Assert.assertEquals("a;b_", response.getSource().get("_labels"));
     }
     }
 
 
+    @Test
+    public void updateTest02() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("UPDATE");
+        dml.setDatabase("mytest");
+        dml.setTable("label");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("user_id",1L);
+        data.put("label", "aa");
+        dml.setData(dataList);
+
+        List<Map<String, Object>> oldList = new ArrayList<>();
+        Map<String, Object> old = new LinkedHashMap<>();
+        oldList.add(old);
+        old.put("label", "a");
+        dml.setOld(oldList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("aa;b_", response.getSource().get("_labels"));
+    }
+
     @After
     @After
     public void after() {
     public void after() {
         esAdapter.destroy();
         esAdapter.destroy();

+ 28 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinSubTest.java

@@ -50,6 +50,34 @@ public class RoleSyncJoinSubTest {
         Assert.assertEquals("a;b", response.getSource().get("_labels"));
         Assert.assertEquals("a;b", response.getSource().get("_labels"));
     }
     }
 
 
+    @Test
+    public void updateTest02() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("UPDATE");
+        dml.setDatabase("mytest");
+        dml.setTable("label");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("user_id",1L);
+        data.put("label", "aa");
+        dml.setData(dataList);
+
+        List<Map<String, Object>> oldList = new ArrayList<>();
+        Map<String, Object> old = new LinkedHashMap<>();
+        oldList.add(old);
+        old.put("label", "a");
+        dml.setOld(oldList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("aa;b", response.getSource().get("_labels"));
+    }
+
     @After
     @After
     public void after() {
     public void after() {
         esAdapter.destroy();
         esAdapter.destroy();

+ 12 - 8
client-adapter/launcher/src/main/resources/application-sample.yml

@@ -18,6 +18,10 @@ canal.conf:
   - instance: example
   - instance: example
     groups:
     groups:
     - outAdapters:
     - outAdapters:
+#      - name: es
+#        hosts: 127.0.0.1:9300
+#        properties:
+#          cluster.name: elasticsearch
       - name: logger
       - name: logger
 #      - name: hbase
 #      - name: hbase
 #        properties:
 #        properties:
@@ -39,11 +43,11 @@ canal.conf:
 #      outAdapters:
 #      outAdapters:
 #      - name: logger
 #      - name: logger
 #
 #
-#adapter.conf:
-#  datasourceConfigs:
-#    defaultDS:
-#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
-#      username: root
-#      password: 121212
-#  adapterConfigs:
-#  - hbase/mytest_person2.yml
+adapter.conf:
+  datasourceConfigs:
+    defaultDS:
+      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
+      username: root
+      password: 121212
+  adapterConfigs:
+  - hbase/mytest_person2.yml

+ 3 - 1
client-adapter/launcher/src/main/resources/application.yml

@@ -7,4 +7,6 @@ spring:
   jackson:
   jackson:
     date-format: yyyy-MM-dd HH:mm:ss
     date-format: yyyy-MM-dd HH:mm:ss
     time-zone: GMT+8
     time-zone: GMT+8
-    default-property-inclusion: non_null
+    default-property-inclusion: non_null
+  profiles:
+    active: sample