Browse Source

单表新增

mcy 6 years ago
parent
commit
2c9acb655d
16 changed files with 723 additions and 65 deletions
  1. 9 1
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java
  2. 10 0
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java
  3. 1 1
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java
  4. 29 0
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SchemaItem.java
  5. 170 2
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java
  6. 293 0
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java
  7. 83 13
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java
  8. 18 0
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/Util.java
  9. 0 12
      client-adapter/elasticsearch/src/main/resources/es/myetst_user.yml
  10. 14 0
      client-adapter/elasticsearch/src/main/resources/es/mytest_user.yml
  11. 3 4
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/ConfigLoadTest.java
  12. 54 0
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/SyncTest.java
  13. 2 2
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java
  14. 10 9
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java
  15. 1 1
      client-adapter/hbase/src/main/resources/hbase/mytest_person2.yml
  16. 26 20
      client-adapter/launcher/src/main/resources/application.yml

+ 9 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -1,16 +1,17 @@
 package com.alibaba.otter.canal.client.adapter.es;
 
+import java.net.InetAddress;
 import java.util.List;
 import java.util.Map;
 
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
-import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
 import com.alibaba.otter.canal.client.adapter.es.service.ESSyncService;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
@@ -44,6 +45,12 @@ public class ESAdapter implements OuterAdapter {
             properties.forEach(settingBuilder::put);
             Settings settings = settingBuilder.build();
             transportClient = new PreBuiltTransportClient(settings);
+            String[] hostArray = configuration.getHosts().split(",");
+            for (String host : hostArray) {
+                int i = host.indexOf(":");
+                transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName(host.substring(0, i)),
+                    Integer.parseInt(host.substring(i + 1))));
+            }
             ESTemplate esTemplate = new ESTemplate(transportClient);
             esSyncService = new ESSyncService(esTemplate);
         } catch (Exception e) {
@@ -53,6 +60,7 @@ public class ESAdapter implements OuterAdapter {
 
     @Override
     public void sync(Dml dml) {
+        esSyncService.sync(dml);
     }
 
     @Override

+ 10 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java

@@ -7,6 +7,8 @@ public class ESSyncConfig {
 
     private String    dataSourceKey; // 数据源key
 
+    private String    destination;   // canal destination
+
     private ESMapping esMapping;
 
     public void validate() {
@@ -32,6 +34,14 @@ public class ESSyncConfig {
         this.dataSourceKey = dataSourceKey;
     }
 
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
     public ESMapping getEsMapping() {
         return esMapping;
     }

+ 1 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java

@@ -68,7 +68,7 @@ public class ESSyncConfigLoader {
                 Pattern pattern = Pattern.compile(".*:(.*)://.*/(.*)\\?.*$");
                 Matcher matcher = pattern.matcher(dataSource.getUrl());
                 if (!matcher.find()){
-                    throw new RuntimeException("No found the schema of jdbc-url: " + config.getDataSourceKey());
+                    throw new RuntimeException("Not found the schema of jdbc-url: " + config.getDataSourceKey());
                 }
                 String schema = matcher.group(2);
 

+ 29 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SchemaItem.java

@@ -4,6 +4,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 
 public class SchemaItem {
 
@@ -13,10 +14,12 @@ public class SchemaItem {
 
     private volatile Map<String, List<TableItem>> tableItemAliases;
     private volatile Map<String, List<FieldItem>> columnFields;
+    private volatile Boolean                      allFieldsSimple;
 
     public void init() {
         this.getTableItemAliases();
         this.getColumnFields();
+        this.isAllFieldsSimple();
         aliasTableItems.values().forEach(tableItem -> {
             tableItem.getRelationTableFields();
             tableItem.getRelationSelectFields();
@@ -89,7 +92,25 @@ public class SchemaItem {
             }
         }
         return columnFields;
+    }
 
+    public boolean isAllFieldsSimple() {
+        if (allFieldsSimple == null) {
+            synchronized (SchemaItem.class) {
+                if (allFieldsSimple == null) {
+                    allFieldsSimple = true;
+
+                    for (FieldItem fieldItem : getSelectFields().values()) {
+                        if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
+                            allFieldsSimple = false;
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        return allFieldsSimple;
     }
 
     public TableItem getMainTable() {
@@ -100,6 +121,14 @@ public class SchemaItem {
         }
     }
 
+    public FieldItem getIdFieldItem(ESMapping mapping) {
+        if (mapping.get_id() != null) {
+            return getSelectFields().get(mapping.get_id());
+        } else {
+            return getSelectFields().get(mapping.getPk());
+        }
+    }
+
     public static class TableItem {
 
         private SchemaItem               schemaItem;

+ 170 - 2
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -1,10 +1,26 @@
 package com.alibaba.otter.canal.client.adapter.es.service;
 
-import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
-import org.elasticsearch.client.transport.TransportClient;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
+import com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil;
+import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
+import com.alibaba.otter.canal.client.adapter.es.support.Util;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
 public class ESSyncService {
 
     private static Logger logger = LoggerFactory.getLogger(ESSyncService.class);
@@ -14,4 +30,156 @@ public class ESSyncService {
     public ESSyncService(ESTemplate esTemplate){
         this.esTemplate = esTemplate;
     }
+
+    public void sync(Dml dml) {
+        long begin = System.currentTimeMillis();
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = ESSyncConfigLoader.getDbTableEsSyncConfig().get(database + "-" + table);
+        if (esSyncConfigs != null) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Destination: {}, database:{}, table:{}, type:{}, effect index count: {}",
+                    dml.getDestination(),
+                    dml.getDatabase(),
+                    dml.getTable(),
+                    dml.getType(),
+                    esSyncConfigs.size());
+                logger.debug(JSON.toJSONString(dml));
+            }
+
+            for (ESSyncConfig config : esSyncConfigs) {
+                logger.info("Prepared to sync index: {}, destination: {}",
+                    config.getEsMapping().get_index(),
+                    dml.getDestination());
+                this.sync(config, dml);
+                logger.info("Sync completed: {}, destination: {}",
+                    config.getEsMapping().get_index(),
+                    dml.getDestination());
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("Sync elapsed time: {}, effect index count:{}, destination: {}",
+                    (System.currentTimeMillis() - begin),
+                    esSyncConfigs.size(),
+                    dml.getDestination());
+            }
+        }
+    }
+
+    public void sync(ESSyncConfig config, Dml dml) {
+        try {
+            long begin = System.currentTimeMillis();
+
+            String type = dml.getType();
+            if (type != null && type.equalsIgnoreCase("INSERT")) {
+                insert(config, dml);
+            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                // update(config, dml);
+            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                // delete(config, dml);
+            }
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Sync elapsed time: {},destination: {}, es index: {}",
+                    (System.currentTimeMillis() - begin),
+                    dml.getDestination(),
+                    config.getEsMapping().get_index());
+            }
+        } catch (Exception e) {
+            logger.error("sync error, es index: {}, DML : {}", config.getEsMapping().get_index(), dml);
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private void insert(ESSyncConfig config, Dml dml) {
+        List<Map<String, Object>> dataList = dml.getData();
+        if (dataList == null || dataList.isEmpty()) {
+            return;
+        }
+        ESMapping mapping = config.getEsMapping();
+        SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
+        for (Map<String, Object> data : dataList) {
+            if (data == null || data.isEmpty()) {
+                continue;
+            }
+
+            // ------是否单表 & 所有字段都为简单字段------
+            if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
+                Map<String, Object> esFieldData = new HashMap<>();
+
+                // ------通过dml对象插入ES------
+                // 获取所有字段
+                for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+                    // 如果是主键字段则不插入
+                    if (fieldItem.getFieldName().equals(mapping.get_id())) {
+                        continue;
+                    }
+                    Object value = esTemplate.getDataValue(mapping,
+                        data,
+                        Util.cleanColumn(fieldItem.getColumnItems().iterator().next().getColumnName()));
+                    String fieldName = Util.cleanColumn(fieldItem.getFieldName());
+                    if (!mapping.getSkips().contains(fieldName)) {
+                        esFieldData.put(fieldName, esTemplate.convertType(mapping, fieldName, value));
+                    }
+                }
+
+                // 取出id值
+                Object idVal;
+                if (mapping.get_id() != null) {
+                    idVal = esTemplate.getDataValue(mapping,
+                        data,
+                        Util.cleanColumn(
+                            schemaItem.getSelectFields().get(mapping.get_id()).getColumn().getColumnName()));
+                } else {
+                    idVal = esTemplate.getDataValue(mapping,
+                        data,
+                        Util.cleanColumn(
+                            schemaItem.getSelectFields().get(mapping.getPk()).getColumn().getColumnName()));
+                }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Single table insert ot es index, destination:{}, table: {}, index: {}, id: {}",
+                        config.getDestination(),
+                        dml.getTable(),
+                        mapping.get_index(),
+                        idVal);
+                }
+                boolean result = esTemplate.insert(config, idVal, esFieldData);
+                if (!result) {
+                    logger.error("Single table insert to es index error, destination:{}, table: {}, index: {}, id: {}",
+                        config.getDestination(),
+                        dml.getTable(),
+                        mapping.get_index(),
+                        idVal);
+                }
+                return; // 单表插入完成返回
+            }
+
+            // ------是否主表------
+            if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
+                String sql = mapping.getSql();
+                String condition = ESSyncUtil.pkConditaionSql(mapping, data);
+                sql = ESSyncUtil.appendCondition(sql, condition);
+                DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+//                ESSyncUtil.sqlRS(ds, sql, rs -> {
+//                    try {
+//                        while (rs.next()) {
+//                            Map<String, Object> esFieldData = new HashMap<>();
+//
+//                            Object idVal = ESSyncUtil.persistEsByRs(transportClient, config, rs, mainTable, esFieldData);
+//
+//                            if (logger.isDebugEnabled()) {
+//                                logger.debug("主表insert, 通过执行sql查询相应记录,并insert到Es, id: {}, esFieldValue: {}", idVal, esFieldData);
+//                            }
+//                            boolean result = ESSyncUtil.persist(transportClient, config, idVal, esFieldData, OpType.INSERT);
+//                            if (!result) {
+//                                logger.error("主表insert,通过执行sql查询响应记录,并insert到es, es插入失败,table: {}, index: {}", dml.getTable(), config.getEsSyn().getIndex());
+//                            }
+//                        }
+//                    } catch (Exception e) {
+//                        throw new RuntimeException(e);
+//                    }
+//                    return 0;
+//                });
+            }
+        }
+    }
 }

+ 293 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java

@@ -0,0 +1,293 @@
+package com.alibaba.otter.canal.client.adapter.es.support;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.sql.*;
+import java.util.*;
+import java.util.Date;
+import java.util.function.Function;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.codec.binary.Base64;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.TableItem;
+
+public class ESSyncUtil {
+
+    private static Logger logger = LoggerFactory.getLogger(ESSyncUtil.class);
+
+    /**
+     * 类型转换为Mapping中对应的类型
+     */
+    public static Object typeConvert(Object val, String esType) {
+        if (val == null) {
+            return null;
+        }
+        if (esType == null) {
+            return val;
+        }
+        Object res = null;
+        if ("integer".equals(esType)) {
+            if (val instanceof Number) {
+                res = ((Number) val).intValue();
+            } else {
+                res = Integer.parseInt(val.toString());
+            }
+        } else if ("long".equals(esType)) {
+            if (val instanceof Number) {
+                res = ((Number) val).longValue();
+            } else {
+                res = Long.parseLong(val.toString());
+            }
+        } else if ("short".equals(esType)) {
+            if (val instanceof Number) {
+                res = ((Number) val).shortValue();
+            } else {
+                res = Short.parseShort(val.toString());
+            }
+        } else if ("byte".equals(esType)) {
+            if (val instanceof Number) {
+                res = ((Number) val).byteValue();
+            } else {
+                res = Byte.parseByte(val.toString());
+            }
+        } else if ("double".equals(esType)) {
+            if (val instanceof Number) {
+                res = ((Number) val).doubleValue();
+            } else {
+                res = Double.parseDouble(val.toString());
+            }
+        } else if ("float".equals(esType) || "half_float".equals(esType) || "scaled_float".equals(esType)) {
+            if (val instanceof Number) {
+                res = ((Number) val).floatValue();
+            } else {
+                res = Float.parseFloat(val.toString());
+            }
+        } else if ("boolean".equals(esType)) {
+            if (val instanceof Boolean) {
+                res = val;
+            } else if (val instanceof Number) {
+                int v = ((Number) val).intValue();
+                res = v != 0;
+            } else {
+                res = Boolean.parseBoolean(val.toString());
+            }
+        } else if ("date".equals(esType)) {
+            if (val instanceof java.sql.Time) {
+                DateTime dateTime = new DateTime(((java.sql.Time) val).getTime());
+                if (dateTime.getMillisOfSecond() != 0) {
+                    res = dateTime.toString("HH:mm:ss.SSS");
+                } else {
+                    res = dateTime.toString("HH:mm:ss");
+                }
+            } else if (val instanceof java.sql.Timestamp) {
+                DateTime dateTime = new DateTime(((java.sql.Timestamp) val).getTime());
+                if (dateTime.getMillisOfSecond() != 0) {
+                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
+                } else {
+                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss+08:00");
+                }
+            } else if (val instanceof java.sql.Date || val instanceof Date) {
+                DateTime dateTime;
+                if (val instanceof java.sql.Date) {
+                    dateTime = new DateTime(((java.sql.Date) val).getTime());
+                } else {
+                    dateTime = new DateTime(((Date) val).getTime());
+                }
+                if (dateTime.getHourOfDay() == 0 && dateTime.getMinuteOfHour() == 0 && dateTime.getSecondOfMinute() == 0
+                    && dateTime.getMillisOfSecond() == 0) {
+                    res = dateTime.toString("yyyy-MM-dd");
+                } else {
+                    if (dateTime.getMillisOfSecond() != 0) {
+                        res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
+                    } else {
+                        res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss+08:00");
+                    }
+                }
+            } else if (val instanceof Long) {
+                DateTime dateTime = new DateTime(((Long) val).longValue());
+                if (dateTime.getHourOfDay() == 0 && dateTime.getMinuteOfHour() == 0 && dateTime.getSecondOfMinute() == 0
+                    && dateTime.getMillisOfSecond() == 0) {
+                    res = dateTime.toString("yyyy-MM-dd");
+                } else if (dateTime.getMillisOfSecond() != 0) {
+                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
+                } else {
+                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss+08:00");
+                }
+            } else if (val instanceof String) {
+                String v = ((String) val).trim();
+                if (v.length() > 18 && v.charAt(4) == '-' && v.charAt(7) == '-' && v.charAt(10) == ' '
+                    && v.charAt(13) == ':' && v.charAt(16) == ':') {
+                    String dt = v.substring(0, 10) + "T" + v.substring(11);
+                    DateTime dateTime = new DateTime(dt);
+                    if (dateTime.getMillisOfSecond() != 0) {
+                        res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
+                    } else {
+                        res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss+08:00");
+                    }
+                } else if (v.length() == 10 && v.charAt(4) == '-' && v.charAt(7) == '-') {
+                    DateTime dateTime = new DateTime(v);
+                    res = dateTime.toString("yyyy-MM-dd");
+                }
+            }
+        } else if ("binary".equals(esType)) {
+            if (val instanceof byte[]) {
+                Base64 base64 = new Base64();
+                res = base64.encodeAsString((byte[]) val);
+            } else if (val instanceof Blob) {
+                byte[] b = blobToBytes((Blob) val);
+                Base64 base64 = new Base64();
+                res = base64.encodeAsString(b);
+            } else if (val instanceof String) {
+                try {
+                    // 对应canal中的单字节编码
+                    byte[] b = ((String) val).getBytes("ISO-8859-1");
+                    Base64 base64 = new Base64();
+                    res = base64.encodeAsString(b);
+                } catch (UnsupportedEncodingException e) {
+                    logger.error(e.getMessage());
+                }
+            }
+        } else if ("geo_point".equals(esType)) {
+            if (!(val instanceof String)) {
+                logger.error("es type is geo_point, but source type is not String");
+                return val;
+            }
+
+            if (!((String) val).contains(",")) {
+                logger.error("es type is geo_point, source value not contains ',' spit");
+                return val;
+            }
+
+            String[] point = ((String) val).split(",");
+            Map<String, Double> location = new HashMap<>();
+            location.put("lat", Double.valueOf(point[0].trim()));
+            location.put("lon", Double.valueOf(point[1].trim()));
+            return location;
+        } else if ("object".equals(esType)) {
+            if ("".equals(val.toString().trim())) {
+                res = new HashMap<>();
+            } else {
+                res = JSON.parseObject(val.toString(), Map.class);
+            }
+        } else {
+            // 其他类全以字符串处理
+            res = val.toString();
+        }
+
+        return res;
+    }
+
+    /**
+     * Blob转byte[]
+     */
+    private static byte[] blobToBytes(Blob blob) {
+        try (InputStream is = blob.getBinaryStream()) {
+            byte[] b = new byte[(int) blob.length()];
+            is.read(b);
+            return b;
+        } catch (IOException | SQLException e) {
+            logger.error(e.getMessage());
+            return null;
+        }
+    }
+
+    /**
+     * 拼接主键条件
+     * 
+     * @param mapping
+     * @param data
+     * @return
+     */
+    public static String pkConditaionSql(ESMapping mapping, Map<String, Object> data) {
+        Set<ColumnItem> idColumns = new LinkedHashSet<>();
+        SchemaItem schemaItem = mapping.getSchemaItem();
+
+        TableItem mainTable = schemaItem.getMainTable();
+
+        for (ColumnItem idColumnItem : schemaItem.getIdFieldItem(mapping).getColumnItems()) {
+            if ((mainTable.getAlias() == null && idColumnItem.getOwner() == null)
+                || (mainTable.getAlias() != null && mainTable.getAlias().equals(idColumnItem.getOwner()))) {
+                idColumns.add(idColumnItem);
+            }
+        }
+
+        if (idColumns.isEmpty()) {
+            throw new RuntimeException("Not found primary key field in main table");
+        }
+
+        // 拼接condition
+        StringBuilder condition = new StringBuilder(" ");
+        for (ColumnItem idColumn : idColumns) {
+            Object idVal = data.get(Util.cleanColumn(idColumn.getColumnName()));
+            if (mainTable.getAlias() != null) condition.append(mainTable.getAlias()).append(".");
+            condition.append(idColumn.getColumnName()).append("=");
+            if (idVal instanceof String) {
+                condition.append("'").append(idVal).append("' AND ");
+            } else {
+                condition.append(idVal).append(" AND ");
+            }
+        }
+
+        if (condition.toString().endsWith("AND ")) {
+            int len2 = condition.length();
+            condition.delete(len2 - 4, len2);
+        }
+        return condition.toString();
+    }
+
+    public static String appendCondition(String sql, String condition) {
+        return sql + " WHERE " + condition + " ";
+    }
+
+    /**
+     * 执行查询sql
+     */
+    public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) throws SQLException {
+        Connection conn = null;
+        Statement smt = null;
+        ResultSet rs = null;
+        try {
+            conn = ds.getConnection();
+            smt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            smt.setFetchSize(Integer.MIN_VALUE);
+            rs = smt.executeQuery(sql);
+
+            return fun.apply(rs);
+        } catch (SQLException e) {
+            logger.error("sqlRs has error, sql: {} ", sql);
+            throw e;
+        } finally {
+            if (rs != null) {
+                try {
+                    rs.close();
+                } catch (SQLException e) {
+                    logger.error("error to close result set");
+                }
+            }
+            if (smt != null) {
+                try {
+                    smt.close();
+                } catch (SQLException e) {
+                    logger.error("error to close statement");
+                }
+            }
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (SQLException e) {
+                    logger.error("error to close db connection");
+                }
+            }
+        }
+    }
+}

+ 83 - 13
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -2,7 +2,10 @@ package com.alibaba.otter.canal.client.adapter.es.support;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
@@ -11,6 +14,8 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
@@ -101,29 +106,29 @@ public class ESTemplate {
     }
 
     public void append4Update(BulkRequestBuilder bulkRequestBuilder, ESSyncConfig esSyncConfig, Object pkVal,
-                                      Map<String, Object> esFieldData) {
+                              Map<String, Object> esFieldData) {
         ESMapping mapping = esSyncConfig.getEsMapping();
         if (mapping.get_id() != null) {
             bulkRequestBuilder
-                    .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                            .setDoc(esFieldData));
+                .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                    .setDoc(esFieldData));
         } else {
             SearchResponse response = transportClient.prepareSearch(mapping.get_index())
-                    .setTypes(mapping.get_type())
-                    .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
-                    .setSize(MAX_BATCH_SIZE)
-                    .get();
+                .setTypes(mapping.get_type())
+                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
+                .setSize(MAX_BATCH_SIZE)
+                .get();
             for (SearchHit hit : response.getHits()) { // 理论上只有一条
                 bulkRequestBuilder
-                        .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
-                                .setDoc(esFieldData));
+                    .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
+                        .setDoc(esFieldData));
             }
         }
     }
 
     /**
      * update by query
-     * 
+     *
      * @param esSyncConfig
      * @param queryBuilder
      * @param esFieldData
@@ -215,7 +220,7 @@ public class ESTemplate {
 
     /**
      * 通过主键删除数据
-     * 
+     *
      * @param esSyncConfig
      * @param pkVal
      * @return
@@ -242,7 +247,7 @@ public class ESTemplate {
 
     /**
      * 批量提交
-     * 
+     *
      * @param bulkRequestBuilder
      * @return
      */
@@ -265,7 +270,72 @@ public class ESTemplate {
 
             return !response.hasFailures();
         }
-
         return true;
     }
+
+    public Object getDataValue(ESMapping config, Map<String, Object> data, String fieldName) {
+        String esType = getEsType(config, fieldName);
+        Object value = data.get(fieldName);
+        if (value instanceof Byte) {
+            if ("boolean".equals(esType)) {
+                value = ((Byte) value).intValue() != 0;
+            }
+        }
+        return value;
+    }
+
+    public Object convertType(ESMapping config, String fieldName, Object val) {
+        // 从mapping中获取类型来转换
+        String esType = getEsType(config, fieldName);
+        return ESSyncUtil.typeConvert(val, esType);
+    }
+
+    /**
+     * es 字段类型本地缓存
+     */
+    private static ConcurrentMap<String, Map<String, String>> esFieldTypes = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public String getEsType(ESMapping config, String fieldName) {
+        String key = config.get_index() + "-" + config.get_type();
+        Map<String, String> fieldType = esFieldTypes.get(key);
+        if (fieldType == null) {
+            ImmutableOpenMap<String, MappingMetaData> mappings;
+            try {
+                mappings = transportClient.admin()
+                    .cluster()
+                    .prepareState()
+                    .execute()
+                    .actionGet()
+                    .getState()
+                    .getMetaData()
+                    .getIndices()
+                    .get(config.get_index())
+                    .getMappings();
+            } catch (NullPointerException e) {
+                throw new IllegalArgumentException("Not found the mapping info of index: " + config.get_index());
+            }
+            MappingMetaData mappingMetaData = mappings.get(config.get_type());
+            if (mappingMetaData == null) {
+                throw new IllegalArgumentException("Not found the mapping info of index: " + config.get_index());
+            }
+
+            Map<String, String> fieldTypeTmp = new LinkedHashMap<>();
+
+            Map<String, Object> sourceMap = mappingMetaData.getSourceAsMap();
+            Map<String, Object> mapping = (Map<String, Object>) sourceMap.get("properties");
+            mapping.forEach((k, v) -> {
+                Map<String, Object> value = (Map<String, Object>) v;
+                if (value.containsKey("properties")) {
+                    fieldTypeTmp.put(k, "object");
+                } else {
+                    fieldTypeTmp.put(k, (String) value.get("type"));
+                }
+            });
+            fieldType = fieldTypeTmp;
+            esFieldTypes.put(key, fieldType);
+        }
+
+        return fieldType.get(fieldName);
+    }
 }

+ 18 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/Util.java

@@ -0,0 +1,18 @@
+package com.alibaba.otter.canal.client.adapter.es.support;
+
+public class Util {
+    public static String cleanColumn(String column) {
+        if (column == null) {
+            return null;
+        }
+        if (column.contains("`")) {
+            column = column.replaceAll("`", "");
+        }
+
+        if (column.contains("'")) {
+            column = column.replaceAll("'", "");
+        }
+
+        return column;
+    }
+}

+ 0 - 12
client-adapter/elasticsearch/src/main/resources/es/myetst_user.yml

@@ -1,12 +0,0 @@
-dataSourceKey: defaultDS
-esMapping:
-  _index: mytest_user
-  _type: _doc
-  _id: id
-#  pk: id
-  sql: "select a.id, concat(a.name,'_test') as name, a.role_id, b.name as role_name, c.labels from user a
-        left join role b on a.role_id=b.id
-        left join (select user_id, group_concat(label,',') as labels from user_label
-        group by user_id) c on c.user_id=a.id"
-  commitBatch: 3000
-  etlCondition: "where a.c_time>='{0}' or b.c_time>='{0}' or c.c_time>='{0}'"

+ 14 - 0
client-adapter/elasticsearch/src/main/resources/es/mytest_user.yml

@@ -0,0 +1,14 @@
+dataSourceKey: defaultDS
+destination: example
+esMapping:
+  _index: mytest_user
+  _type: _doc
+  _id: id
+#  pk: id
+#  sql: "select a.id, concat(a.name,'_test') as name, a.role_id, b.name as role_name, c.labels from user a
+#        left join role b on a.role_id=b.id
+#        left join (select user_id, group_concat(label,',') as labels from user_label
+#        group by user_id) c on c.user_id=a.id"
+  sql: "select id, name, c_time from user "
+  commitBatch: 3000
+  etlCondition: "where a.c_time>='{0}' or b.c_time>='{0}' or c.c_time>='{0}'"

+ 3 - 4
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/ConfigLoadTest.java

@@ -1,6 +1,5 @@
 package com.alibaba.otter.canal.client.adapter.es.test;
 
-import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 
@@ -16,8 +15,8 @@ import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 public class ConfigLoadTest {
 
     @Before
-    public void before() throws SQLException {
-        AdapterConfigs.put("es", "myetst_user.yml");
+    public void before() {
+        AdapterConfigs.put("es", "mytest_user.yml");
         // 加载数据源连接池
         DatasourceConfig.DATA_SOURCES.put("defaultDS", DataSourceConstant.dataSource);
     }
@@ -26,7 +25,7 @@ public class ConfigLoadTest {
     public void testLoad() {
         ESSyncConfigLoader.load();
         Map<String, ESSyncConfig> configMap = ESSyncConfigLoader.getEsSyncConfig();
-        ESSyncConfig config = configMap.get("myetst_user.yml");
+        ESSyncConfig config = configMap.get("mytest_user.yml");
         Assert.assertNotNull(config);
         Assert.assertEquals("defaultDS", config.getDataSourceKey());
         ESSyncConfig.ESMapping esMapping = config.getEsMapping();

+ 54 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/SyncTest.java

@@ -0,0 +1,54 @@
+package com.alibaba.otter.canal.client.adapter.es.test;
+
+import java.util.*;
+
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+
+public class SyncTest {
+
+    private ESAdapter esAdapter;
+
+    @Before
+    public void init() {
+        AdapterConfigs.put("es", "mytest_user.yml");
+        DatasourceConfig.DATA_SOURCES.put("defaultDS", DataSourceConstant.dataSource);
+
+        OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
+        outerAdapterConfig.setName("es");
+        outerAdapterConfig.setHosts("127.0.0.1:9300");
+        Map<String, String> properties = new HashMap<>();
+        properties.put("cluster.name", "elasticsearch");
+        outerAdapterConfig.setProperties(properties);
+
+        esAdapter = new ESAdapter();
+        esAdapter.init(outerAdapterConfig);
+
+    }
+
+    @Test
+    public void insertTest01() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("INSERT");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric");
+        data.put("c_time", new Date());
+
+        dml.setData(dataList);
+
+        esAdapter.sync(dml);
+    }
+}

+ 2 - 2
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -53,7 +53,7 @@ public class HbaseAdapter implements OuterAdapter {
                         hbaseMapping = MappingConfigLoader.load();
                         mappingConfigCache = new HashMap<>();
                         for (MappingConfig mappingConfig : hbaseMapping.values()) {
-                            mappingConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getHbaseMapping().getDestination())
+                            mappingConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination())
                                                    + "." + mappingConfig.getHbaseMapping().getDatabase() + "."
                                                    + mappingConfig.getHbaseMapping().getTable(),
                                 mappingConfig);
@@ -170,7 +170,7 @@ public class HbaseAdapter implements OuterAdapter {
     public String getDestination(String task) {
         MappingConfig config = hbaseMapping.get(task);
         if (config != null && config.getHbaseMapping() != null) {
-            return config.getHbaseMapping().getDestination();
+            return config.getDestination();
         }
         return null;
     }

+ 10 - 9
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java

@@ -12,6 +12,8 @@ public class MappingConfig {
 
     private String       dataSourceKey; // 数据源key
 
+    private String       destination;   // canal实例或MQ的topic
+
     private HbaseMapping hbaseMapping;  // hbase映射配置
 
     public String getDataSourceKey() {
@@ -22,6 +24,14 @@ public class MappingConfig {
         this.dataSourceKey = dataSourceKey;
     }
 
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
     public HbaseMapping getHbaseMapping() {
         return hbaseMapping;
     }
@@ -143,7 +153,6 @@ public class MappingConfig {
     public static class HbaseMapping {
 
         private Mode                    mode               = Mode.STRING;           // hbase默认转换格式
-        private String                  destination;                                // canal实例或MQ的topic
         private String                  database;                                   // 数据库名或schema名
         private String                  table;                                      // 表面名
         private String                  hbaseTable;                                 // hbase表名
@@ -169,14 +178,6 @@ public class MappingConfig {
             this.mode = mode;
         }
 
-        public String getDestination() {
-            return destination;
-        }
-
-        public void setDestination(String destination) {
-            this.destination = destination;
-        }
-
         public String getDatabase() {
             return database;
         }

+ 1 - 1
client-adapter/hbase/src/main/resources/hbase/mytest_person2.yml

@@ -1,7 +1,7 @@
 dataSourceKey: defaultDS
+destination: example
 hbaseMapping:
   mode: PHOENIX  #NATIVE   #STRING
-  destination: example
   database: mytest  # 数据库名
   table: person2     # 数据库表名
   hbaseTable: MYTEST.PERSON2   # HBase表名

+ 26 - 20
client-adapter/launcher/src/main/resources/application.yml

@@ -3,6 +3,7 @@ server:
 logging:
   level:
     com.alibaba.otter.canal.client.adapter.hbase: DEBUG
+    com.alibaba.otter.canal.client.adapter.es: DEBUG
 spring:
   jackson:
     date-format: yyyy-MM-dd HH:mm:ss
@@ -15,30 +16,34 @@ hbase.zookeeper.property.clientPort: 2181
 hbase.zookeeper.znode.parent: /hbase
 
 canal.conf:
-#  canalServerHost: 127.0.0.1:11111
+  canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
-  bootstrapServers: slave1:6667 #or rocketmq nameservers:host1:9876;host2:9876
+#  bootstrapServers: slave1:6667 #or rocketmq nameservers:host1:9876;host2:9876
   flatMessage: true
   retry: 3
   timeout: 20000
   batchSize: 50
-#  canalInstances:
-#  - instance: example
-#    groups:
-#    - outAdapters:
+  canalInstances:
+  - instance: example
+    groups:
+    - outAdapters:
+      - name: es
+        hosts: 127.0.0.1:9300
+        properties:
+          cluster.name: elasticsearch
 #      - name: logger
 #      - name: hbase
 #        properties:
 #          hbase.zookeeper.quorum: ${hbase.zookeeper.quorum}
 #          hbase.zookeeper.property.clientPort: ${hbase.zookeeper.property.clientPort}
 #          zookeeper.znode.parent: ${hbase.zookeeper.znode.parent}
-  mqTopics:
-  - mqMode: kafka
-    topic: example
-    groups:
-    - groupId: g2
-      outAdapters:
-      - name: logger
+#  mqTopics:
+#  - mqMode: kafka
+#    topic: example
+#    groups:
+#    - groupId: g2
+#      outAdapters:
+#      - name: logger
 #  mqTopics:
 #  - mqMode: rocketmq
 #    topic: example
@@ -47,11 +52,12 @@ canal.conf:
 #      outAdapters:
 #      - name: logger
 
-#adapter.conf:
-#  datasourceConfigs:
-#    defaultDS:
-#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
-#      username: root
-#      password: 121212
-#  adapterConfigs:
+adapter.conf:
+  datasourceConfigs:
+    defaultDS:
+      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
+      username: root
+      password: 121212
+  adapterConfigs:
 #  - hbase/mytest_person2.yml
+  - es/mytest_user.yml