mcy 6 anos atrás
pai
commit
14e094b522

+ 34 - 31
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SchemaItem.java

@@ -23,7 +23,7 @@ public class SchemaItem {
         this.isAllFieldsSimple();
         aliasTableItems.values().forEach(tableItem -> {
             tableItem.getRelationTableFields();
-            tableItem.getRelationKeyFieldItems();
+            // tableItem.getRelationKeyFieldItems();
             tableItem.getRelationSelectFieldItems();
         });
     }
@@ -145,8 +145,9 @@ public class SchemaItem {
         private boolean                                  main;
         private boolean                                  subQuery;
 
-        private volatile Map<FieldItem, List<FieldItem>> relationTableFields;               // 当前表关联条件字段
-        private volatile List<FieldItem>                 relationKeyFieldItems;             // 关联条件字段在select中的对应字段
+        private volatile Map<FieldItem, List<FieldItem>> relationTableFields;               // 当前表关联条件字段对应主表查询字段
+        // private volatile List<FieldItem> relationKeyFieldItems; //
+        // 关联条件字段在select中的对应字段
         private volatile List<FieldItem>                 relationSelectFieldItems;          // 子表所在主表的查询字段
 
         public TableItem(SchemaItem schemaItem){
@@ -267,34 +268,36 @@ public class SchemaItem {
             return relationTableFields;
         }
 
-        public List<FieldItem> getRelationKeyFieldItems() {
-            if (relationKeyFieldItems == null) {
-                synchronized (SchemaItem.class) {
-                    if (relationKeyFieldItems == null) {
-                        relationKeyFieldItems = new ArrayList<>();
-                        getRelationFields().forEach(relationFieldsPair -> {
-                            FieldItem leftFieldItem = relationFieldsPair.getLeftFieldItem();
-                            List<FieldItem> selectFieldItem = getSchemaItem().getColumnFields()
-                                .get(leftFieldItem.getOwner() + "." + leftFieldItem.getColumn().getColumnName());
-                            if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
-                                relationKeyFieldItems.addAll(selectFieldItem);
-                            } else {
-                                FieldItem rightFieldItem = relationFieldsPair.getRightFieldItem();
-                                selectFieldItem = getSchemaItem().getColumnFields()
-                                    .get(rightFieldItem.getOwner() + "." + rightFieldItem.getColumn().getColumnName());
-                                if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
-                                    relationKeyFieldItems.addAll(selectFieldItem);
-                                } else {
-                                    throw new UnsupportedOperationException(
-                                        "Relation condition column must in select columns.");
-                                }
-                            }
-                        });
-                    }
-                }
-            }
-            return relationKeyFieldItems;
-        }
+        // public List<FieldItem> getRelationKeyFieldItems() {
+        // if (relationKeyFieldItems == null) {
+        // synchronized (SchemaItem.class) {
+        // if (relationKeyFieldItems == null) {
+        // relationKeyFieldItems = new ArrayList<>();
+        // getRelationFields().forEach(relationFieldsPair -> {
+        // FieldItem leftFieldItem = relationFieldsPair.getLeftFieldItem();
+        // List<FieldItem> selectFieldItem = getSchemaItem().getColumnFields()
+        // .get(leftFieldItem.getOwner() + "." +
+        // leftFieldItem.getColumn().getColumnName());
+        // if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
+        // relationKeyFieldItems.addAll(selectFieldItem);
+        // } else {
+        // FieldItem rightFieldItem = relationFieldsPair.getRightFieldItem();
+        // selectFieldItem = getSchemaItem().getColumnFields()
+        // .get(rightFieldItem.getOwner() + "." +
+        // rightFieldItem.getColumn().getColumnName());
+        // if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
+        // relationKeyFieldItems.addAll(selectFieldItem);
+        // } else {
+        // throw new UnsupportedOperationException(
+        // "Relation condition column must in select columns.");
+        // }
+        // }
+        // });
+        // }
+        // }
+        // }
+        // return relationKeyFieldItems;
+        // }
 
         public List<FieldItem> getRelationSelectFieldItems() {
             if (relationSelectFieldItems == null) {

+ 23 - 18
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -190,41 +190,46 @@ public class ESSyncService {
                 if (allFieldsSimple) {
                     // 不是子查询
                     if (!tableItem.isSubQuery()) {
-
                         Map<String, Object> esFieldData = new LinkedHashMap<>();
                         for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                             Object value = esTemplate
                                 .getValFromData(mapping, data, fieldItem.getColumn().getColumnName());
                             esFieldData.put(fieldItem.getFieldName(), value);
                         }
+
                         BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
                         for (Map.Entry<FieldItem, List<FieldItem>> entry : tableItem.getRelationTableFields()
                             .entrySet()) {
                             Object value = esTemplate
                                 .getValFromData(mapping, data, entry.getKey().getColumn().getColumnName());
                             for (FieldItem fieldItem : entry.getValue()) {
-                                queryBuilder.must(QueryBuilders.termsQuery(fieldItem.getFieldName(), value));
+                                String fieldName = fieldItem.getFieldName();
+                                // 判断是否是主键
+                                if (fieldName.equals(mapping.get_id())) {
+                                    fieldName = "_id";
+                                }
+                                queryBuilder.must(QueryBuilders.termsQuery(fieldName, value));
                             }
                         }
 
-                        //
-                        // for (FieldItem fieldItem : tableItem.getRelationTableFields()) {
-                        // Object value = esTemplate
-                        // .getValFromData(mapping, data, fieldItem.getColumn().getColumnName());
-                        //
-                        // }
-
-                        // if (logger.isDebugEnabled()) {
-                        // logger.debug("从表insert, 且均为简单字段,对es进行update_by_query, queryBuilder: {},
-                        // esFieldData:{}", queryBuilder.toString(), esFieldData);
-                        // }
+                        if (logger.isDebugEnabled()) {
+                            logger.trace(
+                                "Join table update es index by foreign key, destination:{}, table: {}, index: {}",
+                                config.getDestination(),
+                                dml.getTable(),
+                                mapping.get_index());
+                        }
                         boolean result = esTemplate.updateByQuery(mapping, queryBuilder, esFieldData);
-                        // if (!result) {
-                        // logger.error("从表insert,均为简单字段,直接对es进行update_by_query, es更新存在错误, table: {},
-                        // index: {}, dml: {}", dml.getTable(), config.getEsSyn().getIndex(), dml);
-                        // }
+                        if (!result) {
+                            logger.error(
+                                "Join table update es index by foreign key error, destination:{}, table: {}, index: {}",
+                                config.getDestination(),
+                                dml.getTable(),
+                                mapping.get_index());
+                        }
+                    } else {
+                        // TODO
                     }
-                    // TODO
                 } else {
                     // TODO 查询总sql
                 }

+ 2 - 2
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -264,7 +264,7 @@ public class ESTemplate {
                     if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
                         logger.warn(itemResponse.getFailureMessage());
                     } else {
-                        logger.error("ES 同步数据错误 {}", itemResponse.getFailureMessage());
+                        logger.error("ES sync commit error: {}", itemResponse.getFailureMessage());
                     }
                 }
             }
@@ -314,7 +314,7 @@ public class ESTemplate {
                 value = resultSet.getByte(fieldName);
             }
         }
-        return value;
+        return ESSyncUtil.typeConvert(value, esType);
     }
 
     public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet,

+ 4 - 3
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/SqlParseTest.java

@@ -39,8 +39,9 @@ public class SqlParseTest {
             .forEach(fieldItem -> Assert.assertEquals("user_id", fieldItem.getColumn().getColumnName()));
 
         // 获取关联字段在select中的对应字段
-        List<FieldItem> relationSelectFieldItem = tableItem.getRelationKeyFieldItems();
-        relationSelectFieldItem.forEach(fieldItem -> Assert.assertEquals("c.labels",
-            fieldItem.getOwner() + "." + fieldItem.getColumn().getColumnName()));
+        // List<FieldItem> relationSelectFieldItem =
+        // tableItem.getRelationKeyFieldItems();
+        // relationSelectFieldItem.forEach(fieldItem -> Assert.assertEquals("c.labels",
+        // fieldItem.getOwner() + "." + fieldItem.getColumn().getColumnName()));
     }
 }

+ 17 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncTest.java

@@ -3,6 +3,9 @@ package com.alibaba.otter.canal.client.adapter.es.test.sync;
 import java.util.*;
 
 import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -53,6 +56,7 @@ public class UserSyncTest {
         dataList.add(data);
         data.put("id", 1L);
         data.put("name", "Eric");
+        data.put("role_id", 1L);
         data.put("c_time", new Date());
 
         dml.setData(dataList);
@@ -79,6 +83,7 @@ public class UserSyncTest {
         dataList.add(data);
         data.put("id", 1L);
         data.put("name", "Eric");
+        data.put("role_id", 1L);
         data.put("c_time", new Date());
 
         dml.setData(dataList);
@@ -108,6 +113,18 @@ public class UserSyncTest {
         esAdapter.getEsSyncService().sync(dml);
     }
 
+    @Test
+    public void ttt(){
+        SearchResponse response = esAdapter.getTransportClient().prepareSearch("mytest_user")
+                .setTypes("_doc")
+                .setQuery(QueryBuilders.termQuery("_id", 1L))
+                .setSize(100)
+                .get();
+        for (SearchHit hit : response.getHits()) {
+            System.out.println(hit.getSourceAsMap().get("name"));
+        }
+    }
+
     @After
     public void after() {
         esAdapter.destroy();

+ 13 - 0
client-adapter/elasticsearch/src/test/resources/log4j2-test.xml

@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="ERROR">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>

+ 17 - 17
client-adapter/launcher/src/main/resources/application.yml

@@ -16,34 +16,34 @@ hbase.zookeeper.property.clientPort: 2181
 hbase.zookeeper.znode.parent: /hbase
 
 canal.conf:
-#  canalServerHost: 127.0.0.1:11111
+  canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
-  bootstrapServers: slave1:6667 #or rocketmq nameservers:host1:9876;host2:9876
+#  bootstrapServers: slave1:6667 #or rocketmq nameservers:host1:9876;host2:9876
   flatMessage: true
   retry: 3
   timeout: 20000
   batchSize: 50
-#  canalInstances:
-#  - instance: example
-#    groups:
-#    - outAdapters:
-#      - name: es
-#        hosts: 127.0.0.1:9300
-#        properties:
-#          cluster.name: elasticsearch
+  canalInstances:
+  - instance: example
+    groups:
+    - outAdapters:
+      - name: es
+        hosts: 127.0.0.1:9300
+        properties:
+          cluster.name: elasticsearch
 #      - name: logger
 #      - name: hbase
 #        properties:
 #          hbase.zookeeper.quorum: ${hbase.zookeeper.quorum}
 #          hbase.zookeeper.property.clientPort: ${hbase.zookeeper.property.clientPort}
 #          zookeeper.znode.parent: ${hbase.zookeeper.znode.parent}
-  mqTopics:
-  - mqMode: kafka
-    topic: example
-    groups:
-    - groupId: g2
-      outAdapters:
-      - name: logger
+#  mqTopics:
+#  - mqMode: kafka
+#    topic: example
+#    groups:
+#    - groupId: g2
+#      outAdapters:
+#      - name: logger
 #  mqTopics:
 #  - mqMode: rocketmq
 #    topic: example