|
@@ -22,7 +22,7 @@ public class MessageUtil {
|
|
List<Dml> dmls = new ArrayList<Dml>(entries.size());
|
|
List<Dml> dmls = new ArrayList<Dml>(entries.size());
|
|
for (CanalEntry.Entry entry : entries) {
|
|
for (CanalEntry.Entry entry : entries) {
|
|
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|
|
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|
|
- || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
|
|
|
|
|
|
+ || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -31,7 +31,7 @@ public class MessageUtil {
|
|
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
|
|
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
|
|
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
|
|
- e);
|
|
|
|
|
|
+ e);
|
|
}
|
|
}
|
|
|
|
|
|
CanalEntry.EventType eventType = rowChange.getEventType();
|
|
CanalEntry.EventType eventType = rowChange.getEventType();
|
|
@@ -57,7 +57,7 @@ public class MessageUtil {
|
|
int i = 0;
|
|
int i = 0;
|
|
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
|
|
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
|
|
if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
|
|
if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
|
|
- && eventType != CanalEntry.EventType.DELETE) {
|
|
|
|
|
|
+ && eventType != CanalEntry.EventType.DELETE) {
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -76,11 +76,15 @@ public class MessageUtil {
|
|
dml.getPkNames().add(column.getName());
|
|
dml.getPkNames().add(column.getName());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- row.put(column.getName(),
|
|
|
|
- JdbcTypeUtil.typeConvert(dml.getTable(),column.getName(),
|
|
|
|
- column.getValue(),
|
|
|
|
- column.getSqlType(),
|
|
|
|
- column.getMysqlType()));
|
|
|
|
|
|
+ if (column.getIsNull()) {
|
|
|
|
+ row.put(column.getName(), null);
|
|
|
|
+ } else {
|
|
|
|
+ row.put(column.getName(),
|
|
|
|
+ JdbcTypeUtil.typeConvert(dml.getTable(), column.getName(),
|
|
|
|
+ column.getValue(),
|
|
|
|
+ column.getSqlType(),
|
|
|
|
+ column.getMysqlType()));
|
|
|
|
+ }
|
|
// 获取update为true的字段
|
|
// 获取update为true的字段
|
|
if (column.getUpdated()) {
|
|
if (column.getUpdated()) {
|
|
updateSet.add(column.getName());
|
|
updateSet.add(column.getName());
|
|
@@ -95,10 +99,10 @@ public class MessageUtil {
|
|
for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
|
|
for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
|
|
if (updateSet.contains(column.getName())) {
|
|
if (updateSet.contains(column.getName())) {
|
|
rowOld.put(column.getName(),
|
|
rowOld.put(column.getName(),
|
|
- JdbcTypeUtil.typeConvert(dml.getTable(),column.getName(),
|
|
|
|
- column.getValue(),
|
|
|
|
- column.getSqlType(),
|
|
|
|
- column.getMysqlType()));
|
|
|
|
|
|
+ JdbcTypeUtil.typeConvert(dml.getTable(), column.getName(),
|
|
|
|
+ column.getValue(),
|
|
|
|
+ column.getSqlType(),
|
|
|
|
+ column.getMysqlType()));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// update操作将记录修改前的值
|
|
// update操作将记录修改前的值
|