|
@@ -54,13 +54,13 @@ public class ESSyncService {
|
|
for (ESSyncConfig config : esSyncConfigs) {
|
|
for (ESSyncConfig config : esSyncConfigs) {
|
|
if (logger.isTraceEnabled()) {
|
|
if (logger.isTraceEnabled()) {
|
|
logger.trace("Prepared to sync index: {}, destination: {}",
|
|
logger.trace("Prepared to sync index: {}, destination: {}",
|
|
- config.getEsMapping().get_index(),
|
|
|
|
|
|
+ config.getEsMapping().getIndex(),
|
|
dml.getDestination());
|
|
dml.getDestination());
|
|
}
|
|
}
|
|
this.sync(config, dml);
|
|
this.sync(config, dml);
|
|
if (logger.isTraceEnabled()) {
|
|
if (logger.isTraceEnabled()) {
|
|
logger.trace("Sync completed: {}, destination: {}",
|
|
logger.trace("Sync completed: {}, destination: {}",
|
|
- config.getEsMapping().get_index(),
|
|
|
|
|
|
+ config.getEsMapping().getIndex(),
|
|
dml.getDestination());
|
|
dml.getDestination());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -73,7 +73,7 @@ public class ESSyncService {
|
|
if (logger.isDebugEnabled()) {
|
|
if (logger.isDebugEnabled()) {
|
|
StringBuilder configIndexes = new StringBuilder();
|
|
StringBuilder configIndexes = new StringBuilder();
|
|
esSyncConfigs
|
|
esSyncConfigs
|
|
- .forEach(esSyncConfig -> configIndexes.append(esSyncConfig.getEsMapping().get_index()).append(" "));
|
|
|
|
|
|
+ .forEach(esSyncConfig -> configIndexes.append(esSyncConfig.getEsMapping().getIndex()).append(" "));
|
|
logger.debug("DML: {} \nAffected indexes: {}",
|
|
logger.debug("DML: {} \nAffected indexes: {}",
|
|
JSON.toJSONString(dml, JSONWriter.Feature.WriteNulls),
|
|
JSON.toJSONString(dml, JSONWriter.Feature.WriteNulls),
|
|
configIndexes.toString());
|
|
configIndexes.toString());
|
|
@@ -105,10 +105,10 @@ public class ESSyncService {
|
|
logger.trace("Sync elapsed time: {} ms,destination: {}, es index: {}",
|
|
logger.trace("Sync elapsed time: {} ms,destination: {}, es index: {}",
|
|
(System.currentTimeMillis() - begin),
|
|
(System.currentTimeMillis() - begin),
|
|
dml.getDestination(),
|
|
dml.getDestination(),
|
|
- config.getEsMapping().get_index());
|
|
|
|
|
|
+ config.getEsMapping().getIndex());
|
|
}
|
|
}
|
|
} catch (Throwable e) {
|
|
} catch (Throwable e) {
|
|
- logger.error("sync error, es index: {}, DML : {}", config.getEsMapping().get_index(), dml);
|
|
|
|
|
|
+ logger.error("sync error, es index: {}, DML : {}", config.getEsMapping().getIndex(), dml);
|
|
throw new RuntimeException(e);
|
|
throw new RuntimeException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -210,7 +210,7 @@ public class ESSyncService {
|
|
// ------主表 查询sql来更新------
|
|
// ------主表 查询sql来更新------
|
|
if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
|
|
if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
|
|
ESMapping mapping = config.getEsMapping();
|
|
ESMapping mapping = config.getEsMapping();
|
|
- String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
|
|
|
|
|
|
+ String idFieldName = mapping.getId() == null ? mapping.getPk() : mapping.getId();
|
|
FieldItem idFieldItem = schemaItem.getSelectFields().get(idFieldName);
|
|
FieldItem idFieldItem = schemaItem.getSelectFields().get(idFieldName);
|
|
|
|
|
|
boolean idFieldSimple = true;
|
|
boolean idFieldSimple = true;
|
|
@@ -336,7 +336,7 @@ public class ESSyncService {
|
|
|
|
|
|
// ------是主表------
|
|
// ------是主表------
|
|
if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
|
|
if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
|
|
- if (mapping.get_id() != null) {
|
|
|
|
|
|
+ if (mapping.getId() != null) {
|
|
FieldItem idFieldItem = schemaItem.getIdFieldItem(mapping);
|
|
FieldItem idFieldItem = schemaItem.getIdFieldItem(mapping);
|
|
// 主键为简单字段
|
|
// 主键为简单字段
|
|
if (!idFieldItem.isMethod() && !idFieldItem.isBinaryOp()) {
|
|
if (!idFieldItem.isMethod() && !idFieldItem.isBinaryOp()) {
|
|
@@ -349,7 +349,7 @@ public class ESSyncService {
|
|
logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, id: {}",
|
|
logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, id: {}",
|
|
config.getDestination(),
|
|
config.getDestination(),
|
|
dml.getTable(),
|
|
dml.getTable(),
|
|
- mapping.get_index(),
|
|
|
|
|
|
+ mapping.getIndex(),
|
|
idVal);
|
|
idVal);
|
|
}
|
|
}
|
|
esTemplate.delete(mapping, idVal, null);
|
|
esTemplate.delete(mapping, idVal, null);
|
|
@@ -368,7 +368,7 @@ public class ESSyncService {
|
|
logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, pk: {}",
|
|
logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, pk: {}",
|
|
config.getDestination(),
|
|
config.getDestination(),
|
|
dml.getTable(),
|
|
dml.getTable(),
|
|
- mapping.get_index(),
|
|
|
|
|
|
+ mapping.getIndex(),
|
|
pkVal);
|
|
pkVal);
|
|
}
|
|
}
|
|
esFieldData.remove(pkFieldItem.getFieldName());
|
|
esFieldData.remove(pkFieldItem.getFieldName());
|
|
@@ -438,7 +438,7 @@ public class ESSyncService {
|
|
logger.trace("Single table insert to es index, destination:{}, table: {}, index: {}, id: {}",
|
|
logger.trace("Single table insert to es index, destination:{}, table: {}, index: {}, id: {}",
|
|
config.getDestination(),
|
|
config.getDestination(),
|
|
dml.getTable(),
|
|
dml.getTable(),
|
|
- mapping.get_index(),
|
|
|
|
|
|
+ mapping.getIndex(),
|
|
idVal);
|
|
idVal);
|
|
}
|
|
}
|
|
esTemplate.insert(mapping, idVal, esFieldData);
|
|
esTemplate.insert(mapping, idVal, esFieldData);
|
|
@@ -461,7 +461,7 @@ public class ESSyncService {
|
|
logger.trace("Main table insert to es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
logger.trace("Main table insert to es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
config.getDestination(),
|
|
config.getDestination(),
|
|
dml.getTable(),
|
|
dml.getTable(),
|
|
- mapping.get_index(),
|
|
|
|
|
|
+ mapping.getIndex(),
|
|
sql.replace("\n", " "));
|
|
sql.replace("\n", " "));
|
|
}
|
|
}
|
|
Util.sqlRS(ds, sql, rs -> {
|
|
Util.sqlRS(ds, sql, rs -> {
|
|
@@ -475,7 +475,7 @@ public class ESSyncService {
|
|
"Main table insert to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
|
|
"Main table insert to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
|
|
config.getDestination(),
|
|
config.getDestination(),
|
|
dml.getTable(),
|
|
dml.getTable(),
|
|
- mapping.get_index(),
|
|
|
|
|
|
+ mapping.getIndex(),
|
|
idVal);
|
|
idVal);
|
|
}
|
|
}
|
|
esTemplate.insert(mapping, idVal, esFieldData);
|
|
esTemplate.insert(mapping, idVal, esFieldData);
|
|
@@ -497,7 +497,7 @@ public class ESSyncService {
|
|
logger.trace("Main table delete es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
logger.trace("Main table delete es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
config.getDestination(),
|
|
config.getDestination(),
|
|
dml.getTable(),
|
|
dml.getTable(),
|
|
- mapping.get_index(),
|
|
|
|
|
|
+ mapping.getIndex(),
|
|
sql.replace("\n", " "));
|
|
sql.replace("\n", " "));
|
|
}
|
|
}
|
|
Util.sqlRS(ds, sql, rs -> {
|
|
Util.sqlRS(ds, sql, rs -> {
|
|
@@ -519,7 +519,7 @@ public class ESSyncService {
|
|
"Main table delete to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
|
|
"Main table delete to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
|
|
config.getDestination(),
|
|
config.getDestination(),
|
|
dml.getTable(),
|
|
dml.getTable(),
|
|
- mapping.get_index(),
|
|
|
|
|
|
+ mapping.getIndex(),
|
|
idVal);
|
|
idVal);
|
|
}
|
|
}
|
|
esTemplate.delete(mapping, idVal, esFieldData);
|
|
esTemplate.delete(mapping, idVal, esFieldData);
|
|
@@ -554,7 +554,7 @@ public class ESSyncService {
|
|
|
|
|
|
String fieldName = fieldItem.getFieldName();
|
|
String fieldName = fieldItem.getFieldName();
|
|
// 判断是否是主键
|
|
// 判断是否是主键
|
|
- if (fieldName.equals(mapping.get_id())) {
|
|
|
|
|
|
+ if (fieldName.equals(mapping.getId())) {
|
|
fieldName = "_id";
|
|
fieldName = "_id";
|
|
}
|
|
}
|
|
paramsTmp.put(fieldName, value);
|
|
paramsTmp.put(fieldName, value);
|
|
@@ -566,7 +566,7 @@ public class ESSyncService {
|
|
logger.trace("Join table update es index by foreign key, destination:{}, table: {}, index: {}",
|
|
logger.trace("Join table update es index by foreign key, destination:{}, table: {}, index: {}",
|
|
config.getDestination(),
|
|
config.getDestination(),
|
|
dml.getTable(),
|
|
dml.getTable(),
|
|
- mapping.get_index());
|
|
|
|
|
|
+ mapping.getIndex());
|
|
}
|
|
}
|
|
esTemplate.updateByQuery(config, paramsTmp, esFieldData);
|
|
esTemplate.updateByQuery(config, paramsTmp, esFieldData);
|
|
}
|
|
}
|
|
@@ -617,7 +617,7 @@ public class ESSyncService {
|
|
logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
config.getDestination(),
|
|
config.getDestination(),
|
|
dml.getTable(),
|
|
dml.getTable(),
|
|
- mapping.get_index(),
|
|
|
|
|
|
+ mapping.getIndex(),
|
|
sql.toString().replace("\n", " "));
|
|
sql.toString().replace("\n", " "));
|
|
}
|
|
}
|
|
Util.sqlRS(ds, sql.toString(), values, rs -> {
|
|
Util.sqlRS(ds, sql.toString(), values, rs -> {
|
|
@@ -661,7 +661,7 @@ public class ESSyncService {
|
|
entry.getKey().getColumn().getColumnName());
|
|
entry.getKey().getColumn().getColumnName());
|
|
String fieldName = fieldItem.getFieldName();
|
|
String fieldName = fieldItem.getFieldName();
|
|
// 判断是否是主键
|
|
// 判断是否是主键
|
|
- if (fieldName.equals(mapping.get_id())) {
|
|
|
|
|
|
+ if (fieldName.equals(mapping.getId())) {
|
|
fieldName = "_id";
|
|
fieldName = "_id";
|
|
}
|
|
}
|
|
paramsTmp.put(fieldName, value);
|
|
paramsTmp.put(fieldName, value);
|
|
@@ -673,7 +673,7 @@ public class ESSyncService {
|
|
logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}",
|
|
logger.trace("Join table update es index by query sql, destination:{}, table: {}, index: {}",
|
|
config.getDestination(),
|
|
config.getDestination(),
|
|
dml.getTable(),
|
|
dml.getTable(),
|
|
- mapping.get_index());
|
|
|
|
|
|
+ mapping.getIndex());
|
|
}
|
|
}
|
|
esTemplate.updateByQuery(config, paramsTmp, esFieldData);
|
|
esTemplate.updateByQuery(config, paramsTmp, esFieldData);
|
|
}
|
|
}
|
|
@@ -721,7 +721,7 @@ public class ESSyncService {
|
|
logger.trace("Join table update es index by query whole sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
logger.trace("Join table update es index by query whole sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
config.getDestination(),
|
|
config.getDestination(),
|
|
dml.getTable(),
|
|
dml.getTable(),
|
|
- mapping.get_index(),
|
|
|
|
|
|
+ mapping.getIndex(),
|
|
sql.toString().replace("\n", " "));
|
|
sql.toString().replace("\n", " "));
|
|
}
|
|
}
|
|
Util.sqlRS(ds, sql.toString(), rs -> {
|
|
Util.sqlRS(ds, sql.toString(), rs -> {
|
|
@@ -775,7 +775,7 @@ public class ESSyncService {
|
|
.getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
|
|
.getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
|
|
String fieldName = fieldItem.getFieldName();
|
|
String fieldName = fieldItem.getFieldName();
|
|
// 判断是否是主键
|
|
// 判断是否是主键
|
|
- if (fieldName.equals(mapping.get_id())) {
|
|
|
|
|
|
+ if (fieldName.equals(mapping.getId())) {
|
|
fieldName = "_id";
|
|
fieldName = "_id";
|
|
}
|
|
}
|
|
paramsTmp.put(fieldName, value);
|
|
paramsTmp.put(fieldName, value);
|
|
@@ -787,7 +787,7 @@ public class ESSyncService {
|
|
"Join table update es index by query whole sql, destination:{}, table: {}, index: {}",
|
|
"Join table update es index by query whole sql, destination:{}, table: {}, index: {}",
|
|
config.getDestination(),
|
|
config.getDestination(),
|
|
dml.getTable(),
|
|
dml.getTable(),
|
|
- mapping.get_index());
|
|
|
|
|
|
+ mapping.getIndex());
|
|
}
|
|
}
|
|
esTemplate.updateByQuery(config, paramsTmp, esFieldData);
|
|
esTemplate.updateByQuery(config, paramsTmp, esFieldData);
|
|
}
|
|
}
|
|
@@ -817,7 +817,7 @@ public class ESSyncService {
|
|
logger.trace("Main table update to es index, destination:{}, table: {}, index: {}, id: {}",
|
|
logger.trace("Main table update to es index, destination:{}, table: {}, index: {}, id: {}",
|
|
config.getDestination(),
|
|
config.getDestination(),
|
|
dml.getTable(),
|
|
dml.getTable(),
|
|
- mapping.get_index(),
|
|
|
|
|
|
+ mapping.getIndex(),
|
|
idVal);
|
|
idVal);
|
|
}
|
|
}
|
|
esTemplate.update(mapping, idVal, esFieldData);
|
|
esTemplate.update(mapping, idVal, esFieldData);
|
|
@@ -840,7 +840,7 @@ public class ESSyncService {
|
|
logger.trace("Main table update to es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
logger.trace("Main table update to es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
|
|
config.getDestination(),
|
|
config.getDestination(),
|
|
dml.getTable(),
|
|
dml.getTable(),
|
|
- mapping.get_index(),
|
|
|
|
|
|
+ mapping.getIndex(),
|
|
sql.replace("\n", " "));
|
|
sql.replace("\n", " "));
|
|
}
|
|
}
|
|
Util.sqlRS(ds, sql, rs -> {
|
|
Util.sqlRS(ds, sql, rs -> {
|
|
@@ -854,7 +854,7 @@ public class ESSyncService {
|
|
"Main table update to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
|
|
"Main table update to es index by query sql, destination:{}, table: {}, index: {}, id: {}",
|
|
config.getDestination(),
|
|
config.getDestination(),
|
|
dml.getTable(),
|
|
dml.getTable(),
|
|
- mapping.get_index(),
|
|
|
|
|
|
+ mapping.getIndex(),
|
|
idVal);
|
|
idVal);
|
|
}
|
|
}
|
|
esTemplate.update(mapping, idVal, esFieldData);
|
|
esTemplate.update(mapping, idVal, esFieldData);
|