|
@@ -1,5 +1,22 @@
|
|
|
package com.alibaba.otter.canal.client.adapter.es8x.support;
|
|
|
|
|
|
+import java.sql.ResultSet;
|
|
|
+import java.sql.SQLException;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
+
|
|
|
+import javax.sql.DataSource;
|
|
|
+
|
|
|
+import org.apache.commons.lang.StringUtils;
|
|
|
+import org.elasticsearch.action.search.SearchResponse;
|
|
|
+import org.elasticsearch.cluster.metadata.MappingMetadata;
|
|
|
+import org.elasticsearch.index.query.BoolQueryBuilder;
|
|
|
+import org.elasticsearch.index.query.QueryBuilders;
|
|
|
+import org.elasticsearch.search.SearchHit;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
|
|
|
import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig.ESMapping;
|
|
|
import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem;
|
|
@@ -15,37 +32,22 @@ import com.alibaba.otter.canal.client.adapter.es.core.support.ESTemplate;
|
|
|
import com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection.ESSearchRequest;
|
|
|
import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
|
|
|
import com.alibaba.otter.canal.client.adapter.support.Util;
|
|
|
-import org.apache.commons.lang.StringUtils;
|
|
|
-import org.elasticsearch.action.search.SearchResponse;
|
|
|
-import org.elasticsearch.cluster.metadata.MappingMetadata;
|
|
|
-import org.elasticsearch.index.query.BoolQueryBuilder;
|
|
|
-import org.elasticsearch.index.query.QueryBuilders;
|
|
|
-import org.elasticsearch.search.SearchHit;
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
-
|
|
|
-import javax.sql.DataSource;
|
|
|
-import java.sql.ResultSet;
|
|
|
-import java.sql.SQLException;
|
|
|
-import java.util.*;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
|
public class ES8xTemplate implements ESTemplate {
|
|
|
|
|
|
- private static final Logger logger = LoggerFactory
|
|
|
- .getLogger(ESTemplate.class);
|
|
|
+ private static final Logger logger = LoggerFactory
|
|
|
+ .getLogger(ESTemplate.class);
|
|
|
|
|
|
- private static final int MAX_BATCH_SIZE = 1000;
|
|
|
+ private static final int MAX_BATCH_SIZE = 1000;
|
|
|
|
|
|
- private ESConnection esConnection;
|
|
|
+ private ESConnection esConnection;
|
|
|
|
|
|
- private ESBulkRequest esBulkRequest;
|
|
|
+ private ESBulkRequest esBulkRequest;
|
|
|
|
|
|
// es 字段类型本地缓存
|
|
|
- private static ConcurrentMap<String, Map<String, String>> esFieldTypes = new ConcurrentHashMap<>();
|
|
|
+ private static ConcurrentMap<String, Map<String, String>> esFieldTypes = new ConcurrentHashMap<>();
|
|
|
|
|
|
- public ES8xTemplate(ESConnection esConnection) {
|
|
|
+ public ES8xTemplate(ESConnection esConnection){
|
|
|
this.esConnection = esConnection;
|
|
|
this.esBulkRequest = this.esConnection.new ES8xBulkRequest();
|
|
|
}
|
|
@@ -64,14 +66,14 @@ public class ES8xTemplate implements ESTemplate {
|
|
|
String parentVal = (String) esFieldData.remove("$parent_routing");
|
|
|
if (mapping.isUpsert()) {
|
|
|
ESUpdateRequest updateRequest = esConnection.new ES8xUpdateRequest(mapping.get_index(),
|
|
|
- pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
|
|
|
+ pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
|
|
|
if (StringUtils.isNotEmpty(parentVal)) {
|
|
|
updateRequest.setRouting(parentVal);
|
|
|
}
|
|
|
getBulk().add(updateRequest);
|
|
|
} else {
|
|
|
ESIndexRequest indexRequest = esConnection.new ES8xIndexRequest(mapping.get_index(), pkVal.toString())
|
|
|
- .setSource(esFieldData);
|
|
|
+ .setSource(esFieldData);
|
|
|
if (StringUtils.isNotEmpty(parentVal)) {
|
|
|
indexRequest.setRouting(parentVal);
|
|
|
}
|
|
@@ -80,13 +82,13 @@ public class ES8xTemplate implements ESTemplate {
|
|
|
commitBulk();
|
|
|
} else {
|
|
|
ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index())
|
|
|
- .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
|
|
|
- .size(10000);
|
|
|
+ .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
|
|
|
+ .size(10000);
|
|
|
SearchResponse response = esSearchRequest.getResponse();
|
|
|
|
|
|
for (SearchHit hit : response.getHits()) {
|
|
|
ESUpdateRequest esUpdateRequest = this.esConnection.new ES8xUpdateRequest(mapping.get_index(),
|
|
|
- hit.getId()).setDoc(esFieldData);
|
|
|
+ hit.getId()).setDoc(esFieldData);
|
|
|
getBulk().add(esUpdateRequest);
|
|
|
commitBulk();
|
|
|
}
|
|
@@ -144,17 +146,17 @@ public class ES8xTemplate implements ESTemplate {
|
|
|
public void delete(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
|
|
|
if (mapping.get_id() != null) {
|
|
|
ESDeleteRequest esDeleteRequest = this.esConnection.new ES8xDeleteRequest(mapping.get_index(),
|
|
|
- pkVal.toString());
|
|
|
+ pkVal.toString());
|
|
|
getBulk().add(esDeleteRequest);
|
|
|
commitBulk();
|
|
|
} else {
|
|
|
ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index())
|
|
|
- .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
|
|
|
- .size(10000);
|
|
|
+ .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
|
|
|
+ .size(10000);
|
|
|
SearchResponse response = esSearchRequest.getResponse();
|
|
|
for (SearchHit hit : response.getHits()) {
|
|
|
ESUpdateRequest esUpdateRequest = this.esConnection.new ES8xUpdateRequest(mapping.get_index(),
|
|
|
- hit.getId()).setDoc(esFieldData);
|
|
|
+ hit.getId()).setDoc(esFieldData);
|
|
|
getBulk().add(esUpdateRequest);
|
|
|
commitBulk();
|
|
|
}
|
|
@@ -208,7 +210,7 @@ public class ES8xTemplate implements ESTemplate {
|
|
|
}
|
|
|
|
|
|
if (!fieldItem.getFieldName().equals(mapping.get_id())
|
|
|
- && !mapping.getSkips().contains(fieldItem.getFieldName())) {
|
|
|
+ && !mapping.getSkips().contains(fieldItem.getFieldName())) {
|
|
|
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
|
|
|
}
|
|
|
}
|
|
@@ -248,9 +250,9 @@ public class ES8xTemplate implements ESTemplate {
|
|
|
|
|
|
for (ColumnItem columnItem : fieldItem.getColumnItems()) {
|
|
|
if (dmlOld.containsKey(columnItem.getColumnName())
|
|
|
- && !mapping.getSkips().contains(fieldItem.getFieldName())) {
|
|
|
+ && !mapping.getSkips().contains(fieldItem.getFieldName())) {
|
|
|
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()),
|
|
|
- getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName()));
|
|
|
+ getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName()));
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
@@ -281,7 +283,8 @@ public class ES8xTemplate implements ESTemplate {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData, Map<String, Object> esFieldData) {
|
|
|
+ public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlData,
|
|
|
+ Map<String, Object> esFieldData) {
|
|
|
SchemaItem schemaItem = mapping.getSchemaItem();
|
|
|
String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
|
|
|
Object resultIdVal = null;
|
|
@@ -294,7 +297,7 @@ public class ES8xTemplate implements ESTemplate {
|
|
|
}
|
|
|
|
|
|
if (!fieldItem.getFieldName().equals(mapping.get_id())
|
|
|
- && !mapping.getSkips().contains(fieldItem.getFieldName())) {
|
|
|
+ && !mapping.getSkips().contains(fieldItem.getFieldName())) {
|
|
|
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
|
|
|
}
|
|
|
}
|
|
@@ -305,8 +308,8 @@ public class ES8xTemplate implements ESTemplate {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public Object getESDataFromDmlData(ESMapping mapping, String owner, Map<String, Object> dmlData, Map<String, Object> dmlOld,
|
|
|
- Map<String, Object> esFieldData) {
|
|
|
+ public Object getESDataFromDmlData(ESMapping mapping, String owner, Map<String, Object> dmlData,
|
|
|
+ Map<String, Object> dmlOld, Map<String, Object> esFieldData) {
|
|
|
SchemaItem schemaItem = mapping.getSchemaItem();
|
|
|
String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
|
|
|
Object resultIdVal = null;
|
|
@@ -323,7 +326,7 @@ public class ES8xTemplate implements ESTemplate {
|
|
|
|
|
|
if (dmlOld.containsKey(columnName) && !mapping.getSkips().contains(fieldItem.getFieldName())) {
|
|
|
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()),
|
|
|
- getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
|
|
|
+ getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -346,14 +349,14 @@ public class ES8xTemplate implements ESTemplate {
|
|
|
String parentVal = (String) esFieldData.remove("$parent_routing");
|
|
|
if (mapping.isUpsert()) {
|
|
|
ESUpdateRequest esUpdateRequest = this.esConnection.new ES8xUpdateRequest(mapping.get_index(),
|
|
|
- pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
|
|
|
+ pkVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
|
|
|
if (StringUtils.isNotEmpty(parentVal)) {
|
|
|
esUpdateRequest.setRouting(parentVal);
|
|
|
}
|
|
|
getBulk().add(esUpdateRequest);
|
|
|
} else {
|
|
|
ESUpdateRequest esUpdateRequest = this.esConnection.new ES8xUpdateRequest(mapping.get_index(),
|
|
|
- pkVal.toString()).setDoc(esFieldData);
|
|
|
+ pkVal.toString()).setDoc(esFieldData);
|
|
|
if (StringUtils.isNotEmpty(parentVal)) {
|
|
|
esUpdateRequest.setRouting(parentVal);
|
|
|
}
|
|
@@ -361,12 +364,12 @@ public class ES8xTemplate implements ESTemplate {
|
|
|
}
|
|
|
} else {
|
|
|
ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index())
|
|
|
- .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
|
|
|
- .size(10000);
|
|
|
+ .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
|
|
|
+ .size(10000);
|
|
|
SearchResponse response = esSearchRequest.getResponse();
|
|
|
for (SearchHit hit : response.getHits()) {
|
|
|
ESUpdateRequest esUpdateRequest = this.esConnection.new ES8xUpdateRequest(mapping.get_index(),
|
|
|
- hit.getId()).setDoc(esFieldData);
|
|
|
+ hit.getId()).setDoc(esFieldData);
|
|
|
getBulk().add(esUpdateRequest);
|
|
|
}
|
|
|
}
|
|
@@ -375,7 +378,7 @@ public class ES8xTemplate implements ESTemplate {
|
|
|
/**
|
|
|
* 获取es mapping中的属性类型
|
|
|
*
|
|
|
- * @param mapping mapping配置
|
|
|
+ * @param mapping mapping配置
|
|
|
* @param fieldName 属性名
|
|
|
* @return 类型
|
|
|
*/
|
|
@@ -422,9 +425,9 @@ public class ES8xTemplate implements ESTemplate {
|
|
|
Object parentVal;
|
|
|
try {
|
|
|
parentVal = getValFromRS(mapping,
|
|
|
- resultSet,
|
|
|
- parentFieldItem.getFieldName(),
|
|
|
- parentFieldItem.getFieldName());
|
|
|
+ resultSet,
|
|
|
+ parentFieldItem.getFieldName(),
|
|
|
+ parentFieldItem.getFieldName());
|
|
|
} catch (SQLException e) {
|
|
|
throw new RuntimeException(e);
|
|
|
}
|