Browse Source

elasticsearch适配器

mcy 6 years ago
parent
commit
1d2e85672c

+ 77 - 0
client-adapter/elasticsearch/pom.xml

@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>canal.client-adapter</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.1.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>client-adapter.elasticsearch</artifactId>
+    <packaging>jar</packaging>
+    <name>canal client adapter ElasticSearch module for otter ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>client-adapter.common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>1.19</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.fastsql</groupId>
+            <artifactId>fastsql</artifactId>
+            <version>2.0.0_preview_644</version>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>6.2.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>transport</artifactId>
+            <version>6.2.3</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 82 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -0,0 +1,82 @@
+package com.alibaba.otter.canal.client.adapter.es;
+
+import java.util.List;
+import java.util.Map;
+
+import com.alibaba.otter.canal.client.adapter.es.service.ESSyncService;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+
+/**
+ * ES外部适配器
+ *
+ * @author rewerma 2018-10-20
+ * @version 1.0.0
+ */
+@SPI("es")
+public class ESAdapter implements OuterAdapter {
+
+    private static Logger                             logger       = LoggerFactory.getLogger(ESAdapter.class);
+
+    private static volatile Map<String, ESSyncConfig> esSyncConfig = null;                                    // 文件名对应配置
+
+    private TransportClient                           transportClient;
+
+    private ESSyncService                             esSyncService;
+
+    @Override
+    public void init(OuterAdapterConfig configuration) {
+        try {
+            if (esSyncConfig == null) {
+                synchronized (ESSyncConfig.class) {
+                    if (esSyncConfig == null) {
+                        esSyncConfig = ESSyncConfigLoader.load();
+                    }
+                }
+            }
+            Map<String, String> properties = configuration.getProperties();
+            Settings.Builder settingBuilder = Settings.builder();
+            properties.forEach(settingBuilder::put);
+            Settings settings = settingBuilder.build();
+            transportClient = new PreBuiltTransportClient(settings);
+            esSyncService = new ESSyncService(transportClient);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void sync(Dml dml) {
+    }
+
+    @Override
+    public EtlResult etl(String task, List<String> params) {
+        return null;
+    }
+
+    @Override
+    public Map<String, Object> count(String task) {
+        return null;
+    }
+
+    @Override
+    public void destroy() {
+
+    }
+
+    @Override
+    public String getDestination(String task) {
+        return null;
+    }
+}

+ 137 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java

@@ -0,0 +1,137 @@
+package com.alibaba.otter.canal.client.adapter.es.config;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ESSyncConfig {
+
+    private String    dataSourceKey; // 数据源key
+
+    private ESMapping esMapping;
+
+    public void validate() {
+        if (esMapping._index == null) {
+            throw new NullPointerException("esMapping._index");
+        }
+        if (esMapping._type == null) {
+            throw new NullPointerException("esMapping._type");
+        }
+        if (esMapping._id == null && esMapping.pk == null) {
+            throw new NullPointerException("esMapping._id and esMapping.pk");
+        }
+        if (esMapping.sql == null) {
+            throw new NullPointerException("esMapping.sql");
+        }
+    }
+
+    public String getDataSourceKey() {
+        return dataSourceKey;
+    }
+
+    public void setDataSourceKey(String dataSourceKey) {
+        this.dataSourceKey = dataSourceKey;
+    }
+
+    public ESMapping getEsMapping() {
+        return esMapping;
+    }
+
+    public void setEsMapping(ESMapping esMapping) {
+        this.esMapping = esMapping;
+    }
+
+    public static class ESMapping {
+
+        private String       _index;
+        private String       _type;
+        private String       _id;
+        private String       pk;
+        private String       parent;
+        private String       sql;
+        private List<String> skips       = new ArrayList<>();
+        private boolean      alwaysSql   = false;
+        private int          commitBatch = 1000;
+        private String       etlCondition;
+
+        public String get_index() {
+            return _index;
+        }
+
+        public void set_index(String _index) {
+            this._index = _index;
+        }
+
+        public String get_type() {
+            return _type;
+        }
+
+        public void set_type(String _type) {
+            this._type = _type;
+        }
+
+        public String get_id() {
+            return _id;
+        }
+
+        public void set_id(String _id) {
+            this._id = _id;
+        }
+
+        public String getPk() {
+            return pk;
+        }
+
+        public void setPk(String pk) {
+            this.pk = pk;
+        }
+
+        public String getParent() {
+            return parent;
+        }
+
+        public void setParent(String parent) {
+            this.parent = parent;
+        }
+
+        public List<String> getSkips() {
+            return skips;
+        }
+
+        public void setSkips(List<String> skips) {
+            this.skips = skips;
+        }
+
+        public String getSql() {
+            return sql;
+        }
+
+        public void setSql(String sql) {
+            this.sql = sql;
+        }
+
+        public boolean isAlwaysSql() {
+            return alwaysSql;
+        }
+
+        public void setAlwaysSql(boolean alwaysSql) {
+            this.alwaysSql = alwaysSql;
+        }
+
+        public int getCommitBatch() {
+            return commitBatch;
+        }
+
+        public void setCommitBatch(int commitBatch) {
+            this.commitBatch = commitBatch;
+        }
+
+        public String getEtlCondition() {
+            return etlCondition;
+        }
+
+        public void setEtlCondition(String etlCondition) {
+            this.etlCondition = etlCondition;
+        }
+
+    }
+}

+ 89 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java

@@ -0,0 +1,89 @@
+package com.alibaba.otter.canal.client.adapter.es.config;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+
+public class ESSyncConfigLoader {
+
+    private static Logger       logger    = LoggerFactory.getLogger(ESSyncConfigLoader.class);
+
+    private static final String BASE_PATH = "es";
+
+    public static Map<String, ESSyncConfig> load() {
+        logger.info("## Start loading mapping config ... ");
+
+        Map<String, ESSyncConfig> result = new LinkedHashMap<>();
+
+        Collection<String> configs = AdapterConfigs.get("es");
+        for (String c : configs) {
+            if (c == null) {
+                continue;
+            }
+            c = c.trim();
+            if (c.equals("") || c.startsWith("#")) {
+                continue;
+            }
+
+            ESSyncConfig config;
+            String configContent = null;
+
+            if (c.endsWith(".yml")) {
+                configContent = readConfigContent(BASE_PATH + "/" + c);
+            }
+
+            config = new Yaml().loadAs(configContent, ESSyncConfig.class);
+
+            try {
+                config.validate();
+            } catch (Exception e) {
+                throw new RuntimeException("ERROR Config: " + c + " " + e.getMessage(), e);
+            }
+            result.put(c, config);
+        }
+
+        logger.info("## Mapping config loaded");
+        return result;
+    }
+
+    public static String readConfigContent(String config) {
+        InputStream in = null;
+        try {
+            // 先取本地文件,再取类路径
+            File configFile = new File("config/" + config);
+            if (configFile.exists()) {
+                in = new FileInputStream(configFile);
+            } else {
+                in = ESSyncConfigLoader.class.getClassLoader().getResourceAsStream(config);
+            }
+            if (in == null) {
+                throw new RuntimeException("Config file not found.");
+            }
+
+            byte[] bytes = new byte[in.available()];
+            in.read(bytes);
+            return new String(bytes, StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            throw new RuntimeException("Read yml config error ", e);
+        } finally {
+            try {
+                if (in != null) {
+                    in.close();
+                }
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+    }
+}

+ 364 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SchemaItem.java

@@ -0,0 +1,364 @@
+package com.alibaba.otter.canal.client.adapter.es.config;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SchemaItem {
+
+    private Map<String, TableItem>                aliasTableItems = new LinkedHashMap<>();
+    private Map<String, FieldItem>                selectFields    = new LinkedHashMap<>();
+    private String                                sql;
+
+    private volatile Map<String, List<TableItem>> tableItemAliases;
+    private volatile Map<String, List<FieldItem>> columnFields;
+
+    public void init() {
+        this.getTableItemAliases();
+        this.getColumnFields();
+        aliasTableItems.values().forEach(tableItem -> {
+            tableItem.getRelationTableFields();
+            tableItem.getRelationSelectFields();
+        });
+    }
+
+    public Map<String, TableItem> getAliasTableItems() {
+        return aliasTableItems;
+    }
+
+    public void setAliasTableItems(Map<String, TableItem> aliasTableItems) {
+        this.aliasTableItems = aliasTableItems;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
+    public Map<String, FieldItem> getSelectFields() {
+        return selectFields;
+    }
+
+    public void setSelectFields(Map<String, FieldItem> selectFields) {
+        this.selectFields = selectFields;
+    }
+
+    public Map<String, List<TableItem>> getTableItemAliases() {
+        if (tableItemAliases == null) {
+            synchronized (SchemaItem.class) {
+                if (tableItemAliases == null) {
+                    tableItemAliases = new LinkedHashMap<>();
+                    aliasTableItems.forEach((alias, tableItem) -> {
+                        List<TableItem> aliases = tableItemAliases
+                            .computeIfAbsent(tableItem.getTableName().toLowerCase(), k -> new ArrayList<>());
+                        aliases.add(tableItem);
+                    });
+                }
+            }
+        }
+        return tableItemAliases;
+    }
+
+    public Map<String, List<FieldItem>> getColumnFields() {
+        if (columnFields == null) {
+            synchronized (SchemaItem.class) {
+                if (columnFields == null) {
+                    columnFields = new LinkedHashMap<>();
+                    getSelectFields()
+                        .forEach((fieldName, fieldItem) -> fieldItem.getColumnItems().forEach(columnItem -> {
+                            TableItem tableItem = getAliasTableItems().get(columnItem.getOwner());
+                            if (!tableItem.isSubQuery()) {
+                                List<FieldItem> fieldItems = columnFields.computeIfAbsent(
+                                    columnItem.getOwner() + "." + columnItem.getColumnName(),
+                                    k -> new ArrayList<>());
+                                fieldItems.add(fieldItem);
+                            } else {
+                                tableItem.getSubQueryFields().forEach(subQueryField -> {
+                                    List<FieldItem> fieldItems = columnFields.computeIfAbsent(
+                                        columnItem.getOwner() + "." + subQueryField.getColumn().getColumnName(),
+                                        k -> new ArrayList<>());
+                                    fieldItems.add(fieldItem);
+                                });
+                            }
+                        }));
+                }
+            }
+        }
+        return columnFields;
+
+    }
+
+    public TableItem getMainTable() {
+        if (!aliasTableItems.isEmpty()) {
+            return aliasTableItems.values().iterator().next();
+        } else {
+            return null;
+        }
+    }
+
+    public static class TableItem {
+
+        private SchemaItem               schemaItem;
+
+        private String                   schema;
+        private String                   tableName;
+        private String                   alias;
+        private String                   subQuerySql;
+        private List<FieldItem>          subQueryFields = new ArrayList<>();
+        private List<RelationFieldsPair> relationFields = new ArrayList<>();
+
+        private boolean                  main;
+        private boolean                  subQuery;
+
+        private volatile List<FieldItem> relationTableFields;               // 当前表关联条件字段
+        private volatile List<FieldItem> relationSelectFieldItem;           // 关联条件字段在select中的对应字段
+
+        public TableItem(SchemaItem schemaItem){
+            this.schemaItem = schemaItem;
+        }
+
+        public SchemaItem getSchemaItem() {
+            return schemaItem;
+        }
+
+        public void setSchemaItem(SchemaItem schemaItem) {
+            this.schemaItem = schemaItem;
+        }
+
+        public String getSchema() {
+            return schema;
+        }
+
+        public void setSchema(String schema) {
+            this.schema = schema;
+        }
+
+        public String getTableName() {
+            return tableName;
+        }
+
+        public void setTableName(String tableName) {
+            this.tableName = tableName;
+        }
+
+        public String getAlias() {
+            return alias;
+        }
+
+        public void setAlias(String alias) {
+            this.alias = alias;
+        }
+
+        public String getSubQuerySql() {
+            return subQuerySql;
+        }
+
+        public void setSubQuerySql(String subQuerySql) {
+            this.subQuerySql = subQuerySql;
+        }
+
+        public boolean isMain() {
+            return main;
+        }
+
+        public void setMain(boolean main) {
+            this.main = main;
+        }
+
+        public boolean isSubQuery() {
+            return subQuery;
+        }
+
+        public void setSubQuery(boolean subQuery) {
+            this.subQuery = subQuery;
+        }
+
+        public List<FieldItem> getSubQueryFields() {
+            return subQueryFields;
+        }
+
+        public void setSubQueryFields(List<FieldItem> subQueryFields) {
+            this.subQueryFields = subQueryFields;
+        }
+
+        public List<RelationFieldsPair> getRelationFields() {
+            return relationFields;
+        }
+
+        public void setRelationFields(List<RelationFieldsPair> relationFields) {
+            this.relationFields = relationFields;
+        }
+
+        public List<FieldItem> getRelationTableFields() {
+            if (relationTableFields == null) {
+                synchronized (SchemaItem.class) {
+                    if (relationTableFields == null) {
+                        relationTableFields = new ArrayList<>();
+                        getRelationFields().forEach(relationFieldsPair -> {
+                            FieldItem leftFieldItem = relationFieldsPair.getLeftFieldItem();
+                            FieldItem rightFieldItem = relationFieldsPair.getRightFieldItem();
+                            if (getAlias().equals(leftFieldItem.getOwner())) {
+                                relationTableFields.add(leftFieldItem);
+                            } else if (getAlias().equals(rightFieldItem.getOwner())) {
+                                relationTableFields.add(rightFieldItem);
+                            }
+                        });
+                    }
+                }
+            }
+            return relationTableFields;
+        }
+
+        public List<FieldItem> getRelationSelectFields() {
+            if (relationSelectFieldItem == null) {
+                synchronized (SchemaItem.class) {
+                    if (relationSelectFieldItem == null) {
+                        relationSelectFieldItem = new ArrayList<>();
+                        getRelationFields().forEach(relationFieldsPair -> {
+                            FieldItem leftFieldItem = relationFieldsPair.getLeftFieldItem();
+                            List<FieldItem> selectFieldItem = getSchemaItem().getColumnFields()
+                                .get(leftFieldItem.getOwner() + "." + leftFieldItem.getColumn().getColumnName());
+                            if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
+                                relationSelectFieldItem.addAll(selectFieldItem);
+                            } else {
+                                FieldItem rightFieldItem = relationFieldsPair.getRightFieldItem();
+                                selectFieldItem = getSchemaItem().getColumnFields()
+                                    .get(rightFieldItem.getOwner() + "." + rightFieldItem.getColumn().getColumnName());
+                                if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
+                                    relationSelectFieldItem.addAll(selectFieldItem);
+                                } else {
+                                    throw new UnsupportedOperationException(
+                                        "Relation condition column must in select columns.");
+                                }
+                            }
+                        });
+                    }
+                }
+            }
+            return relationSelectFieldItem;
+        }
+    }
+
+    public static class RelationFieldsPair {
+
+        private FieldItem leftFieldItem;
+        private FieldItem rightFieldItem;
+
+        public RelationFieldsPair(FieldItem leftFieldItem, FieldItem rightFieldItem){
+            this.leftFieldItem = leftFieldItem;
+            this.rightFieldItem = rightFieldItem;
+        }
+
+        public FieldItem getLeftFieldItem() {
+            return leftFieldItem;
+        }
+
+        public void setLeftFieldItem(FieldItem leftFieldItem) {
+            this.leftFieldItem = leftFieldItem;
+        }
+
+        public FieldItem getRightFieldItem() {
+            return rightFieldItem;
+        }
+
+        public void setRightFieldItem(FieldItem rightFieldItem) {
+            this.rightFieldItem = rightFieldItem;
+        }
+    }
+
+    public static class FieldItem {
+
+        private String           fieldName;
+        private List<ColumnItem> columnItems = new ArrayList<>();
+        private List<String>     owners      = new ArrayList<>();
+
+        private boolean          method;
+        private boolean          binaryOp;
+
+        public String getFieldName() {
+            return fieldName;
+        }
+
+        public void setFieldName(String fieldName) {
+            this.fieldName = fieldName;
+        }
+
+        public List<ColumnItem> getColumnItems() {
+            return columnItems;
+        }
+
+        public void setColumnItems(List<ColumnItem> columnItems) {
+            this.columnItems = columnItems;
+        }
+
+        public boolean isMethod() {
+            return method;
+        }
+
+        public void setMethod(boolean method) {
+            this.method = method;
+        }
+
+        public boolean isBinaryOp() {
+            return binaryOp;
+        }
+
+        public void setBinaryOp(boolean binaryOp) {
+            this.binaryOp = binaryOp;
+        }
+
+        public List<String> getOwners() {
+            return owners;
+        }
+
+        public void setOwners(List<String> owners) {
+            this.owners = owners;
+        }
+
+        public void addColumn(ColumnItem columnItem) {
+            columnItems.add(columnItem);
+        }
+
+        public ColumnItem getColumn() {
+            if (!columnItems.isEmpty()) {
+                return columnItems.get(0);
+            } else {
+                return null;
+            }
+        }
+
+        public String getOwner() {
+            if (!owners.isEmpty()) {
+                return owners.get(0);
+            } else {
+                return null;
+            }
+        }
+    }
+
+    public static class ColumnItem {
+
+        private String owner;
+        private String columnName;
+
+        public String getOwner() {
+            return owner;
+        }
+
+        public void setOwner(String owner) {
+            this.owner = owner;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        public void setColumnName(String columnName) {
+            this.columnName = columnName;
+        }
+    }
+}

+ 205 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SqlParser.java

@@ -0,0 +1,205 @@
+package com.alibaba.otter.canal.client.adapter.es.config;
+
+import static com.alibaba.fastsql.sql.ast.expr.SQLBinaryOperator.BooleanAnd;
+import static com.alibaba.fastsql.sql.ast.expr.SQLBinaryOperator.Equality;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.alibaba.fastsql.sql.SQLUtils;
+import com.alibaba.fastsql.sql.ast.SQLExpr;
+import com.alibaba.fastsql.sql.ast.expr.SQLBinaryOpExpr;
+import com.alibaba.fastsql.sql.ast.expr.SQLIdentifierExpr;
+import com.alibaba.fastsql.sql.ast.expr.SQLMethodInvokeExpr;
+import com.alibaba.fastsql.sql.ast.expr.SQLPropertyExpr;
+import com.alibaba.fastsql.sql.ast.statement.*;
+import com.alibaba.fastsql.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
+import com.alibaba.fastsql.sql.dialect.mysql.parser.MySqlStatementParser;
+import com.alibaba.fastsql.sql.parser.ParserException;
+import com.alibaba.fastsql.sql.parser.SQLStatementParser;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.RelationFieldsPair;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.TableItem;
+
+/**
+ * ES同步指定sql格式解析
+ * 
+ * @author rewerma 2018-10-26 下午03:45:49
+ * @version 1.0.0
+ */
+public class SqlParser {
+
+    /**
+     * 解析sql
+     * 
+     * @param sql
+     * @return
+     */
+    public static SchemaItem parse(String sql) {
+        try {
+            SQLStatementParser parser = new MySqlStatementParser(sql);
+            SQLSelectStatement statement = (SQLSelectStatement) parser.parseStatement();
+            MySqlSelectQueryBlock sqlSelectQueryBlock = (MySqlSelectQueryBlock) statement.getSelect().getQuery();
+
+            SchemaItem schemaItem = new SchemaItem();
+            schemaItem.setSql(SQLUtils.toMySqlString(sqlSelectQueryBlock));
+            SQLTableSource sqlTableSource = sqlSelectQueryBlock.getFrom();
+            List<TableItem> tableItems = new ArrayList<>();
+            SqlParser.visitSelectTable(schemaItem, sqlTableSource, tableItems, null);
+            tableItems.forEach(tableItem -> schemaItem.getAliasTableItems().put(tableItem.getAlias(), tableItem));
+
+            List<FieldItem> fieldItems = collectSelectQueryFields(sqlSelectQueryBlock);
+            fieldItems.forEach(fieldItem -> schemaItem.getSelectFields().put(fieldItem.getFieldName(), fieldItem));
+
+            schemaItem.init();
+
+            if (schemaItem.getAliasTableItems().isEmpty() || schemaItem.getSelectFields().isEmpty()) {
+                throw new ParserException("Parse sql error");
+            }
+            return schemaItem;
+        } catch (Exception e) {
+            throw new ParserException();
+        }
+    }
+
+    /**
+     * 归集字段
+     * 
+     * @param sqlSelectQueryBlock
+     * @return
+     */
+    private static List<FieldItem> collectSelectQueryFields(MySqlSelectQueryBlock sqlSelectQueryBlock) {
+        return sqlSelectQueryBlock.getSelectList().stream().map(selectItem -> {
+            FieldItem fieldItem = new FieldItem();
+            fieldItem.setFieldName(selectItem.getAlias());
+            visitColumn(selectItem.getExpr(), fieldItem);
+            return fieldItem;
+        }).collect(Collectors.toList());
+    }
+
+    /**
+     * 解析字段
+     * 
+     * @param expr
+     * @param fieldItem
+     */
+    private static void visitColumn(SQLExpr expr, FieldItem fieldItem) {
+        if (expr instanceof SQLIdentifierExpr) {
+            // 无owner
+            SQLIdentifierExpr identifierExpr = (SQLIdentifierExpr) expr;
+            if (fieldItem.getFieldName() == null) {
+                fieldItem.setFieldName(identifierExpr.getName());
+            }
+            ColumnItem columnItem = new ColumnItem();
+            columnItem.setColumnName(identifierExpr.getName());
+            fieldItem.getOwners().add(null);
+            fieldItem.addColumn(columnItem);
+        } else if (expr instanceof SQLPropertyExpr) {
+            // 有owner
+            SQLPropertyExpr sqlPropertyExpr = (SQLPropertyExpr) expr;
+            if (fieldItem.getFieldName() == null) {
+                fieldItem.setFieldName(sqlPropertyExpr.getName());
+            }
+            fieldItem.getOwners().add(sqlPropertyExpr.getOwnernName());
+            ColumnItem columnItem = new ColumnItem();
+            columnItem.setColumnName(sqlPropertyExpr.getName());
+            columnItem.setOwner(sqlPropertyExpr.getOwnernName());
+            fieldItem.addColumn(columnItem);
+        } else if (expr instanceof SQLMethodInvokeExpr) {
+            SQLMethodInvokeExpr methodInvokeExpr = (SQLMethodInvokeExpr) expr;
+            fieldItem.setMethod(true);
+            for (SQLExpr sqlExpr : methodInvokeExpr.getArguments()) {
+                visitColumn(sqlExpr, fieldItem);
+            }
+        } else if (expr instanceof SQLBinaryOpExpr) {
+            SQLBinaryOpExpr sqlBinaryOpExpr = (SQLBinaryOpExpr) expr;
+            fieldItem.setBinaryOp(true);
+            visitColumn(sqlBinaryOpExpr.getLeft(), fieldItem);
+            visitColumn(sqlBinaryOpExpr.getRight(), fieldItem);
+        }
+    }
+
+    /**
+     * 解析表
+     */
+    private static void visitSelectTable(SchemaItem schemaItem, SQLTableSource sqlTableSource,
+                                         List<TableItem> tableItems, TableItem tableItemTmp) {
+        if (sqlTableSource instanceof SQLExprTableSource) {
+            SQLExprTableSource sqlExprTableSource = (SQLExprTableSource) sqlTableSource;
+            TableItem tableItem;
+            if (tableItemTmp != null) {
+                tableItem = tableItemTmp;
+            } else {
+                tableItem = new TableItem(schemaItem);
+            }
+            tableItem.setSchema(sqlExprTableSource.getSchema());
+            tableItem.setTableName(sqlExprTableSource.getTableName());
+            if (tableItem.getAlias() == null) {
+                tableItem.setAlias(sqlExprTableSource.getAlias());
+            }
+            if (tableItems.isEmpty()) {
+                // 第一张表为主表
+                tableItem.setMain(true);
+            }
+            tableItems.add(tableItem);
+        } else if (sqlTableSource instanceof SQLJoinTableSource) {
+            SQLJoinTableSource sqlJoinTableSource = (SQLJoinTableSource) sqlTableSource;
+            SQLTableSource leftTableSource = sqlJoinTableSource.getLeft();
+            visitSelectTable(schemaItem, leftTableSource, tableItems, null);
+            SQLTableSource rightTableSource = sqlJoinTableSource.getRight();
+            TableItem rightTableItem = new TableItem(schemaItem);
+            // 解析on条件字段
+            visitOnCondition(sqlJoinTableSource.getCondition(), rightTableItem);
+            visitSelectTable(schemaItem, rightTableSource, tableItems, rightTableItem);
+
+        } else if (sqlTableSource instanceof SQLSubqueryTableSource) {
+            SQLSubqueryTableSource subQueryTableSource = (SQLSubqueryTableSource) sqlTableSource;
+            MySqlSelectQueryBlock sqlSelectQuery = (MySqlSelectQueryBlock) subQueryTableSource.getSelect().getQuery();
+            TableItem tableItem;
+            if (tableItemTmp != null) {
+                tableItem = tableItemTmp;
+            } else {
+                tableItem = new TableItem(schemaItem);
+            }
+            tableItem.setAlias(subQueryTableSource.getAlias());
+            tableItem.setSubQuerySql(SQLUtils.toMySqlString(sqlSelectQuery));
+            tableItem.setSubQuery(true);
+            tableItem.setSubQueryFields(collectSelectQueryFields(sqlSelectQuery));
+            visitSelectTable(schemaItem, sqlSelectQuery.getFrom(), tableItems, tableItem);
+        }
+    }
+
+    /**
+     * 解析on条件
+     * 
+     * @param expr
+     * @param tableItem
+     */
+    private static void visitOnCondition(SQLExpr expr, TableItem tableItem) {
+        if (!(expr instanceof SQLBinaryOpExpr)) {
+            throw new UnsupportedOperationException();
+        }
+        SQLBinaryOpExpr sqlBinaryOpExpr = (SQLBinaryOpExpr) expr;
+        if (sqlBinaryOpExpr.getOperator() == BooleanAnd) {
+            visitOnCondition(sqlBinaryOpExpr.getLeft(), tableItem);
+            visitOnCondition(sqlBinaryOpExpr.getRight(), tableItem);
+        } else if (sqlBinaryOpExpr.getOperator() == Equality) {
+            FieldItem leftFieldItem = new FieldItem();
+            visitColumn(sqlBinaryOpExpr.getLeft(), leftFieldItem);
+            if (leftFieldItem.getColumnItems().size() != 1 || leftFieldItem.isMethod() || leftFieldItem.isBinaryOp()) {
+                throw new UnsupportedOperationException("Unsupported for complex of on-condition");
+            }
+            FieldItem rightFieldItem = new FieldItem();
+            visitColumn(sqlBinaryOpExpr.getRight(), rightFieldItem);
+            if (rightFieldItem.getColumnItems().size() != 1 || rightFieldItem.isMethod()
+                || rightFieldItem.isBinaryOp()) {
+                throw new UnsupportedOperationException("Unsupported for complex of on-condition");
+            }
+            tableItem.getRelationFields().add(new RelationFieldsPair(leftFieldItem, rightFieldItem));
+        } else {
+            throw new UnsupportedOperationException("Unsupported for complex of on-condition");
+        }
+    }
+}

+ 15 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -0,0 +1,15 @@
+package com.alibaba.otter.canal.client.adapter.es.service;
+
+import org.elasticsearch.client.transport.TransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ESSyncService {
+    private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private TransportClient transportClient;
+
+    public ESSyncService(TransportClient transportClient) {
+        this.transportClient = transportClient;
+    }
+}

+ 1 - 0
client-adapter/elasticsearch/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter

@@ -0,0 +1 @@
+es=com.alibaba.otter.canal.client.adapter.es.ESAdapter

+ 12 - 0
client-adapter/elasticsearch/src/main/resources/es/myetst_user.yml

@@ -0,0 +1,12 @@
+dataSourceKey: defaultDS
+esMapping:
+  _index: mytest_user
+  _type: _doc
+  _id: id
+#  pk: id
+  sql: "select a.id, concat(a.name,'_test') as name, a.role_id, b.name as role_name, c.labels from user.a
+        left join role b on a.role_id=b.id
+        left join (select user_id, group_concat(label,',') as labels from user_label
+        group by user_id) c on c.user_id=a.id"
+  commitBatch: 3000
+  etlCondition: "where a.c_time>='{0}' or b.c_time>='{0}' or c.c_time>='{0}'"

+ 31 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/ConfigLoadTest.java

@@ -0,0 +1,31 @@
+package com.alibaba.otter.canal.client.adapter.es.test;
+
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class ConfigLoadTest {
+
+    @Before
+    public void before() {
+        AdapterConfigs.put("es", "myetst_user.yml");
+    }
+
+    @Test
+    public void testLoad() {
+        Map<String, ESSyncConfig> configMap = ESSyncConfigLoader.load();
+        ESSyncConfig config = configMap.get("myetst_user.yml");
+        Assert.assertNotNull(config);
+        Assert.assertEquals("defaultDS", config.getDataSourceKey());
+        ESSyncConfig.ESMapping esMapping = config.getEsMapping();
+        Assert.assertEquals("mytest_user", esMapping.get_index());
+        Assert.assertEquals("_doc", esMapping.get_type());
+        Assert.assertEquals("id", esMapping.get_id());
+        Assert.assertNotNull(esMapping.getSql());
+    }
+}

+ 51 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/SqlParseTest.java

@@ -0,0 +1,51 @@
+package com.alibaba.otter.canal.client.adapter.es.test;
+
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.TableItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SqlParser;
+
+public class SqlParseTest {
+
+    @Test
+    public void parseTest() {
+        // String sql = "select a.id,d.user_id2, concat(name,'_', a.nick) as name,
+        // b.name as roleNme, d.name as typeName,concat(d.label,'_') as label,"
+        // + " c.name as refName from user a left join role b on b.user_id=a.id "
+        // + "left join type d on d.user_id=a.id and d.user_id2=a.p_id "
+        // + "left join ( select ref_id,group_concat(name,',') as name from role group
+        // by ref_id ) c on c.ref_id=a.id "
+        // + "where a.id=1";
+        String sql = "select a.id, concat(a.name,'_test') as name, a.role_id, b.name as role_name, c.labels from user a "
+                     + "left join role b on a.role_id=b.id "
+                     + "left join (select user_id, group_concat(label,',') as labels from user_label "
+                     + "group by user_id) c on c.user_id=a.id";
+        SchemaItem schemaItem = SqlParser.parse(sql);
+
+        // 通过表名找 TableItem
+        List<TableItem> tableItems = schemaItem.getTableItemAliases().get("user_label".toLowerCase());
+        tableItems.forEach(tableItem -> Assert.assertEquals("c", tableItem.getAlias()));
+
+        TableItem tableItem = tableItems.get(0);
+        Assert.assertFalse(tableItem.isMain());
+        Assert.assertTrue(tableItem.isSubQuery());
+        // 通过字段名找 FieldItem
+        List<FieldItem> fieldItems = schemaItem.getColumnFields().get(tableItem.getAlias() + ".label".toLowerCase());
+        fieldItems.forEach(
+            fieldItem -> Assert.assertEquals("c.labels", fieldItem.getOwner() + "." + fieldItem.getFieldName()));
+
+        // 获取当前表关联条件字段
+        List<FieldItem> relationTableFields = tableItem.getRelationTableFields();
+        relationTableFields.forEach(fieldItem -> Assert.assertEquals("user_id", fieldItem.getColumn().getColumnName()));
+
+        // 获取关联字段在select中的对应字段
+        List<FieldItem> relationSelectFieldItem = tableItem.getRelationSelectFields();
+        relationSelectFieldItem.forEach(fieldItem -> Assert.assertEquals("c.labels",
+            fieldItem.getOwner() + "." + fieldItem.getColumn().getColumnName()));
+    }
+}

+ 1 - 0
client-adapter/pom.xml

@@ -21,6 +21,7 @@
         <module>common</module>
         <module>logger</module>
         <module>hbase</module>
+        <module>elasticsearch</module>
         <module>launcher</module>
     </modules>