Browse Source

增加对pk的支持

mcy 6 years ago
parent
commit
cfdf509f9a

+ 2 - 2
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java

@@ -30,8 +30,8 @@ public class ESSyncConfig {
         if (esMapping._type == null) {
             throw new NullPointerException("esMapping._type");
         }
-        if (esMapping._id == null) {
-            throw new NullPointerException("esMapping._id");
+        if (esMapping._id == null && esMapping.getPk() == null) {
+            throw new NullPointerException("esMapping._id and esMapping.pk");
         }
         if (esMapping.sql == null) {
             throw new NullPointerException("esMapping.sql");

+ 5 - 5
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SchemaItem.java

@@ -134,11 +134,11 @@ public class SchemaItem {
     }
 
     public FieldItem getIdFieldItem(ESMapping mapping) {
-        //TODO if (mapping.get_id() != null) {
-        return getSelectFields().get(mapping.get_id());
-        // } else {
-        // return getSelectFields().get(mapping.getPk());
-        // }
+        if (mapping.get_id() != null) {
+            return getSelectFields().get(mapping.get_id());
+        } else {
+            return getSelectFields().get(mapping.getPk());
+        }
     }
 
     public static class TableItem {

+ 19 - 21
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java

@@ -16,8 +16,13 @@ import javax.sql.DataSource;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -153,7 +158,7 @@ public class ESEtlService {
         return etlResult;
     }
 
-    private void processFailBulkResponse(BulkResponse bulkResponse, boolean hasParent) {
+    private void processFailBulkResponse(BulkResponse bulkResponse) {
         for (BulkItemResponse response : bulkResponse.getItems()) {
             if (!response.isFailed()) {
                 continue;
@@ -211,24 +216,17 @@ public class ESEtlService {
                                     .setSource(esFieldData));
                             }
                         } else {
-                            // TODO idVal = rs.getObject(mapping.getPk());
-                            // if (mapping.getParent() == null) {
-                            // // 删除pk对应的数据
-                            // SearchResponse response = transportClient.prepareSearch(mapping.get_index())
-                            // .setTypes(mapping.get_type())
-                            // .setQuery(QueryBuilders.termQuery(mapping.getPk(), idVal))
-                            // .get();
-                            // for (SearchHit hit : response.getHits()) {
-                            // bulkRequestBuilder.add(transportClient
-                            // .prepareDelete(mapping.get_index(), mapping.get_type(), hit.getId()));
-                            // }
-                            //
-                            // bulkRequestBuilder
-                            // .add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type())
-                            // .setSource(esFieldData));
-                            // } else {
-                            // // ignore
-                            // }
+                            idVal = rs.getObject(mapping.getPk());
+                            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
+                                .setTypes(mapping.get_type())
+                                .setQuery(QueryBuilders.termQuery(mapping.getPk(), idVal))
+                                .setSize(10000)
+                                .get();
+                            for (SearchHit hit : response.getHits()) {
+                                bulkRequestBuilder.add(
+                                    transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
+                                        .setDoc(esFieldData));
+                            }
                         }
 
                         if (bulkRequestBuilder.numberOfActions() % mapping.getCommitBatch() == 0
@@ -236,7 +234,7 @@ public class ESEtlService {
                             long esBatchBegin = System.currentTimeMillis();
                             BulkResponse rp = bulkRequestBuilder.execute().actionGet();
                             if (rp.hasFailures()) {
-                                this.processFailBulkResponse(rp, /* TODO Objects.nonNull(mapping.getParent()) */ false);
+                                this.processFailBulkResponse(rp);
                             }
 
                             if (logger.isTraceEnabled()) {
@@ -257,7 +255,7 @@ public class ESEtlService {
                         long esBatchBegin = System.currentTimeMillis();
                         BulkResponse rp = bulkRequestBuilder.execute().actionGet();
                         if (rp.hasFailures()) {
-                            this.processFailBulkResponse(rp, /* TODO Objects.nonNull(mapping.getParent()) */false);
+                            this.processFailBulkResponse(rp);
                         }
                         if (logger.isTraceEnabled()) {
                             logger.trace("全量数据批量导入最后批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",

+ 55 - 42
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -210,7 +210,7 @@ public class ESSyncService {
                 // ------主表 查询sql来更新------
                 if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
                     ESMapping mapping = config.getEsMapping();
-                    String idFieldName = mapping.get_id(); // TODO == null ? mapping.getPk() : mapping.get_id();
+                    String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
                     FieldItem idFieldItem = schemaItem.getSelectFields().get(idFieldName);
 
                     boolean idFieldSimple = true;
@@ -336,26 +336,50 @@ public class ESSyncService {
 
             // ------是主表------
             if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
-                FieldItem idFieldItem = schemaItem.getIdFieldItem(mapping);
-                // 主键为简单字段
-                if (!idFieldItem.isMethod() && !idFieldItem.isBinaryOp()) {
-                    Object idVal = esTemplate.getValFromData(mapping,
-                        data,
-                        idFieldItem.getFieldName(),
-                        idFieldItem.getColumn().getColumnName());
-
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, id: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index(),
-                            idVal);
+                if (mapping.get_id() != null) {
+                    FieldItem idFieldItem = schemaItem.getIdFieldItem(mapping);
+                    // 主键为简单字段
+                    if (!idFieldItem.isMethod() && !idFieldItem.isBinaryOp()) {
+                        Object idVal = esTemplate.getValFromData(mapping,
+                            data,
+                            idFieldItem.getFieldName(),
+                            idFieldItem.getColumn().getColumnName());
+
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, id: {}",
+                                config.getDestination(),
+                                dml.getTable(),
+                                mapping.get_index(),
+                                idVal);
+                        }
+                        esTemplate.delete(mapping, idVal, null);
+                    } else {
+                        // ------主键带函数, 查询sql获取主键删除------
+                        // FIXME 删除时反查sql为空记录, 无法获取 id field 值
+                        mainTableDelete(config, dml, data);
                     }
-                    esTemplate.delete(mapping, idVal);
                 } else {
-                    // ------主键带函数, 查询sql获取主键删除------
-                    mainTableDelete(config, dml, data);
+                    FieldItem pkFieldItem = schemaItem.getIdFieldItem(mapping);
+                    if (!pkFieldItem.isMethod() && !pkFieldItem.isBinaryOp()) {
+                        Map<String, Object> esFieldData = new LinkedHashMap<>();
+                        Object pkVal = esTemplate.getESDataFromDmlData(mapping, data, esFieldData);
+
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, pk: {}",
+                                config.getDestination(),
+                                dml.getTable(),
+                                mapping.get_index(),
+                                pkVal);
+                        }
+                        esFieldData.remove(pkFieldItem.getFieldName());
+                        esFieldData.keySet().forEach(key -> esFieldData.put(key, null));
+                        esTemplate.delete(mapping, pkVal, esFieldData);
+                    } else {
+                        // ------主键带函数, 查询sql获取主键删除------
+                        mainTableDelete(config, dml, data);
+                    }
                 }
+
             }
 
             // 从表的操作
@@ -478,6 +502,15 @@ public class ESSyncService {
         }
         ESSyncUtil.sqlRS(ds, sql, rs -> {
             try {
+                Map<String, Object> esFieldData = null;
+                if (mapping.getPk() != null) {
+                    esFieldData = new LinkedHashMap<>();
+                    esTemplate.getESDataFromDmlData(mapping, data, esFieldData);
+                    esFieldData.remove(mapping.getPk());
+                    for (String key : esFieldData.keySet()) {
+                        esFieldData.put(key, null);
+                    }
+                }
                 while (rs.next()) {
                     Object idVal = esTemplate.getIdValFromRS(mapping, rs);
 
@@ -489,7 +522,7 @@ public class ESSyncService {
                             mapping.get_index(),
                             idVal);
                     }
-                    esTemplate.delete(mapping, idVal);
+                    esTemplate.delete(mapping, idVal, esFieldData);
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -535,13 +568,7 @@ public class ESSyncService {
                 dml.getTable(),
                 mapping.get_index());
         }
-        boolean result = esTemplate.updateByQuery(config, paramsTmp, esFieldData);
-        if (!result) {
-            logger.error("Join table update es index by foreign key error, destination:{}, table: {}, index: {}",
-                config.getDestination(),
-                dml.getTable(),
-                mapping.get_index());
-        }
+        esTemplate.updateByQuery(config, paramsTmp, esFieldData);
     }
 
     /**
@@ -629,14 +656,7 @@ public class ESSyncService {
                             dml.getTable(),
                             mapping.get_index());
                     }
-                    boolean result = esTemplate.updateByQuery(config, paramsTmp, esFieldData);
-                    if (!result) {
-                        logger.error(
-                            "Join table update es index by query sql error, destination:{}, table: {}, index: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index());
-                    }
+                    esTemplate.updateByQuery(config, paramsTmp, esFieldData);
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -738,14 +758,7 @@ public class ESSyncService {
                             dml.getTable(),
                             mapping.get_index());
                     }
-                    boolean result = esTemplate.updateByQuery(config, paramsTmp, esFieldData);
-                    if (!result) {
-                        logger.error(
-                            "Join table update es index by query whole sql error, destination:{}, table: {}, index: {}",
-                            config.getDestination(),
-                            dml.getTable(),
-                            mapping.get_index());
-                    }
+                    esTemplate.updateByQuery(config, paramsTmp, esFieldData);
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e);

+ 65 - 75
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -9,15 +9,18 @@ import java.util.concurrent.ConcurrentMap;
 
 import javax.sql.DataSource;
 
+import com.alibaba.fastjson.JSONObject;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,13 +39,13 @@ import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
  */
 public class ESTemplate {
 
-    private static final Logger         logger         = LoggerFactory.getLogger(ESTemplate.class);
+    private static final Logger logger         = LoggerFactory.getLogger(ESTemplate.class);
 
-    private static final int            MAX_BATCH_SIZE = 1000;
+    private static final int    MAX_BATCH_SIZE = 1000;
 
-    private TransportClient             transportClient;
+    private TransportClient     transportClient;
 
-    private volatile BulkRequestBuilder bulkRequestBuilder;
+    private BulkRequestBuilder  bulkRequestBuilder;
 
     public ESTemplate(TransportClient transportClient){
         this.transportClient = transportClient;
@@ -56,10 +59,9 @@ public class ESTemplate {
     /**
      * 插入数据
      *
-     * @param mapping
-     * @param pkVal
-     * @param esFieldData
-     * @return
+     * @param mapping 配置对象
+     * @param pkVal 主键值
+     * @param esFieldData 数据Map
      */
     public void insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
         if (mapping.get_id() != null) {
@@ -71,50 +73,42 @@ public class ESTemplate {
                 getBulk().add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
                     .setSource(esFieldData));
             }
-            commitBatch();
         } else {
-            // TODO SearchResponse response =
-            // transportClient.prepareSearch(mapping.get_index())
-            // .setTypes(mapping.get_type())
-            // .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
-            // .setSize(MAX_BATCH_SIZE)
-            // .get();
-            // for (SearchHit hit : response.getHits()) {
-            // bulkRequestBuilder
-            // .add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(),
-            // hit.getId()));
-            // }
-            // bulkRequestBuilder
-            // .add(transportClient.prepareIndex(mapping.get_index(),
-            // mapping.get_type()).setSource(esFieldData));
+            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
+                .setTypes(mapping.get_type())
+                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
+                .setSize(10000)
+                .get();
+            for (SearchHit hit : response.getHits()) {
+                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
+                    .setDoc(esFieldData));
+            }
         }
-        // return commitBulkRequest(bulkRequestBuilder);
+        commitBulk();
     }
 
     /**
      * 根据主键更新数据
      *
-     * @param mapping
-     * @param pkVal
-     * @param esFieldData
-     * @return
+     * @param mapping 配置对象
+     * @param pkVal 主键值
+     * @param esFieldData 数据Map
      */
     public void update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
         append4Update(mapping, pkVal, esFieldData);
-        commitBatch();
+        commitBulk();
     }
 
     /**
      * update by query
      *
-     * @param config
-     * @param paramsTmp
-     * @param esFieldData
-     * @return
+     * @param config 配置对象
+     * @param paramsTmp sql查询条件
+     * @param esFieldData 数据Map
      */
-    public boolean updateByQuery(ESSyncConfig config, Map<String, Object> paramsTmp, Map<String, Object> esFieldData) {
+    public void updateByQuery(ESSyncConfig config, Map<String, Object> paramsTmp, Map<String, Object> esFieldData) {
         if (paramsTmp.isEmpty()) {
-            return false;
+            return;
         }
         ESMapping mapping = config.getEsMapping();
         BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
@@ -133,7 +127,7 @@ public class ESTemplate {
                 while (rs.next()) {
                     Object idVal = getIdValFromRS(mapping, rs);
                     append4Update(mapping, idVal, esFieldData);
-                    commitBatch();
+                    commitBulk();
                     count++;
                 }
             } catch (Exception e) {
@@ -144,34 +138,30 @@ public class ESTemplate {
         if (logger.isTraceEnabled()) {
             logger.trace("Update ES by query effect {} records", syncCount);
         }
-        return true;
     }
 
     /**
      * 通过主键删除数据
      *
-     * @param mapping
-     * @param pkVal
-     * @return
+     * @param mapping 配置对象
+     * @param pkVal 主键值
+     * @param esFieldData 数据Map
      */
-    public void delete(ESMapping mapping, Object pkVal) {
+    public void delete(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
         if (mapping.get_id() != null) {
             getBulk().add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(), pkVal.toString()));
-            commitBatch();
         } else {
-            // TODO SearchResponse response =
-            // transportClient.prepareSearch(mapping.get_index())
-            // .setTypes(mapping.get_type())
-            // .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
-            // .setSize(MAX_BATCH_SIZE)
-            // .get();
-            // for (SearchHit hit : response.getHits()) {
-            // bulkRequestBuilder
-            // .add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(),
-            // hit.getId()));
-            // }
+            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
+                .setTypes(mapping.get_type())
+                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
+                .setSize(10000)
+                .get();
+            for (SearchHit hit : response.getHits()) {
+                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
+                    .setDoc(esFieldData));
+            }
         }
-        // return commitBulkRequest(bulkRequestBuilder);
+        commitBulk();
     }
 
     /**
@@ -199,9 +189,12 @@ public class ESTemplate {
     /**
      * 如果大于批量则提交批次
      */
-    private void commitBatch() {
+    private void commitBulk() {
         if (getBulk().numberOfActions() >= MAX_BATCH_SIZE) {
             commit();
+
+            JSONObject jsonObject = new JSONObject();
+            jsonObject.toJavaObject(Object.class);
         }
     }
 
@@ -209,25 +202,22 @@ public class ESTemplate {
         if (mapping.get_id() != null) {
             if (mapping.isUpsert()) {
                 getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                        .setDoc(esFieldData)
-                        .setDocAsUpsert(true));
+                    .setDoc(esFieldData)
+                    .setDocAsUpsert(true));
             } else {
                 getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                        .setDoc(esFieldData));
+                    .setDoc(esFieldData));
             }
         } else {
-            // TODO SearchResponse response =
-            // transportClient.prepareSearch(mapping.get_index())
-            // .setTypes(mapping.get_type())
-            // .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
-            // .setSize(MAX_BATCH_SIZE)
-            // .get();
-            // for (SearchHit hit : response.getHits()) {
-            // bulkRequestBuilder
-            // .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(),
-            // hit.getId())
-            // .setDoc(esFieldData));
-            // }
+            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
+                .setTypes(mapping.get_type())
+                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
+                .setSize(10000)
+                .get();
+            for (SearchHit hit : response.getHits()) {
+                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
+                    .setDoc(esFieldData));
+            }
         }
     }
 
@@ -253,7 +243,7 @@ public class ESTemplate {
     public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet,
                                   Map<String, Object> esFieldData) throws SQLException {
         SchemaItem schemaItem = mapping.getSchemaItem();
-        String idFieldName = mapping.get_id(); // TODO == null ? mapping.getPk() : mapping.get_id();
+        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
         Object resultIdVal = null;
         for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
             Object value = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
@@ -272,7 +262,7 @@ public class ESTemplate {
 
     public Object getIdValFromRS(ESMapping mapping, ResultSet resultSet) throws SQLException {
         SchemaItem schemaItem = mapping.getSchemaItem();
-        String idFieldName = mapping.get_id(); // TODO == null ? mapping.getPk() : mapping.get_id();
+        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
         Object resultIdVal = null;
         for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
             Object value = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
@@ -288,7 +278,7 @@ public class ESTemplate {
     public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet, Map<String, Object> dmlOld,
                                   Map<String, Object> esFieldData) throws SQLException {
         SchemaItem schemaItem = mapping.getSchemaItem();
-        String idFieldName = mapping.get_id(); // TODO == null ? mapping.getPk() : mapping.get_id();
+        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
         Object resultIdVal = null;
         for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
             if (fieldItem.getFieldName().equals(idFieldName)) {
@@ -335,7 +325,7 @@ public class ESTemplate {
     public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData,
                                        Map<String, Object> esFieldData) {
         SchemaItem schemaItem = mapping.getSchemaItem();
-        String idFieldName = mapping.get_id(); // TODO == null ? mapping.getPk() : mapping.get_id();
+        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
         Object resultIdVal = null;
         for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
             String columnName = fieldItem.getColumnItems().iterator().next().getColumnName();
@@ -364,7 +354,7 @@ public class ESTemplate {
     public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData, Map<String, Object> dmlOld,
                                        Map<String, Object> esFieldData) {
         SchemaItem schemaItem = mapping.getSchemaItem();
-        String idFieldName = mapping.get_id(); // TODO == null ? mapping.getPk() : mapping.get_id();
+        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
         Object resultIdVal = null;
         for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
             String columnName = fieldItem.getColumnItems().iterator().next().getColumnName();