Browse Source

etl实现

mcy 6 years ago
parent
commit
650446de5b

+ 66 - 6
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -1,9 +1,13 @@
 package com.alibaba.otter.canal.client.adapter.es;
 
 import java.net.InetAddress;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.sql.DataSource;
+
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
@@ -12,13 +16,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
+import com.alibaba.otter.canal.client.adapter.es.service.ESEtlService;
 import com.alibaba.otter.canal.client.adapter.es.service.ESSyncService;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.EtlResult;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
-import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.client.adapter.support.*;
 
 /**
  * ES外部适配器
@@ -73,12 +77,64 @@ public class ESAdapter implements OuterAdapter {
 
     @Override
     public EtlResult etl(String task, List<String> params) {
-        return null;
+        EtlResult etlResult = new EtlResult();
+        ESSyncConfig config = ESSyncConfigLoader.getEsSyncConfig().get(task);
+        if (config != null) {
+            DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+            ESEtlService esEtlService = new ESEtlService(transportClient, config);
+            if (dataSource != null) {
+                return esEtlService.importData(params, false);
+            } else {
+                etlResult.setSucceeded(false);
+                etlResult.setErrorMessage("DataSource not found");
+                return etlResult;
+            }
+        } else {
+            StringBuilder resultMsg = new StringBuilder();
+            boolean resSuccess = true;
+            // ds不为空说明传入的是datasourceKey
+            for (ESSyncConfig configTmp : ESSyncConfigLoader.getEsSyncConfig().values()) {
+                // 取所有的destination为task的配置
+                if (configTmp.getDestination().equals(task)) {
+                    ESEtlService esEtlService = new ESEtlService(transportClient, configTmp);
+                    EtlResult etlRes = esEtlService.importData(params, false);
+                    if (!etlRes.getSucceeded()) {
+                        resSuccess = false;
+                        resultMsg.append(etlRes.getErrorMessage()).append("\n");
+                    } else {
+                        resultMsg.append(etlRes.getResultMessage()).append("\n");
+                    }
+                }
+            }
+            if (resultMsg.length() > 0) {
+                etlResult.setSucceeded(resSuccess);
+                if (resSuccess) {
+                    etlResult.setResultMessage(resultMsg.toString());
+                } else {
+                    etlResult.setErrorMessage(resultMsg.toString());
+                }
+                return etlResult;
+            }
+        }
+        etlResult.setSucceeded(false);
+        etlResult.setErrorMessage("Task not found");
+        return etlResult;
     }
 
     @Override
     public Map<String, Object> count(String task) {
-        return null;
+        ESSyncConfig config = ESSyncConfigLoader.getEsSyncConfig().get(task);
+        ESMapping mapping = config.getEsMapping();
+        SearchResponse response = transportClient.prepareSearch(mapping.get_index())
+            .setTypes(mapping.get_type())
+            .setSize(0)
+            .get();
+
+        long rowCount = response.getHits().getTotalHits();
+        Map<String, Object> res = new LinkedHashMap<>();
+        res.put("esIndex", mapping.get_index());
+        res.put("count", rowCount);
+        return res;
     }
 
     @Override
@@ -90,6 +146,10 @@ public class ESAdapter implements OuterAdapter {
 
     @Override
     public String getDestination(String task) {
+        ESSyncConfig config = ESSyncConfigLoader.getEsSyncConfig().get(task);
+        if (config != null) {
+            return config.getDestination();
+        }
         return null;
     }
 }

+ 14 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESEtlService.java

@@ -13,8 +13,11 @@ import javax.sql.DataSource;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -204,8 +207,18 @@ public class ESEtlService {
                                 // ignore
                             }
                         } else {
+                            idVal = rs.getObject(mapping.getPk());
                             if (mapping.getParent() == null) {
-                                // TODO 删除pk对应的数据
+                                // 删除pk对应的数据
+                                SearchResponse response = transportClient.prepareSearch(mapping.get_index())
+                                    .setTypes(mapping.get_type())
+                                    .setQuery(QueryBuilders.termQuery(mapping.getPk(), idVal))
+                                    .get();
+                                for (SearchHit hit : response.getHits()) {
+                                    bulkRequestBuilder.add(transportClient
+                                        .prepareDelete(mapping.get_index(), mapping.get_type(), hit.getId()));
+                                }
+
                                 bulkRequestBuilder
                                     .add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type())
                                         .setSource(esFieldData));

+ 1 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -40,7 +40,7 @@ public class ESTemplate {
 
     private static final Logger logger         = LoggerFactory.getLogger(ESTemplate.class);
 
-    private static final int    MAX_BATCH_SIZE = 1000;
+    private static final int    MAX_BATCH_SIZE = 10000;
 
     private TransportClient     transportClient;
 

+ 0 - 2
client-adapter/elasticsearch/src/main/resources/es/mytest_user.yml

@@ -10,7 +10,5 @@ esMapping:
         left join role b on b.id=a.role_id
         left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
         group by user_id) c on c.user_id=a.id"
-#  syncByTimestamp: false
-#  syncInterval: 2000 # ms
   etlCondition: "where a.c_time>='{0}'"
   commitBatch: 3000