|
@@ -22,7 +22,7 @@ public class MessageUtil {
|
|
|
List<Dml> dmls = new ArrayList<Dml>(entries.size());
|
|
|
for (CanalEntry.Entry entry : entries) {
|
|
|
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|
|
|
- || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
|
|
|
+ || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
|
|
|
continue;
|
|
|
}
|
|
|
|
|
@@ -31,7 +31,7 @@ public class MessageUtil {
|
|
|
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
|
|
|
} catch (Exception e) {
|
|
|
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
|
|
|
- e);
|
|
|
+ e);
|
|
|
}
|
|
|
|
|
|
CanalEntry.EventType eventType = rowChange.getEventType();
|
|
@@ -57,7 +57,7 @@ public class MessageUtil {
|
|
|
int i = 0;
|
|
|
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
|
|
|
if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
|
|
|
- && eventType != CanalEntry.EventType.DELETE) {
|
|
|
+ && eventType != CanalEntry.EventType.DELETE) {
|
|
|
continue;
|
|
|
}
|
|
|
|
|
@@ -80,10 +80,11 @@ public class MessageUtil {
|
|
|
row.put(column.getName(), null);
|
|
|
} else {
|
|
|
row.put(column.getName(),
|
|
|
- JdbcTypeUtil.typeConvert(dml.getTable(), column.getName(),
|
|
|
- column.getValue(),
|
|
|
- column.getSqlType(),
|
|
|
- column.getMysqlType()));
|
|
|
+ JdbcTypeUtil.typeConvert(dml.getTable(),
|
|
|
+ column.getName(),
|
|
|
+ column.getValue(),
|
|
|
+ column.getSqlType(),
|
|
|
+ column.getMysqlType()));
|
|
|
}
|
|
|
// 获取update为true的字段
|
|
|
if (column.getUpdated()) {
|
|
@@ -99,10 +100,11 @@ public class MessageUtil {
|
|
|
for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
|
|
|
if (updateSet.contains(column.getName())) {
|
|
|
rowOld.put(column.getName(),
|
|
|
- JdbcTypeUtil.typeConvert(dml.getTable(), column.getName(),
|
|
|
- column.getValue(),
|
|
|
- column.getSqlType(),
|
|
|
- column.getMysqlType()));
|
|
|
+ JdbcTypeUtil.typeConvert(dml.getTable(),
|
|
|
+ column.getName(),
|
|
|
+ column.getValue(),
|
|
|
+ column.getSqlType(),
|
|
|
+ column.getMysqlType()));
|
|
|
}
|
|
|
}
|
|
|
// update操作将记录修改前的值
|
|
@@ -166,8 +168,8 @@ public class MessageUtil {
|
|
|
return dml;
|
|
|
}
|
|
|
|
|
|
- private static List<Map<String, Object>> changeRows(String table, List<Map<String, String>> rows, Map<String, Integer> sqlTypes,
|
|
|
- Map<String, String> mysqlTypes) {
|
|
|
+ private static List<Map<String, Object>> changeRows(String table, List<Map<String, String>> rows,
|
|
|
+ Map<String, Integer> sqlTypes, Map<String, String> mysqlTypes) {
|
|
|
List<Map<String, Object>> result = new ArrayList<>();
|
|
|
for (Map<String, String> row : rows) {
|
|
|
Map<String, Object> resultRow = new LinkedHashMap<>();
|