@@ -14,8 +14,6 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;

-import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
-import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.lang.ObjectUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -26,6 +24,7 @@ import com.alibaba.druid.sql.repository.Schema;
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONObject;
 import com.alibaba.otter.canal.filter.CanalEventFilter;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.TableMeta;
@@ -247,30 +246,47 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         }
     }

     private boolean applyHistoryToDB(EntryPosition position, String schema, String ddl, String extra) {
-        Map<String, String> content = new HashMap<>();
-        content.put("destination", destination);
-        content.put("binlogFile", position.getJournalName());
-        content.put("binlogOffest", String.valueOf(position.getPosition()));
-        content.put("binlogMasterId", String.valueOf(position.getServerId()));
-        content.put("binlogTimestamp", String.valueOf(position.getTimestamp()));
-        content.put("useSchema", schema);
-        if (content.isEmpty()) {
-            throw new RuntimeException("apply failed caused by content is empty in applyHistoryToDB");
-        }
-        // TODO: to be filled in
+        // Map<String, String> content = new HashMap<>();
+        // content.put("destination", destination);
+        // content.put("binlogFile", position.getJournalName());
+        // content.put("binlogOffest", String.valueOf(position.getPosition()));
+        // content.put("binlogMasterId", String.valueOf(position.getServerId()));
+        // content.put("binlogTimestamp", String.valueOf(position.getTimestamp()));
+        // content.put("useSchema", schema);
+        //
+        // if (content.isEmpty()) {
+        //     throw new RuntimeException("apply failed caused by content is empty in
+        //     applyHistoryToDB");
+        // }
+        // // TODO: to be filled in
+        // List<DdlResult> ddlResults = DruidDdlParser.parse(ddl, schema);
+        // if (ddlResults.size() > 0) {
+        //     DdlResult ddlResult = ddlResults.get(0);
+        //     content.put("sqlSchema", ddlResult.getSchemaName());
+        //     content.put("sqlTable", ddlResult.getTableName());
+        //     content.put("sqlType", ddlResult.getType().name());
+        //     content.put("sqlText", ddl);
+        //     content.put("extra", extra);
+        // }
+        // BeanUtils.populate(metaDO, content);
+
+        MetaHistoryDO metaDO = new MetaHistoryDO();
+        metaDO.setDestination(destination);
+        metaDO.setBinlogFile(position.getJournalName());
+        metaDO.setBinlogOffest(position.getPosition());
+        metaDO.setBinlogMasterId(String.valueOf(position.getServerId()));
+        metaDO.setBinlogTimestamp(position.getTimestamp());
+        metaDO.setUseSchema(schema);
         List<DdlResult> ddlResults = DruidDdlParser.parse(ddl, schema);
         if (ddlResults.size() > 0) {
             DdlResult ddlResult = ddlResults.get(0);
-            content.put("sqlSchema", ddlResult.getSchemaName());
-            content.put("sqlTable", ddlResult.getTableName());
-            content.put("sqlType", ddlResult.getType().name());
-            content.put("sqlText", ddl);
-            content.put("extra", extra);
+            metaDO.setSqlSchema(ddlResult.getSchemaName());
+            metaDO.setSqlTable(ddlResult.getTableName());
+            metaDO.setSqlType(ddlResult.getType().name());
+            metaDO.setSqlText(ddl);
+            metaDO.setExtra(extra);
         }
-
-        MetaHistoryDO metaDO = new MetaHistoryDO();
         try {
-            BeanUtils.populate(metaDO, content);
            // A unique constraint is created to guard against:
            // 1. duplicate binlog file+offest
            // 2. duplicate masterId+timestamp
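
Reviewer note: this hunk replaces the reflective BeanUtils.populate(metaDO, content) call with plain setters on MetaHistoryDO, which drops the commons-beanutils dependency, removes the String round-trips for the long-valued binlog fields, and turns a misspelled property name into a compile error instead of a silently ignored map entry. It also retires the content.isEmpty() guard, which was unreachable since the map was unconditionally populated just above. A minimal sketch of the difference, using a hypothetical two-field stand-in for the real DO (only BeanUtils.populate itself is the real commons-beanutils API):

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.beanutils.BeanUtils;

public class PopulateVsSetters {

    // Hypothetical stand-in for canal's MetaHistoryDO, reduced to two fields.
    public static class HistoryBean {
        private String destination;
        private Long   binlogOffest; // spelling follows the real DO's property name

        public String getDestination() { return destination; }
        public void setDestination(String destination) { this.destination = destination; }
        public Long getBinlogOffest() { return binlogOffest; }
        public void setBinlogOffest(Long binlogOffest) { this.binlogOffest = binlogOffest; }
    }

    public static void main(String[] args) throws Exception {
        // Reflective population: keys are bare strings, so a mismatched key
        // ("binlogOffset" vs the property "binlogOffest") is dropped silently.
        Map<String, String> content = new HashMap<>();
        content.put("destination", "example");
        content.put("binlogOffset", "4"); // wrong key: no error, property stays null
        HistoryBean viaPopulate = new HistoryBean();
        BeanUtils.populate(viaPopulate, content);
        System.out.println(viaPopulate.getBinlogOffest()); // null

        // Direct setters: the same typo would not compile, and the long value
        // needs no String round-trip or reflective conversion.
        HistoryBean viaSetters = new HistoryBean();
        viaSetters.setDestination("example");
        viaSetters.setBinlogOffest(4L);
        System.out.println(viaSetters.getBinlogOffest()); // 4
    }
}

Note that the new setter calls keep the property name binlogOffest as-is: it has to match the DO's actual getter/setter pair, so it is an identifier, not a typo to fix here.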
@@ -328,20 +344,27 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         }

         if (compareAll) {
-            Map<String, String> content = new HashMap<>();
-            content.put("destination", destination);
-            content.put("binlogFile", position.getJournalName());
-            content.put("binlogOffest", String.valueOf(position.getPosition()));
-            content.put("binlogMasterId", String.valueOf(position.getServerId()));
-            content.put("binlogTimestamp", String.valueOf(position.getTimestamp()));
-            content.put("data", JSON.toJSONString(schemaDdls));
-            if (content.isEmpty()) {
-                throw new RuntimeException("apply failed caused by content is empty in applySnapshotToDB");
-            }
+            // Map<String, String> content = new HashMap<>();
+            // content.put("destination", destination);
+            // content.put("binlogFile", position.getJournalName());
+            // content.put("binlogOffest", String.valueOf(position.getPosition()));
+            // content.put("binlogMasterId", String.valueOf(position.getServerId()));
+            // content.put("binlogTimestamp", String.valueOf(position.getTimestamp()));
+            // content.put("data", JSON.toJSONString(schemaDdls));
+            // if (content.isEmpty()) {
+            //     throw new RuntimeException("apply failed caused by content is empty in
+            //     applySnapshotToDB");
+            // }
+            // BeanUtils.populate(snapshotDO, content);

             MetaSnapshotDO snapshotDO = new MetaSnapshotDO();
+            snapshotDO.setDestination(destination);
+            snapshotDO.setBinlogFile(position.getJournalName());
+            snapshotDO.setBinlogOffest(position.getPosition());
+            snapshotDO.setBinlogMasterId(String.valueOf(position.getServerId()));
+            snapshotDO.setBinlogTimestamp(position.getTimestamp());
+            snapshotDO.setData(JSON.toJSONString(schemaDdls));
             try {
-                BeanUtils.populate(snapshotDO, content);
                 metaSnapshotDAO.insert(snapshotDO);
             } catch (Throwable e) {
                 if (isUkDuplicateException(e)) {
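
Reviewer note: the old map stored the snapshot payload under the key "data", so the setter-based replacement must call setData rather than setExtra (corrected above) or the JSON would land in the wrong column. The insert itself stays idempotent by leaning on the table's unique keys: re-applying the same position surfaces as a duplicate-key error that isUkDuplicateException recognizes, per the comment in applyHistoryToDB (duplicate binlog file+offest, duplicate masterId+timestamp). A hedged sketch of what such a check can look like; canal's actual implementation may differ, and looksLikeUkDuplicate is a hypothetical name (MySQL reports duplicate keys as error code 1062, SQLState 23000, message "Duplicate entry ..."):

import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

public final class DuplicateKeyCheck {

    private DuplicateKeyCheck() {}

    // Walks the cause chain looking for evidence of a unique-key violation.
    // This mirrors the intent of isUkDuplicateException, not its exact code.
    public static boolean looksLikeUkDuplicate(Throwable e) {
        for (Throwable t = e; t != null; t = t.getCause()) {
            if (t instanceof SQLIntegrityConstraintViolationException) {
                return true; // JDBC's dedicated constraint-violation subclass
            }
            if (t instanceof SQLException && ((SQLException) t).getErrorCode() == 1062) {
                return true; // MySQL's duplicate-entry error code
            }
            String msg = t.getMessage();
            if (msg != null && msg.contains("Duplicate entry")) {
                return true; // fallback: match MySQL's message text
            }
        }
        return false;
    }
}

Treating a recognized duplicate as success rather than rethrowing makes the apply path safe to retry after a parser restart, since the second write of the same position is a no-op by construction.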