Browse Source

Merge branch 'feature/es2'

mcy 6 years ago
parent
commit
57c3226e56

+ 7 - 4
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -144,11 +144,14 @@ public class ESAdapter implements OuterAdapter {
             return;
         }
         for (Dml dml : dmls) {
-            sync(dml);
+            if (!dml.getIsDdl()) {
+                sync(dml);
+            }
         }
+        esSyncService.commit(); // 批次统一提交
     }
 
-    public void sync(Dml dml) {
+    private void sync(Dml dml) {
         String database = dml.getDatabase();
         String table = dml.getTable();
         Map<String, ESSyncConfig> configMap;
@@ -174,7 +177,7 @@ public class ESAdapter implements OuterAdapter {
             DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
             ESEtlService esEtlService = new ESEtlService(transportClient, config);
             if (dataSource != null) {
-                return esEtlService.importData(params, false);
+                return esEtlService.importData(params);
             } else {
                 etlResult.setSucceeded(false);
                 etlResult.setErrorMessage("DataSource not found");
@@ -188,7 +191,7 @@ public class ESAdapter implements OuterAdapter {
                 // 取所有的destination为task的配置
                 if (configTmp.getDestination().equals(task)) {
                     ESEtlService esEtlService = new ESEtlService(transportClient, configTmp);
-                    EtlResult etlRes = esEtlService.importData(params, false);
+                    EtlResult etlRes = esEtlService.importData(params);
                     if (!etlRes.getSucceeded()) {
                         resSuccess = false;
                         resultMsg.append(etlRes.getErrorMessage()).append("\n");

+ 11 - 10
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java

@@ -30,7 +30,7 @@ public class ESSyncConfig {
         if (esMapping._type == null) {
             throw new NullPointerException("esMapping._type");
         }
-        if (esMapping._id == null && esMapping.pk == null) {
+        if (esMapping._id == null && esMapping.getPk() == null) {
             throw new NullPointerException("esMapping._id and esMapping.pk");
         }
         if (esMapping.sql == null) {
@@ -83,8 +83,9 @@ public class ESSyncConfig {
         private String              _index;
         private String              _type;
         private String              _id;
+        private boolean             upsert          = false;
         private String              pk;
-        private String              parent;
+        // private String parent;
         private String              sql;
         // 对象字段, 例: objFields:
         // - _labels: array:;
@@ -121,20 +122,20 @@ public class ESSyncConfig {
             this._id = _id;
         }
 
-        public String getPk() {
-            return pk;
+        public boolean isUpsert() {
+            return upsert;
         }
 
-        public void setPk(String pk) {
-            this.pk = pk;
+        public void setUpsert(boolean upsert) {
+            this.upsert = upsert;
         }
 
-        public String getParent() {
-            return parent;
+        public String getPk() {
+            return pk;
         }
 
-        public void setParent(String parent) {
-            this.parent = parent;
+        public void setPk(String pk) {
+            this.pk = pk;
         }
 
         public Map<String, String> getObjFields() {

+ 436 - 422
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SchemaItem.java

@@ -1,422 +1,436 @@
-package com.alibaba.otter.canal.client.adapter.es.config;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
-
-/**
- * ES 映射配置视图
- *
- * @author rewerma 2018-11-01
- * @version 1.0.0
- */
-public class SchemaItem {
-
-    private Map<String, TableItem>                aliasTableItems = new LinkedHashMap<>(); // 别名对应表名
-    private Map<String, FieldItem>                selectFields    = new LinkedHashMap<>(); // 查询字段
-    private String                                sql;
-
-    private volatile Map<String, List<TableItem>> tableItemAliases;
-    private volatile Map<String, List<FieldItem>> columnFields;
-    private volatile Boolean                      allFieldsSimple;
-
-    public void init() {
-        this.getTableItemAliases();
-        this.getColumnFields();
-        this.isAllFieldsSimple();
-        aliasTableItems.values().forEach(tableItem -> {
-            tableItem.getRelationTableFields();
-            tableItem.getRelationSelectFieldItems();
-        });
-    }
-
-    public Map<String, TableItem> getAliasTableItems() {
-        return aliasTableItems;
-    }
-
-    public void setAliasTableItems(Map<String, TableItem> aliasTableItems) {
-        this.aliasTableItems = aliasTableItems;
-    }
-
-    public String getSql() {
-        return sql;
-    }
-
-    public void setSql(String sql) {
-        this.sql = sql;
-    }
-
-    public Map<String, FieldItem> getSelectFields() {
-        return selectFields;
-    }
-
-    public void setSelectFields(Map<String, FieldItem> selectFields) {
-        this.selectFields = selectFields;
-    }
-
-    public Map<String, List<TableItem>> getTableItemAliases() {
-        if (tableItemAliases == null) {
-            synchronized (SchemaItem.class) {
-                if (tableItemAliases == null) {
-                    tableItemAliases = new LinkedHashMap<>();
-                    aliasTableItems.forEach((alias, tableItem) -> {
-                        List<TableItem> aliases = tableItemAliases
-                            .computeIfAbsent(tableItem.getTableName().toLowerCase(), k -> new ArrayList<>());
-                        aliases.add(tableItem);
-                    });
-                }
-            }
-        }
-        return tableItemAliases;
-    }
-
-    public Map<String, List<FieldItem>> getColumnFields() {
-        if (columnFields == null) {
-            synchronized (SchemaItem.class) {
-                if (columnFields == null) {
-                    columnFields = new LinkedHashMap<>();
-                    getSelectFields()
-                        .forEach((fieldName, fieldItem) -> fieldItem.getColumnItems().forEach(columnItem -> {
-                            // TableItem tableItem = getAliasTableItems().get(columnItem.getOwner());
-                            // if (!tableItem.isSubQuery()) {
-                            List<FieldItem> fieldItems = columnFields.computeIfAbsent(
-                                columnItem.getOwner() + "." + columnItem.getColumnName(),
-                                k -> new ArrayList<>());
-                            fieldItems.add(fieldItem);
-                            // } else {
-                            // tableItem.getSubQueryFields().forEach(subQueryField -> {
-                            // List<FieldItem> fieldItems = columnFields.computeIfAbsent(
-                            // columnItem.getOwner() + "." + subQueryField.getColumn().getColumnName(),
-                            // k -> new ArrayList<>());
-                            // fieldItems.add(fieldItem);
-                            // });
-                            // }
-                        }));
-                }
-            }
-        }
-        return columnFields;
-    }
-
-    public boolean isAllFieldsSimple() {
-        if (allFieldsSimple == null) {
-            synchronized (SchemaItem.class) {
-                if (allFieldsSimple == null) {
-                    allFieldsSimple = true;
-
-                    for (FieldItem fieldItem : getSelectFields().values()) {
-                        if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
-                            allFieldsSimple = false;
-                            break;
-                        }
-                    }
-                }
-            }
-        }
-
-        return allFieldsSimple;
-    }
-
-    public TableItem getMainTable() {
-        if (!aliasTableItems.isEmpty()) {
-            return aliasTableItems.values().iterator().next();
-        } else {
-            return null;
-        }
-    }
-
-    public FieldItem getIdFieldItem(ESMapping mapping) {
-        if (mapping.get_id() != null) {
-            return getSelectFields().get(mapping.get_id());
-        } else {
-            return getSelectFields().get(mapping.getPk());
-        }
-    }
-
-    public static class TableItem {
-
-        private SchemaItem                               schemaItem;
-
-        private String                                   schema;
-        private String                                   tableName;
-        private String                                   alias;
-        private String                                   subQuerySql;
-        private List<FieldItem>                          subQueryFields = new ArrayList<>();
-        private List<RelationFieldsPair>                 relationFields = new ArrayList<>();
-
-        private boolean                                  main;
-        private boolean                                  subQuery;
-
-        private volatile Map<FieldItem, List<FieldItem>> relationTableFields;               // 当前表关联条件字段对应主表查询字段
-        private volatile List<FieldItem>                 relationSelectFieldItems;          // 子表所在主表的查询字段
-
-        public TableItem(SchemaItem schemaItem){
-            this.schemaItem = schemaItem;
-        }
-
-        public SchemaItem getSchemaItem() {
-            return schemaItem;
-        }
-
-        public void setSchemaItem(SchemaItem schemaItem) {
-            this.schemaItem = schemaItem;
-        }
-
-        public String getSchema() {
-            return schema;
-        }
-
-        public void setSchema(String schema) {
-            this.schema = schema;
-        }
-
-        public String getTableName() {
-            return tableName;
-        }
-
-        public void setTableName(String tableName) {
-            this.tableName = tableName;
-        }
-
-        public String getAlias() {
-            return alias;
-        }
-
-        public void setAlias(String alias) {
-            this.alias = alias;
-        }
-
-        public String getSubQuerySql() {
-            return subQuerySql;
-        }
-
-        public void setSubQuerySql(String subQuerySql) {
-            this.subQuerySql = subQuerySql;
-        }
-
-        public boolean isMain() {
-            return main;
-        }
-
-        public void setMain(boolean main) {
-            this.main = main;
-        }
-
-        public boolean isSubQuery() {
-            return subQuery;
-        }
-
-        public void setSubQuery(boolean subQuery) {
-            this.subQuery = subQuery;
-        }
-
-        public List<FieldItem> getSubQueryFields() {
-            return subQueryFields;
-        }
-
-        public void setSubQueryFields(List<FieldItem> subQueryFields) {
-            this.subQueryFields = subQueryFields;
-        }
-
-        public List<RelationFieldsPair> getRelationFields() {
-            return relationFields;
-        }
-
-        public void setRelationFields(List<RelationFieldsPair> relationFields) {
-            this.relationFields = relationFields;
-        }
-
-        public Map<FieldItem, List<FieldItem>> getRelationTableFields() {
-            if (relationTableFields == null) {
-                synchronized (SchemaItem.class) {
-                    if (relationTableFields == null) {
-                        relationTableFields = new LinkedHashMap<>();
-
-                        getRelationFields().forEach(relationFieldsPair -> {
-                            FieldItem leftFieldItem = relationFieldsPair.getLeftFieldItem();
-                            FieldItem rightFieldItem = relationFieldsPair.getRightFieldItem();
-                            FieldItem currentTableRelField = null;
-                            if (getAlias().equals(leftFieldItem.getOwner())) {
-                                currentTableRelField = leftFieldItem;
-                            } else if (getAlias().equals(rightFieldItem.getOwner())) {
-                                currentTableRelField = rightFieldItem;
-                            }
-
-                            if (currentTableRelField != null) {
-                                List<FieldItem> selectFieldItem = getSchemaItem().getColumnFields()
-                                    .get(leftFieldItem.getOwner() + "." + leftFieldItem.getColumn().getColumnName());
-                                if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
-                                    relationTableFields.put(currentTableRelField, selectFieldItem);
-                                } else {
-                                    selectFieldItem = getSchemaItem().getColumnFields()
-                                        .get(rightFieldItem.getOwner() + "."
-                                             + rightFieldItem.getColumn().getColumnName());
-                                    if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
-                                        relationTableFields.put(currentTableRelField, selectFieldItem);
-                                    } else {
-                                        throw new UnsupportedOperationException(
-                                            "Relation condition column must in select columns.");
-                                    }
-                                }
-                            }
-                        });
-                    }
-                }
-            }
-            return relationTableFields;
-        }
-
-        public List<FieldItem> getRelationSelectFieldItems() {
-            if (relationSelectFieldItems == null) {
-                synchronized (SchemaItem.class) {
-                    if (relationSelectFieldItems == null) {
-                        relationSelectFieldItems = new ArrayList<>();
-                        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
-                            if (fieldItem.getOwners().contains(getAlias())) {
-                                relationSelectFieldItems.add(fieldItem);
-                            }
-                        }
-                    }
-                }
-            }
-            return relationSelectFieldItems;
-        }
-    }
-
-    public static class RelationFieldsPair {
-
-        private FieldItem leftFieldItem;
-        private FieldItem rightFieldItem;
-
-        public RelationFieldsPair(FieldItem leftFieldItem, FieldItem rightFieldItem){
-            this.leftFieldItem = leftFieldItem;
-            this.rightFieldItem = rightFieldItem;
-        }
-
-        public FieldItem getLeftFieldItem() {
-            return leftFieldItem;
-        }
-
-        public void setLeftFieldItem(FieldItem leftFieldItem) {
-            this.leftFieldItem = leftFieldItem;
-        }
-
-        public FieldItem getRightFieldItem() {
-            return rightFieldItem;
-        }
-
-        public void setRightFieldItem(FieldItem rightFieldItem) {
-            this.rightFieldItem = rightFieldItem;
-        }
-    }
-
-    public static class FieldItem {
-
-        private String           fieldName;
-        private List<ColumnItem> columnItems = new ArrayList<>();
-        private List<String>     owners      = new ArrayList<>();
-
-        private boolean          method;
-        private boolean          binaryOp;
-
-        public String getFieldName() {
-            return fieldName;
-        }
-
-        public void setFieldName(String fieldName) {
-            this.fieldName = fieldName;
-        }
-
-        public List<ColumnItem> getColumnItems() {
-            return columnItems;
-        }
-
-        public void setColumnItems(List<ColumnItem> columnItems) {
-            this.columnItems = columnItems;
-        }
-
-        public boolean isMethod() {
-            return method;
-        }
-
-        public void setMethod(boolean method) {
-            this.method = method;
-        }
-
-        public boolean isBinaryOp() {
-            return binaryOp;
-        }
-
-        public void setBinaryOp(boolean binaryOp) {
-            this.binaryOp = binaryOp;
-        }
-
-        public List<String> getOwners() {
-            return owners;
-        }
-
-        public void setOwners(List<String> owners) {
-            this.owners = owners;
-        }
-
-        public void addColumn(ColumnItem columnItem) {
-            columnItems.add(columnItem);
-        }
-
-        public ColumnItem getColumn() {
-            if (!columnItems.isEmpty()) {
-                return columnItems.get(0);
-            } else {
-                return null;
-            }
-        }
-
-        public String getOwner() {
-            if (!owners.isEmpty()) {
-                return owners.get(0);
-            } else {
-                return null;
-            }
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            FieldItem fieldItem = (FieldItem) o;
-
-            return fieldName != null ? fieldName.equals(fieldItem.fieldName) : fieldItem.fieldName == null;
-        }
-
-        @Override
-        public int hashCode() {
-            return fieldName != null ? fieldName.hashCode() : 0;
-        }
-    }
-
-    public static class ColumnItem {
-
-        private String owner;
-        private String columnName;
-
-        public String getOwner() {
-            return owner;
-        }
-
-        public void setOwner(String owner) {
-            this.owner = owner;
-        }
-
-        public String getColumnName() {
-            return columnName;
-        }
-
-        public void setColumnName(String columnName) {
-            this.columnName = columnName;
-        }
-    }
-}
+package com.alibaba.otter.canal.client.adapter.es.config;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
+
+/**
+ * ES 映射配置视图
+ *
+ * @author rewerma 2018-11-01
+ * @version 1.0.0
+ */
+public class SchemaItem {
+
+    private Map<String, TableItem>                aliasTableItems = new LinkedHashMap<>(); // 别名对应表名
+    private Map<String, FieldItem>                selectFields    = new LinkedHashMap<>(); // 查询字段
+    private String                                sql;
+
+    private volatile Map<String, List<TableItem>> tableItemAliases;
+    private volatile Map<String, List<FieldItem>> columnFields;
+    private volatile Boolean                      allFieldsSimple;
+
+    public void init() {
+        this.getTableItemAliases();
+        this.getColumnFields();
+        this.isAllFieldsSimple();
+        aliasTableItems.values().forEach(tableItem -> {
+            tableItem.getRelationTableFields();
+            tableItem.getRelationSelectFieldItems();
+        });
+    }
+
+    public Map<String, TableItem> getAliasTableItems() {
+        return aliasTableItems;
+    }
+
+    public void setAliasTableItems(Map<String, TableItem> aliasTableItems) {
+        this.aliasTableItems = aliasTableItems;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
+    public Map<String, FieldItem> getSelectFields() {
+        return selectFields;
+    }
+
+    public void setSelectFields(Map<String, FieldItem> selectFields) {
+        this.selectFields = selectFields;
+    }
+
+    public String toSql() {
+        // todo
+        return null;
+    }
+
+    public Map<String, List<TableItem>> getTableItemAliases() {
+        if (tableItemAliases == null) {
+            synchronized (SchemaItem.class) {
+                if (tableItemAliases == null) {
+                    tableItemAliases = new LinkedHashMap<>();
+                    aliasTableItems.forEach((alias, tableItem) -> {
+                        List<TableItem> aliases = tableItemAliases
+                            .computeIfAbsent(tableItem.getTableName().toLowerCase(), k -> new ArrayList<>());
+                        aliases.add(tableItem);
+                    });
+                }
+            }
+        }
+        return tableItemAliases;
+    }
+
+    public Map<String, List<FieldItem>> getColumnFields() {
+        if (columnFields == null) {
+            synchronized (SchemaItem.class) {
+                if (columnFields == null) {
+                    columnFields = new LinkedHashMap<>();
+                    getSelectFields()
+                        .forEach((fieldName, fieldItem) -> fieldItem.getColumnItems().forEach(columnItem -> {
+                            // TableItem tableItem = getAliasTableItems().get(columnItem.getOwner());
+                            // if (!tableItem.isSubQuery()) {
+                            List<FieldItem> fieldItems = columnFields.computeIfAbsent(
+                                columnItem.getOwner() + "." + columnItem.getColumnName(),
+                                k -> new ArrayList<>());
+                            fieldItems.add(fieldItem);
+                            // } else {
+                            // tableItem.getSubQueryFields().forEach(subQueryField -> {
+                            // List<FieldItem> fieldItems = columnFields.computeIfAbsent(
+                            // columnItem.getOwner() + "." + subQueryField.getColumn().getColumnName(),
+                            // k -> new ArrayList<>());
+                            // fieldItems.add(fieldItem);
+                            // });
+                            // }
+                        }));
+                }
+            }
+        }
+        return columnFields;
+    }
+
+    public boolean isAllFieldsSimple() {
+        if (allFieldsSimple == null) {
+            synchronized (SchemaItem.class) {
+                if (allFieldsSimple == null) {
+                    allFieldsSimple = true;
+
+                    for (FieldItem fieldItem : getSelectFields().values()) {
+                        if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
+                            allFieldsSimple = false;
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        return allFieldsSimple;
+    }
+
+    public TableItem getMainTable() {
+        if (!aliasTableItems.isEmpty()) {
+            return aliasTableItems.values().iterator().next();
+        } else {
+            return null;
+        }
+    }
+
+    public FieldItem getIdFieldItem(ESMapping mapping) {
+        if (mapping.get_id() != null) {
+            return getSelectFields().get(mapping.get_id());
+        } else {
+            return getSelectFields().get(mapping.getPk());
+        }
+    }
+
+    public static class TableItem {
+
+        private SchemaItem                               schemaItem;
+
+        private String                                   schema;
+        private String                                   tableName;
+        private String                                   alias;
+        private String                                   subQuerySql;
+        private List<FieldItem>                          subQueryFields = new ArrayList<>();
+        private List<RelationFieldsPair>                 relationFields = new ArrayList<>();
+
+        private boolean                                  main;
+        private boolean                                  subQuery;
+
+        private volatile Map<FieldItem, List<FieldItem>> relationTableFields;               // 当前表关联条件字段对应主表查询字段
+        private volatile List<FieldItem>                 relationSelectFieldItems;          // 子表所在主表的查询字段
+
+        public TableItem(SchemaItem schemaItem){
+            this.schemaItem = schemaItem;
+        }
+
+        public SchemaItem getSchemaItem() {
+            return schemaItem;
+        }
+
+        public void setSchemaItem(SchemaItem schemaItem) {
+            this.schemaItem = schemaItem;
+        }
+
+        public String getSchema() {
+            return schema;
+        }
+
+        public void setSchema(String schema) {
+            this.schema = schema;
+        }
+
+        public String getTableName() {
+            return tableName;
+        }
+
+        public void setTableName(String tableName) {
+            this.tableName = tableName;
+        }
+
+        public String getAlias() {
+            return alias;
+        }
+
+        public void setAlias(String alias) {
+            this.alias = alias;
+        }
+
+        public String getSubQuerySql() {
+            return subQuerySql;
+        }
+
+        public void setSubQuerySql(String subQuerySql) {
+            this.subQuerySql = subQuerySql;
+        }
+
+        public boolean isMain() {
+            return main;
+        }
+
+        public void setMain(boolean main) {
+            this.main = main;
+        }
+
+        public boolean isSubQuery() {
+            return subQuery;
+        }
+
+        public void setSubQuery(boolean subQuery) {
+            this.subQuery = subQuery;
+        }
+
+        public List<FieldItem> getSubQueryFields() {
+            return subQueryFields;
+        }
+
+        public void setSubQueryFields(List<FieldItem> subQueryFields) {
+            this.subQueryFields = subQueryFields;
+        }
+
+        public List<RelationFieldsPair> getRelationFields() {
+            return relationFields;
+        }
+
+        public void setRelationFields(List<RelationFieldsPair> relationFields) {
+            this.relationFields = relationFields;
+        }
+
+        public Map<FieldItem, List<FieldItem>> getRelationTableFields() {
+            if (relationTableFields == null) {
+                synchronized (SchemaItem.class) {
+                    if (relationTableFields == null) {
+                        relationTableFields = new LinkedHashMap<>();
+
+                        getRelationFields().forEach(relationFieldsPair -> {
+                            FieldItem leftFieldItem = relationFieldsPair.getLeftFieldItem();
+                            FieldItem rightFieldItem = relationFieldsPair.getRightFieldItem();
+                            FieldItem currentTableRelField = null;
+                            if (getAlias().equals(leftFieldItem.getOwner())) {
+                                currentTableRelField = leftFieldItem;
+                            } else if (getAlias().equals(rightFieldItem.getOwner())) {
+                                currentTableRelField = rightFieldItem;
+                            }
+
+                            if (currentTableRelField != null) {
+                                List<FieldItem> selectFieldItem = getSchemaItem().getColumnFields()
+                                    .get(leftFieldItem.getOwner() + "." + leftFieldItem.getColumn().getColumnName());
+                                if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
+                                    relationTableFields.put(currentTableRelField, selectFieldItem);
+                                } else {
+                                    selectFieldItem = getSchemaItem().getColumnFields()
+                                        .get(rightFieldItem.getOwner() + "."
+                                             + rightFieldItem.getColumn().getColumnName());
+                                    if (selectFieldItem != null && !selectFieldItem.isEmpty()) {
+                                        relationTableFields.put(currentTableRelField, selectFieldItem);
+                                    } else {
+                                        throw new UnsupportedOperationException(
+                                            "Relation condition column must in select columns.");
+                                    }
+                                }
+                            }
+                        });
+                    }
+                }
+            }
+            return relationTableFields;
+        }
+
+        public List<FieldItem> getRelationSelectFieldItems() {
+            if (relationSelectFieldItems == null) {
+                synchronized (SchemaItem.class) {
+                    if (relationSelectFieldItems == null) {
+                        relationSelectFieldItems = new ArrayList<>();
+                        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+                            if (fieldItem.getOwners().contains(getAlias())) {
+                                relationSelectFieldItems.add(fieldItem);
+                            }
+                        }
+                    }
+                }
+            }
+            return relationSelectFieldItems;
+        }
+    }
+
+    public static class RelationFieldsPair {
+
+        private FieldItem leftFieldItem;
+        private FieldItem rightFieldItem;
+
+        public RelationFieldsPair(FieldItem leftFieldItem, FieldItem rightFieldItem){
+            this.leftFieldItem = leftFieldItem;
+            this.rightFieldItem = rightFieldItem;
+        }
+
+        public FieldItem getLeftFieldItem() {
+            return leftFieldItem;
+        }
+
+        public void setLeftFieldItem(FieldItem leftFieldItem) {
+            this.leftFieldItem = leftFieldItem;
+        }
+
+        public FieldItem getRightFieldItem() {
+            return rightFieldItem;
+        }
+
+        public void setRightFieldItem(FieldItem rightFieldItem) {
+            this.rightFieldItem = rightFieldItem;
+        }
+    }
+
+    public static class FieldItem {
+
+        private String           fieldName;
+        private String           expr;
+        private List<ColumnItem> columnItems = new ArrayList<>();
+        private List<String>     owners      = new ArrayList<>();
+
+        private boolean          method;
+        private boolean          binaryOp;
+
+        public String getFieldName() {
+            return fieldName;
+        }
+
+        public void setFieldName(String fieldName) {
+            this.fieldName = fieldName;
+        }
+
+        public String getExpr() {
+            return expr;
+        }
+
+        public void setExpr(String expr) {
+            this.expr = expr;
+        }
+
+        public List<ColumnItem> getColumnItems() {
+            return columnItems;
+        }
+
+        public void setColumnItems(List<ColumnItem> columnItems) {
+            this.columnItems = columnItems;
+        }
+
+        public boolean isMethod() {
+            return method;
+        }
+
+        public void setMethod(boolean method) {
+            this.method = method;
+        }
+
+        public boolean isBinaryOp() {
+            return binaryOp;
+        }
+
+        public void setBinaryOp(boolean binaryOp) {
+            this.binaryOp = binaryOp;
+        }
+
+        public List<String> getOwners() {
+            return owners;
+        }
+
+        public void setOwners(List<String> owners) {
+            this.owners = owners;
+        }
+
+        public void addColumn(ColumnItem columnItem) {
+            columnItems.add(columnItem);
+        }
+
+        public ColumnItem getColumn() {
+            if (!columnItems.isEmpty()) {
+                return columnItems.get(0);
+            } else {
+                return null;
+            }
+        }
+
+        public String getOwner() {
+            if (!owners.isEmpty()) {
+                return owners.get(0);
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            FieldItem fieldItem = (FieldItem) o;
+
+            return fieldName != null ? fieldName.equals(fieldItem.fieldName) : fieldItem.fieldName == null;
+        }
+
+        @Override
+        public int hashCode() {
+            return fieldName != null ? fieldName.hashCode() : 0;
+        }
+    }
+
+    public static class ColumnItem {
+
+        private String owner;
+        private String columnName;
+
+        public String getOwner() {
+            return owner;
+        }
+
+        public void setOwner(String owner) {
+            this.owner = owner;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        public void setColumnName(String columnName) {
+            this.columnName = columnName;
+        }
+    }
+}

+ 9 - 6
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SqlParser.java

@@ -25,7 +25,7 @@ import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.TableItem;
 
 /**
  * ES同步指定sql格式解析
- * 
+ *
  * @author rewerma 2018-10-26 下午03:45:49
  * @version 1.0.0
  */
@@ -33,7 +33,7 @@ public class SqlParser {
 
     /**
      * 解析sql
-     * 
+     *
      * @param sql sql
      * @return 视图对象
      */
@@ -66,7 +66,7 @@ public class SqlParser {
 
     /**
      * 归集字段
-     * 
+     *
      * @param sqlSelectQueryBlock sqlSelectQueryBlock
      * @return 字段属性列表
      */
@@ -74,6 +74,7 @@ public class SqlParser {
         return sqlSelectQueryBlock.getSelectList().stream().map(selectItem -> {
             FieldItem fieldItem = new FieldItem();
             fieldItem.setFieldName(selectItem.getAlias());
+            fieldItem.setExpr(selectItem.toString());
             visitColumn(selectItem.getExpr(), fieldItem);
             return fieldItem;
         }).collect(Collectors.toList());
@@ -81,7 +82,7 @@ public class SqlParser {
 
     /**
      * 解析字段
-     * 
+     *
      * @param expr sql expr
      * @param fieldItem 字段属性
      */
@@ -91,6 +92,7 @@ public class SqlParser {
             SQLIdentifierExpr identifierExpr = (SQLIdentifierExpr) expr;
             if (fieldItem.getFieldName() == null) {
                 fieldItem.setFieldName(identifierExpr.getName());
+                fieldItem.setExpr(identifierExpr.toString());
             }
             ColumnItem columnItem = new ColumnItem();
             columnItem.setColumnName(identifierExpr.getName());
@@ -101,6 +103,7 @@ public class SqlParser {
             SQLPropertyExpr sqlPropertyExpr = (SQLPropertyExpr) expr;
             if (fieldItem.getFieldName() == null) {
                 fieldItem.setFieldName(sqlPropertyExpr.getName());
+                fieldItem.setExpr(sqlPropertyExpr.toString());
             }
             fieldItem.getOwners().add(sqlPropertyExpr.getOwnernName());
             ColumnItem columnItem = new ColumnItem();
@@ -123,7 +126,7 @@ public class SqlParser {
 
     /**
      * 解析表
-     * 
+     *
      * @param schemaItem 视图对象
      * @param sqlTableSource sqlTableSource
      * @param tableItems 表对象列表
@@ -178,7 +181,7 @@ public class SqlParser {
 
     /**
      * 解析on条件
-     * 
+     *
      * @param expr sql expr
      * @param tableItem 表对象
      */

+ 66 - 70
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java

@@ -1,6 +1,9 @@
 package com.alibaba.otter.canal.client.adapter.es.service;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -14,7 +17,9 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
@@ -51,7 +56,7 @@ public class ESEtlService {
         this.config = config;
     }
 
-    public EtlResult importData(List<String> params, boolean bulk) {
+    public EtlResult importData(List<String> params) {
         EtlResult etlResult = new EtlResult();
         AtomicLong impCount = new AtomicLong();
         List<String> errMsg = new ArrayList<>();
@@ -93,54 +98,49 @@ public class ESEtlService {
                 logger.debug("etl sql : {}", mapping.getSql());
             }
 
-            if (bulk) {
-                // 获取总数
-                String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
-                long cnt = (Long) ESSyncUtil.sqlRS(dataSource, countSql, rs -> {
-                    Long count = null;
-                    try {
-                        if (rs.next()) {
-                            count = ((Number) rs.getObject(1)).longValue();
-                        }
-                    } catch (Exception e) {
-                        logger.error(e.getMessage(), e);
-                    }
-                    return count == null ? 0L : count;
-                });
-
-                // 当大于1万条记录时开启多线程
-                if (cnt >= 10000) {
-                    int threadCount = 3; // TODO 从配置读取默认为3
-                    long perThreadCnt = cnt / threadCount;
-                    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
-                    List<Future<Boolean>> futures = new ArrayList<>(threadCount);
-                    for (int i = 0; i < threadCount; i++) {
-                        long offset = i * perThreadCnt;
-                        Long size = null;
-                        if (i != threadCount - 1) {
-                            size = perThreadCnt;
-                        }
-                        String sqlFinal;
-                        if (size != null) {
-                            sqlFinal = sql + " LIMIT " + offset + "," + size;
-                        } else {
-                            sqlFinal = sql + " LIMIT " + offset + "," + cnt;
-                        }
-                        Future<Boolean> future = executor
-                            .submit(() -> executeSqlImport(dataSource, sqlFinal, mapping, impCount, errMsg));
-                        futures.add(future);
+            // 获取总数
+            String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
+            long cnt = (Long) ESSyncUtil.sqlRS(dataSource, countSql, rs -> {
+                Long count = null;
+                try {
+                    if (rs.next()) {
+                        count = ((Number) rs.getObject(1)).longValue();
                     }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+                return count == null ? 0L : count;
+            });
 
-                    for (Future<Boolean> future : futures) {
-                        future.get();
+            // 当大于1万条记录时开启多线程
+            if (cnt >= 10000) {
+                int threadCount = 3; // 从配置读取默认为3
+                long perThreadCnt = cnt / threadCount;
+                ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+                List<Future<Boolean>> futures = new ArrayList<>(threadCount);
+                for (int i = 0; i < threadCount; i++) {
+                    long offset = i * perThreadCnt;
+                    Long size = null;
+                    if (i != threadCount - 1) {
+                        size = perThreadCnt;
+                    }
+                    String sqlFinal;
+                    if (size != null) {
+                        sqlFinal = sql + " LIMIT " + offset + "," + size;
+                    } else {
+                        sqlFinal = sql + " LIMIT " + offset + "," + cnt;
                     }
+                    Future<Boolean> future = executor
+                        .submit(() -> executeSqlImport(dataSource, sqlFinal, mapping, impCount, errMsg));
+                    futures.add(future);
+                }
 
-                    executor.shutdown();
-                } else {
-                    executeSqlImport(dataSource, sql, mapping, impCount, errMsg);
+                for (Future<Boolean> future : futures) {
+                    future.get();
                 }
+
+                executor.shutdown();
             } else {
-                logger.info("自动ETL,无需统计记录总条数,直接进行ETL, index: {}", esIndex);
                 executeSqlImport(dataSource, sql, mapping, impCount, errMsg);
             }
 
@@ -158,7 +158,7 @@ public class ESEtlService {
         return etlResult;
     }
 
-    private void processFailBulkResponse(BulkResponse bulkResponse, boolean hasParent) {
+    private void processFailBulkResponse(BulkResponse bulkResponse) {
         for (BulkItemResponse response : bulkResponse.getItems()) {
             if (!response.isFailed()) {
                 continue;
@@ -205,31 +205,27 @@ public class ESEtlService {
                         }
 
                         if (idVal != null) {
-                            if (mapping.getParent() == null) {
+                            if (mapping.isUpsert()) {
+                                bulkRequestBuilder.add(transportClient
+                                    .prepareUpdate(mapping.get_index(), mapping.get_type(), idVal.toString())
+                                    .setDoc(esFieldData)
+                                    .setDocAsUpsert(true));
+                            } else {
                                 bulkRequestBuilder.add(transportClient
                                     .prepareIndex(mapping.get_index(), mapping.get_type(), idVal.toString())
                                     .setSource(esFieldData));
-                            } else {
-                                // ignore
                             }
                         } else {
                             idVal = rs.getObject(mapping.getPk());
-                            if (mapping.getParent() == null) {
-                                // 删除pk对应的数据
-                                SearchResponse response = transportClient.prepareSearch(mapping.get_index())
-                                    .setTypes(mapping.get_type())
-                                    .setQuery(QueryBuilders.termQuery(mapping.getPk(), idVal))
-                                    .get();
-                                for (SearchHit hit : response.getHits()) {
-                                    bulkRequestBuilder.add(transportClient
-                                        .prepareDelete(mapping.get_index(), mapping.get_type(), hit.getId()));
-                                }
-
-                                bulkRequestBuilder
-                                    .add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type())
-                                        .setSource(esFieldData));
-                            } else {
-                                // ignore
+                            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
+                                .setTypes(mapping.get_type())
+                                .setQuery(QueryBuilders.termQuery(mapping.getPk(), idVal))
+                                .setSize(10000)
+                                .get();
+                            for (SearchHit hit : response.getHits()) {
+                                bulkRequestBuilder.add(
+                                    transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
+                                        .setDoc(esFieldData));
                             }
                         }
 
@@ -238,11 +234,11 @@ public class ESEtlService {
                             long esBatchBegin = System.currentTimeMillis();
                             BulkResponse rp = bulkRequestBuilder.execute().actionGet();
                             if (rp.hasFailures()) {
-                                this.processFailBulkResponse(rp, Objects.nonNull(mapping.getParent()));
+                                this.processFailBulkResponse(rp);
                             }
 
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("全量数据批量导入批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
+                            if (logger.isTraceEnabled()) {
+                                logger.trace("全量数据批量导入批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
                                     (System.currentTimeMillis() - batchBegin),
                                     (System.currentTimeMillis() - esBatchBegin),
                                     bulkRequestBuilder.numberOfActions(),
@@ -259,10 +255,10 @@ public class ESEtlService {
                         long esBatchBegin = System.currentTimeMillis();
                         BulkResponse rp = bulkRequestBuilder.execute().actionGet();
                         if (rp.hasFailures()) {
-                            this.processFailBulkResponse(rp, Objects.nonNull(mapping.getParent()));
+                            this.processFailBulkResponse(rp);
                         }
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("全量数据批量导入最后批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("全量数据批量导入最后批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
                                 (System.currentTimeMillis() - batchBegin),
                                 (System.currentTimeMillis() - esBatchBegin),
                                 bulkRequestBuilder.numberOfActions(),

+ 73 - 91
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -71,7 +71,12 @@ public class ESSyncService {
                     dml.getDestination());
             }
             if (logger.isDebugEnabled()) {
-                logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+                StringBuilder configIndexes = new StringBuilder();
+                esSyncConfigs
+                    .forEach(esSyncConfig -> configIndexes.append(esSyncConfig.getEsMapping().get_index()).append(" "));
+                logger.debug("DML: {} \nEffect indexes: {}",
+                    JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue),
+                    configIndexes.toString());
             }
         }
     }
@@ -92,6 +97,8 @@ public class ESSyncService {
                 update(config, dml);
             } else if (type != null && type.equalsIgnoreCase("DELETE")) {
                 delete(config, dml);
+            } else {
+                return;
             }
 
             if (logger.isTraceEnabled()) {
@@ -329,33 +336,50 @@ public class ESSyncService {
 
             // ------是主表------
             if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
-                FieldItem idFieldItem = schemaItem.getIdFieldItem(mapping);
-                // 主键为简单字段
-                if (!idFieldItem.isMethod() && !idFieldItem.isBinaryOp()) {
-                    Object idVal = esTemplate.getValFromData(mapping,
-                        data,
-                        idFieldItem.getFieldName(),
-                        idFieldItem.getColumn().getColumnName());
-
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, id: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index(),
-                            idVal);
-                    }
-                    boolean result = esTemplate.delete(mapping, idVal);
-                    if (!result) {
-                        logger.error("Main table delete es index error, destination:{}, table: {}, index: {}, id: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index(),
-                            idVal);
+                if (mapping.get_id() != null) {
+                    FieldItem idFieldItem = schemaItem.getIdFieldItem(mapping);
+                    // 主键为简单字段
+                    if (!idFieldItem.isMethod() && !idFieldItem.isBinaryOp()) {
+                        Object idVal = esTemplate.getValFromData(mapping,
+                            data,
+                            idFieldItem.getFieldName(),
+                            idFieldItem.getColumn().getColumnName());
+
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, id: {}",
+                                config.getDestination(),
+                                dml.getTable(),
+                                mapping.get_index(),
+                                idVal);
+                        }
+                        esTemplate.delete(mapping, idVal, null);
+                    } else {
+                        // ------主键带函数, 查询sql获取主键删除------
+                        // FIXME 删除时反查sql为空记录, 无法获获取 id field 值
+                        mainTableDelete(config, dml, data);
                     }
                 } else {
-                    // ------主键带函数, 查询sql获取主键删除------
-                    mainTableDelete(config, dml, data);
+                    FieldItem pkFieldItem = schemaItem.getIdFieldItem(mapping);
+                    if (!pkFieldItem.isMethod() && !pkFieldItem.isBinaryOp()) {
+                        Map<String, Object> esFieldData = new LinkedHashMap<>();
+                        Object pkVal = esTemplate.getESDataFromDmlData(mapping, data, esFieldData);
+
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, pk: {}",
+                                config.getDestination(),
+                                dml.getTable(),
+                                mapping.get_index(),
+                                pkVal);
+                        }
+                        esFieldData.remove(pkFieldItem.getFieldName());
+                        esFieldData.keySet().forEach(key -> esFieldData.put(key, null));
+                        esTemplate.delete(mapping, pkVal, esFieldData);
+                    } else {
+                        // ------主键带函数, 查询sql获取主键删除------
+                        mainTableDelete(config, dml, data);
+                    }
                 }
+
             }
 
             // 从表的操作
@@ -417,14 +441,7 @@ public class ESSyncService {
                 mapping.get_index(),
                 idVal);
         }
-        boolean result = esTemplate.insert(mapping, idVal, esFieldData);
-        if (!result) {
-            logger.error("Single table insert to es index error, destination:{}, table: {}, index: {}, id: {}",
-                config.getDestination(),
-                dml.getTable(),
-                mapping.get_index(),
-                idVal);
-        }
+        esTemplate.insert(mapping, idVal, esFieldData);
     }
 
     /**
@@ -461,15 +478,7 @@ public class ESSyncService {
                             mapping.get_index(),
                             idVal);
                     }
-                    boolean result = esTemplate.insert(mapping, idVal, esFieldData);
-                    if (!result) {
-                        logger.error(
-                            "Main table insert to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index(),
-                            idVal);
-                    }
+                    esTemplate.insert(mapping, idVal, esFieldData);
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -493,6 +502,15 @@ public class ESSyncService {
         }
         ESSyncUtil.sqlRS(ds, sql, rs -> {
             try {
+                Map<String, Object> esFieldData = null;
+                if (mapping.getPk() != null) {
+                    esFieldData = new LinkedHashMap<>();
+                    esTemplate.getESDataFromDmlData(mapping, data, esFieldData);
+                    esFieldData.remove(mapping.getPk());
+                    for (String key : esFieldData.keySet()) {
+                        esFieldData.put(key, null);
+                    }
+                }
                 while (rs.next()) {
                     Object idVal = esTemplate.getIdValFromRS(mapping, rs);
 
@@ -504,15 +522,7 @@ public class ESSyncService {
                             mapping.get_index(),
                             idVal);
                     }
-                    boolean result = esTemplate.delete(mapping, idVal);
-                    if (!result) {
-                        logger.error(
-                            "Main table delete to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index(),
-                            idVal);
-                    }
+                    esTemplate.delete(mapping, idVal, esFieldData);
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -558,13 +568,7 @@ public class ESSyncService {
                 dml.getTable(),
                 mapping.get_index());
         }
-        boolean result = esTemplate.updateByQuery(config, paramsTmp, esFieldData);
-        if (!result) {
-            logger.error("Join table update es index by foreign key error, destination:{}, table: {}, index: {}",
-                config.getDestination(),
-                dml.getTable(),
-                mapping.get_index());
-        }
+        esTemplate.updateByQuery(config, paramsTmp, esFieldData);
     }
 
     /**
@@ -652,14 +656,7 @@ public class ESSyncService {
                             dml.getTable(),
                             mapping.get_index());
                     }
-                    boolean result = esTemplate.updateByQuery(config, paramsTmp, esFieldData);
-                    if (!result) {
-                        logger.error(
-                            "Join table update es index by query sql error, destination:{}, table: {}, index: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index());
-                    }
+                    esTemplate.updateByQuery(config, paramsTmp, esFieldData);
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -761,14 +758,7 @@ public class ESSyncService {
                             dml.getTable(),
                             mapping.get_index());
                     }
-                    boolean result = esTemplate.updateByQuery(config, paramsTmp, esFieldData);
-                    if (!result) {
-                        logger.error(
-                            "Join table update es index by query whole sql error, destination:{}, table: {}, index: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index());
-                    }
+                    esTemplate.updateByQuery(config, paramsTmp, esFieldData);
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -799,14 +789,7 @@ public class ESSyncService {
                 mapping.get_index(),
                 idVal);
         }
-        boolean result = esTemplate.update(mapping, idVal, esFieldData);
-        if (!result) {
-            logger.error("Main table update to es index error, destination:{}, table: {}, index: {}, id: {}",
-                config.getDestination(),
-                dml.getTable(),
-                mapping.get_index(),
-                idVal);
-        }
+        esTemplate.update(mapping, idVal, esFieldData);
     }
 
     /**
@@ -843,15 +826,7 @@ public class ESSyncService {
                             mapping.get_index(),
                             idVal);
                     }
-                    boolean result = esTemplate.update(mapping, idVal, esFieldData);
-                    if (!result) {
-                        logger.error(
-                            "Main table update to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index(),
-                            idVal);
-                    }
+                    esTemplate.update(mapping, idVal, esFieldData);
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -859,4 +834,11 @@ public class ESSyncService {
             return 0;
         });
     }
+
+    /**
+     * 提交批次
+     */
+    public void commit() {
+        esTemplate.commit();
+    }
 }

+ 100 - 202
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -2,14 +2,13 @@ package com.alibaba.otter.canal.client.adapter.es.support;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
 
 import javax.sql.DataSource;
 
-import com.alibaba.fastjson.JSON;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
@@ -18,18 +17,11 @@ import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.reindex.BulkByScrollResponse;
-import org.elasticsearch.index.reindex.UpdateByQueryAction;
-import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder;
 import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.script.Script;
-import org.elasticsearch.script.ScriptType;
 import org.elasticsearch.search.SearchHit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.util.CollectionUtils;
 
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
@@ -52,254 +44,131 @@ public class ESTemplate {
 
     private TransportClient     transportClient;
 
+    private BulkRequestBuilder  bulkRequestBuilder;
+
     public ESTemplate(TransportClient transportClient){
         this.transportClient = transportClient;
+        this.bulkRequestBuilder = transportClient.prepareBulk();
+    }
+
+    public BulkRequestBuilder getBulk() {
+        return bulkRequestBuilder;
     }
 
     /**
      * 插入数据
      *
-     * @param mapping
-     * @param pkVal
-     * @param esFieldData
-     * @return
+     * @param mapping 配置对象
+     * @param pkVal 主键值
+     * @param esFieldData 数据Map
      */
-    public boolean insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
-        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
+    public void insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
         if (mapping.get_id() != null) {
-            bulkRequestBuilder
-                .add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
+            if (mapping.isUpsert()) {
+                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                    .setDoc(esFieldData)
+                    .setDocAsUpsert(true));
+            } else {
+                getBulk().add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
                     .setSource(esFieldData));
+            }
         } else {
             SearchResponse response = transportClient.prepareSearch(mapping.get_index())
                 .setTypes(mapping.get_type())
                 .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
-                .setSize(MAX_BATCH_SIZE)
+                .setSize(10000)
                 .get();
             for (SearchHit hit : response.getHits()) {
-                bulkRequestBuilder
-                    .add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(), hit.getId()));
+                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
+                    .setDoc(esFieldData));
             }
-            bulkRequestBuilder
-                .add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type()).setSource(esFieldData));
         }
-        return commitBulkRequest(bulkRequestBuilder);
+        commitBulk();
     }
 
     /**
      * 根据主键更新数据
      *
-     * @param mapping
-     * @param pkVal
-     * @param esFieldData
-     * @return
+     * @param mapping 配置对象
+     * @param pkVal 主键值
+     * @param esFieldData 数据Map
      */
-    public boolean update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
-        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
-        append4Update(bulkRequestBuilder, mapping, pkVal, esFieldData);
-        return commitBulkRequest(bulkRequestBuilder);
-    }
-
-    public void append4Update(BulkRequestBuilder bulkRequestBuilder, ESMapping mapping, Object pkVal,
-                              Map<String, Object> esFieldData) {
-        if (mapping.get_id() != null) {
-            bulkRequestBuilder
-                .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                    .setDoc(esFieldData));
-        } else {
-            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
-                .setTypes(mapping.get_type())
-                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
-                .setSize(MAX_BATCH_SIZE)
-                .get();
-            for (SearchHit hit : response.getHits()) {
-                bulkRequestBuilder
-                    .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
-                        .setDoc(esFieldData));
-            }
-        }
+    public void update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
+        append4Update(mapping, pkVal, esFieldData);
+        commitBulk();
     }
 
     /**
      * update by query
      *
-     * @param config
-     * @param paramsTmp
-     * @param esFieldData
-     * @return
+     * @param config 配置对象
+     * @param paramsTmp sql查询条件
+     * @param esFieldData 数据Map
      */
-    public boolean updateByQuery(ESSyncConfig config, Map<String, Object> paramsTmp, Map<String, Object> esFieldData) {
+    public void updateByQuery(ESSyncConfig config, Map<String, Object> paramsTmp, Map<String, Object> esFieldData) {
         if (paramsTmp.isEmpty()) {
-            return false;
+            return;
         }
         ESMapping mapping = config.getEsMapping();
         BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
         paramsTmp.forEach((fieldName, value) -> queryBuilder.must(QueryBuilders.termsQuery(fieldName, value)));
 
-        SearchResponse response = transportClient.prepareSearch(mapping.get_index())
-            .setTypes(mapping.get_type())
-            .setSize(0)
-            .setQuery(queryBuilder)
-            .get();
-        long count = response.getHits().getTotalHits();
-        // 如果更新量大于Max, 查询sql批量更新
-        if (count > MAX_BATCH_SIZE) {
-            BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
-
-            DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
-            // 查询sql更新
-            StringBuilder sql = new StringBuilder("SELECT * FROM (" + mapping.getSql() + ") _v WHERE ");
-            paramsTmp.forEach(
-                (fieldName, value) -> sql.append("_v.").append(fieldName).append("=").append(value).append(" AND "));
-            int len = sql.length();
-            sql.delete(len - 4, len);
-            ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
-                int exeCount = 1;
-                try {
-                    BulkRequestBuilder bulkRequestBuilderTmp = bulkRequestBuilder;
-                    while (rs.next()) {
-                        Object idVal = getIdValFromRS(mapping, rs);
-                        append4Update(bulkRequestBuilderTmp, mapping, idVal, esFieldData);
-
-                        if (exeCount % mapping.getCommitBatch() == 0 && bulkRequestBuilderTmp.numberOfActions() > 0) {
-                            commitBulkRequest(bulkRequestBuilderTmp);
-                            bulkRequestBuilderTmp = transportClient.prepareBulk();
-                        }
-                        exeCount++;
-                    }
-
-                    if (bulkRequestBuilder.numberOfActions() > 0) {
-                        commitBulkRequest(bulkRequestBuilderTmp);
-                    }
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-                return 0;
-            });
-            return true;
-        } else {
-            return updateByQuery(mapping, queryBuilder, esFieldData, 1);
-        }
-    }
-
-    private boolean updateByQuery(ESMapping mapping, QueryBuilder queryBuilder, Map<String, Object> esFieldData,
-                                  int counter) {
-        if (CollectionUtils.isEmpty(esFieldData)) {
-            return true;
-        }
-
-        StringBuilder sb = new StringBuilder();
-        esFieldData.forEach((key, value) -> {
-            if (value instanceof Map) {
-                Map<?, ?> mapValue = (Map<?, ?>) value;
-                if (mapValue.containsKey("lon") && mapValue.containsKey("lat") && mapValue.size() == 2) {
-                    sb.append("ctx._source")
-                        .append("['")
-                        .append(key)
-                        .append("']")
-                        .append(" = [")
-                        .append(mapValue.get("lon"))
-                        .append(", ")
-                        .append(mapValue.get("lat"))
-                        .append("];");
-                } else {
-                    sb.append("ctx._source").append("[\"").append(key).append("\"]").append(" = ");
-                    sb.append(JSON.toJSONString(value));
-                    sb.append(";");
+        // 查询sql批量更新
+        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+        StringBuilder sql = new StringBuilder("SELECT * FROM (" + mapping.getSql() + ") _v WHERE ");
+        paramsTmp.forEach(
+            (fieldName, value) -> sql.append("_v.").append(fieldName).append("=").append(value).append(" AND "));
+        int len = sql.length();
+        sql.delete(len - 4, len);
+        Integer syncCount = (Integer) ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
+            int count = 0;
+            try {
+                while (rs.next()) {
+                    Object idVal = getIdValFromRS(mapping, rs);
+                    append4Update(mapping, idVal, esFieldData);
+                    commitBulk();
+                    count++;
                 }
-            } else if (value instanceof List) {
-                sb.append("ctx._source").append("[\"").append(key).append("\"]").append(" = ");
-                sb.append(JSON.toJSONString(value));
-                sb.append(";");
-            } else if (value instanceof String) {
-                sb.append("ctx._source")
-                    .append("['")
-                    .append(key)
-                    .append("']")
-                    .append(" = '")
-                    .append(value)
-                    .append("';");
-            } else {
-                sb.append("ctx._source").append("['").append(key).append("']").append(" = ").append(value).append(";");
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
+            return count;
         });
-        String scriptLine = sb.toString();
         if (logger.isTraceEnabled()) {
-            logger.trace(scriptLine);
+            logger.trace("Update ES by query effect {} records", syncCount);
         }
-
-        UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(transportClient);
-        updateByQuery.source(mapping.get_index())
-            .abortOnVersionConflict(false)
-            .filter(queryBuilder)
-            .script(new Script(ScriptType.INLINE, "painless", scriptLine, Collections.emptyMap()));
-
-        BulkByScrollResponse response = updateByQuery.get();
-        if (logger.isTraceEnabled()) {
-            logger.trace("updateByQuery response: {}", response.getStatus());
-        }
-        if (!CollectionUtils.isEmpty(response.getSearchFailures())) {
-            logger.error("script update_for_search has search error: " + response.getBulkFailures());
-            return false;
-        }
-
-        if (!CollectionUtils.isEmpty(response.getBulkFailures())) {
-            logger.error("script update_for_search has update error: " + response.getBulkFailures());
-            return false;
-        }
-
-        if (response.getStatus().getVersionConflicts() > 0) {
-            if (counter >= 3) {
-                logger.error("第 {} 次执行updateByQuery, 依旧存在分片版本冲突,不再继续重试。", counter);
-                return false;
-            }
-            logger.warn("本次updateByQuery存在分片版本冲突,准备重新执行...");
-            try {
-                TimeUnit.SECONDS.sleep(1);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-            return updateByQuery(mapping, queryBuilder, esFieldData, ++counter);
-        }
-
-        return true;
     }
 
     /**
      * 通过主键删除数据
      *
-     * @param mapping
-     * @param pkVal
-     * @return
+     * @param mapping 配置对象
+     * @param pkVal 主键值
+     * @param esFieldData 数据Map
      */
-    public boolean delete(ESMapping mapping, Object pkVal) {
-        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
+    public void delete(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
         if (mapping.get_id() != null) {
-            bulkRequestBuilder
-                .add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(), pkVal.toString()));
+            getBulk().add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(), pkVal.toString()));
         } else {
             SearchResponse response = transportClient.prepareSearch(mapping.get_index())
                 .setTypes(mapping.get_type())
                 .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
-                .setSize(MAX_BATCH_SIZE)
+                .setSize(10000)
                 .get();
             for (SearchHit hit : response.getHits()) {
-                bulkRequestBuilder
-                    .add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(), hit.getId()));
+                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
+                    .setDoc(esFieldData));
             }
         }
-        return commitBulkRequest(bulkRequestBuilder);
+        commitBulk();
     }
 
     /**
-     * 批量提交
-     *
-     * @param bulkRequestBuilder
-     * @return
+     * 提交批次
      */
-    private static boolean commitBulkRequest(BulkRequestBuilder bulkRequestBuilder) {
-        if (bulkRequestBuilder.numberOfActions() > 0) {
-            BulkResponse response = bulkRequestBuilder.execute().actionGet();
+    public void commit() {
+        if (getBulk().numberOfActions() > 0) {
+            BulkResponse response = getBulk().execute().actionGet();
             if (response.hasFailures()) {
                 for (BulkItemResponse itemResponse : response.getItems()) {
                     if (!itemResponse.isFailed()) {
@@ -307,16 +176,45 @@ public class ESTemplate {
                     }
 
                     if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
-                        logger.warn(itemResponse.getFailureMessage());
+                        logger.error(itemResponse.getFailureMessage());
                     } else {
-                        logger.error("ES sync commit error: {}", itemResponse.getFailureMessage());
+                        throw new RuntimeException("ES sync commit error" + itemResponse.getFailureMessage());
                     }
                 }
             }
+        }
+    }
+
+    /**
+     * 如果大于批量则提交批次
+     */
+    private void commitBulk() {
+        if (getBulk().numberOfActions() >= MAX_BATCH_SIZE) {
+            commit();
+        }
+    }
 
-            return !response.hasFailures();
+    private void append4Update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
+        if (mapping.get_id() != null) {
+            if (mapping.isUpsert()) {
+                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                    .setDoc(esFieldData)
+                    .setDocAsUpsert(true));
+            } else {
+                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                    .setDoc(esFieldData));
+            }
+        } else {
+            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
+                .setTypes(mapping.get_type())
+                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
+                .setSize(10000)
+                .get();
+            for (SearchHit hit : response.getHits()) {
+                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
+                    .setDoc(esFieldData));
+            }
         }
-        return true;
     }
 
     public Object getValFromRS(ESMapping mapping, ResultSet resultSet, String fieldName,

+ 1 - 0
client-adapter/elasticsearch/src/main/resources/es/mytest_user.yml

@@ -5,6 +5,7 @@ esMapping:
   _index: mytest_user
   _type: _doc
   _id: _id
+  upsert: true
 #  pk: id
   sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
         a.c_time as _c_time from user a

+ 1 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/SqlParseTest.java

@@ -29,7 +29,7 @@ public class SqlParseTest {
         Assert.assertFalse(tableItem.isMain());
         Assert.assertTrue(tableItem.isSubQuery());
         // 通过字段名找 FieldItem
-        List<FieldItem> fieldItems = schemaItem.getColumnFields().get(tableItem.getAlias() + ".label".toLowerCase());
+        List<FieldItem> fieldItems = schemaItem.getColumnFields().get(tableItem.getAlias() + ".labels".toLowerCase());
         fieldItems.forEach(
             fieldItem -> Assert.assertEquals("c.labels", fieldItem.getOwner() + "." + fieldItem.getFieldName()));