|
@@ -13,9 +13,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.locks.ReadWriteLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
import java.util.regex.Pattern;
|
|
|
-import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
|
|
|
-import com.alibaba.polardbx.druid.sql.repository.Schema;
|
|
|
-import org.apache.commons.beanutils.BeanUtils;
|
|
|
+
|
|
|
import org.apache.commons.lang.ObjectUtils;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
@@ -40,6 +38,7 @@ import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDO;
|
|
|
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO;
|
|
|
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDO;
|
|
|
import com.alibaba.otter.canal.protocol.position.EntryPosition;
|
|
|
+import com.alibaba.polardbx.druid.sql.repository.Schema;
|
|
|
|
|
|
/**
|
|
|
* 基于db远程管理 see internal class: CanalTableMeta , ConsoleTableMetaTSDB
|
|
@@ -52,7 +51,8 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
public static final EntryPosition INIT_POSITION = new EntryPosition("0", 0L, -2L, -1L);
|
|
|
private static Logger logger = LoggerFactory.getLogger(DatabaseTableMeta.class);
|
|
|
private static Pattern pattern = Pattern.compile("Duplicate entry '.*' for key '*'");
|
|
|
- private static Pattern h2Pattern = Pattern.compile("Unique index or primary key violation");
|
|
|
+ private static Pattern h2Pattern = Pattern
|
|
|
+ .compile("Unique index or primary key violation");
|
|
|
private static ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
|
|
|
Thread thread = new Thread(r,
|
|
|
"[scheduler-table-meta-snapshot]");
|
|
@@ -63,7 +63,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
private AtomicBoolean initialized = new AtomicBoolean(false);
|
|
|
private String destination;
|
|
|
private MemoryTableMeta memoryTableMeta;
|
|
|
- private volatile MysqlConnection connection; // 查询meta信息的链接
|
|
|
+ private volatile MysqlConnection connection; // 查询meta信息的链接
|
|
|
private CanalEventFilter filter;
|
|
|
private CanalEventFilter blackFilter;
|
|
|
private Map<String, List<String>> fieldFilterMap = new HashMap<>();
|
|
@@ -80,6 +80,143 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ public static boolean compareTableMeta(TableMeta source, TableMeta target) {
|
|
|
+ if (!StringUtils.equalsIgnoreCase(source.getSchema(), target.getSchema())) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!StringUtils.equalsIgnoreCase(source.getTable(), target.getTable())) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ List<FieldMeta> sourceFields = source.getFields();
|
|
|
+ List<FieldMeta> targetFields = target.getFields();
|
|
|
+ if (sourceFields.size() != targetFields.size()) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * MySQL DDL的一些默认行为:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * 1. Timestamp类型的列在第一次添加时,未指定默认值会默认为CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
|
|
|
+ * 2. Timestamp类型的列在第二次时,必须指定默认值
|
|
|
+ * 3. BLOB和TEXT类型不存在NULL、NOT NULL属性
|
|
|
+ * 4. 部分数据类型是synonyms,实际show create table时会转成对应类型
|
|
|
+ * 5. 非BLOB和TEXT类型在默认未指定NULL、NOT NULL时,默认default null
|
|
|
+ * 6. 在列变更时,不仅变更列名数据类型,同时各个约束中列名也会变更,同时如果约束中包含key length,则变更后的数据类型不应违背key length的约束(有长度的应大于key length;BLOB、TEXT应有key length;可以在存在key length情况下变更为无key length的数据类型,约束中key length忽略;等等)
|
|
|
+ * 7. 字符集每列(char类、eumn、set)默认保存,指定使用指定的,未指定使用表默认的,不受修改表默认字符集而改变,同表默认时,字符集显示省略
|
|
|
+ * 8. 新建表默认innodb引擎,latin1字符集
|
|
|
+ * 9. BLOB、TEXT会根据给定长度自动转换为对应的TINY、MEDIUM,LONG类型,长度和字符集也有关
|
|
|
+ * 10. unique约束在没有指定索引名是非幂等的,会自动以约束索引第一个列名称命名,同时以_2,_3这种形式添加后缀
|
|
|
+ * </pre>
|
|
|
+ */
|
|
|
+
|
|
|
+ for (int i = 0; i < sourceFields.size(); i++) {
|
|
|
+ FieldMeta sourceField = sourceFields.get(i);
|
|
|
+ FieldMeta targetField = targetFields.get(i);
|
|
|
+ if (!StringUtils.equalsIgnoreCase(sourceField.getColumnName(), targetField.getColumnName())) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ // if (!StringUtils.equalsIgnoreCase(sourceField.getColumnType(),
|
|
|
+ // targetField.getColumnType())) {
|
|
|
+ // return false;
|
|
|
+ // }
|
|
|
+
|
|
|
+ // https://github.com/alibaba/canal/issues/1100
|
|
|
+ // 支持一下 int vs int(10)
|
|
|
+ if ((sourceField.isUnsigned() && !targetField.isUnsigned())
|
|
|
+ || (!sourceField.isUnsigned() && targetField.isUnsigned())) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ String sourceColumnType = StringUtils.removeEndIgnoreCase(sourceField.getColumnType(), "zerofill").trim();
|
|
|
+ String targetColumnType = StringUtils.removeEndIgnoreCase(targetField.getColumnType(), "zerofill").trim();
|
|
|
+
|
|
|
+ String sign = sourceField.isUnsigned() ? "unsigned" : "signed";
|
|
|
+ sourceColumnType = StringUtils.removeEndIgnoreCase(sourceColumnType, sign).trim();
|
|
|
+ targetColumnType = StringUtils.removeEndIgnoreCase(targetColumnType, sign).trim();
|
|
|
+
|
|
|
+ boolean columnTypeCompare = false;
|
|
|
+ columnTypeCompare |= StringUtils.containsIgnoreCase(sourceColumnType, targetColumnType);
|
|
|
+ columnTypeCompare |= StringUtils.containsIgnoreCase(targetColumnType, sourceColumnType);
|
|
|
+ if (!columnTypeCompare) {
|
|
|
+ // 去掉精度参数再对比一次
|
|
|
+ sourceColumnType = synonymsType(StringUtils.substringBefore(sourceColumnType, "(")).trim();
|
|
|
+ targetColumnType = synonymsType(StringUtils.substringBefore(targetColumnType, "(")).trim();
|
|
|
+ columnTypeCompare |= StringUtils.containsIgnoreCase(sourceColumnType, targetColumnType);
|
|
|
+ columnTypeCompare |= StringUtils.containsIgnoreCase(targetColumnType, sourceColumnType);
|
|
|
+ if (!columnTypeCompare) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // if (!StringUtils.equalsIgnoreCase(sourceField.getDefaultValue(),
|
|
|
+ // targetField.getDefaultValue())) {
|
|
|
+ // return false;
|
|
|
+ // }
|
|
|
+
|
|
|
+ // BLOB, TEXT, GEOMETRY or JSON默认都是nullable,可以忽略比较,但比较了也是对齐
|
|
|
+ if (StringUtils.containsIgnoreCase(sourceColumnType, "timestamp")
|
|
|
+ || StringUtils.containsIgnoreCase(targetColumnType, "timestamp")) {
|
|
|
+ // timestamp可能会加default current_timestamp默认值,忽略对比nullable
|
|
|
+ } else {
|
|
|
+ if (sourceField.isNullable() != targetField.isNullable()) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // mysql会有一种处理,针对show create只有uk没有pk时,会在desc默认将uk当做pk
|
|
|
+ boolean isSourcePkOrUk = sourceField.isKey() || sourceField.isUnique();
|
|
|
+ boolean isTargetPkOrUk = targetField.isKey() || targetField.isUnique();
|
|
|
+ if (isSourcePkOrUk != isTargetPkOrUk) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * <pre>
|
|
|
+ * synonyms处理
|
|
|
+ * 1. BOOL/BOOLEAN => TINYINT
|
|
|
+ * 2. DEC/NUMERIC/FIXED => DECIMAL
|
|
|
+ * 3. INTEGER => INT
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * @param originType
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private static String synonymsType(String originType) {
|
|
|
+ if (StringUtils.equalsIgnoreCase(originType, "bool") || StringUtils.equalsIgnoreCase(originType, "boolean")) {
|
|
|
+ return "tinyint";
|
|
|
+ } else
|
|
|
+ if (StringUtils.equalsIgnoreCase(originType, "dec") || StringUtils.equalsIgnoreCase(originType, "numeric")
|
|
|
+ || StringUtils.equalsIgnoreCase(originType, "fixed")) {
|
|
|
+ return "decimal";
|
|
|
+ } else if (StringUtils.equalsIgnoreCase(originType, "integer")) {
|
|
|
+ return "int";
|
|
|
+ } else if (StringUtils.equalsIgnoreCase(originType, "real")
|
|
|
+ || StringUtils.equalsIgnoreCase(originType, "double precision")) {
|
|
|
+ return "double";
|
|
|
+ }
|
|
|
+
|
|
|
+ // BLOB、TEXT会根据给定长度自动转换为对应的TINY、MEDIUM,LONG类型,长度和字符集也有关,统一按照blob对比
|
|
|
+ if (StringUtils.equalsIgnoreCase(originType, "tinyblob")
|
|
|
+ || StringUtils.equalsIgnoreCase(originType, "mediumblob")
|
|
|
+ || StringUtils.equalsIgnoreCase(originType, "longblob")) {
|
|
|
+ return "blob";
|
|
|
+ } else if (StringUtils.equalsIgnoreCase(originType, "tinytext")
|
|
|
+ || StringUtils.equalsIgnoreCase(originType, "mediumtext")
|
|
|
+ || StringUtils.equalsIgnoreCase(originType, "longtext")) {
|
|
|
+ return "text";
|
|
|
+ }
|
|
|
+
|
|
|
+ return originType;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public boolean init(final String destination) {
|
|
|
if (initialized.compareAndSet(false, true)) {
|
|
@@ -121,8 +258,9 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
try {
|
|
|
connection.disconnect();
|
|
|
} catch (IOException e) {
|
|
|
- logger.error("ERROR # disconnect meta connection for address:{}", connection.getConnector()
|
|
|
- .getAddress(), e);
|
|
|
+ logger.error("ERROR # disconnect meta connection for address:{}",
|
|
|
+ connection.getConnector().getAddress(),
|
|
|
+ e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -234,7 +372,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
|
|
|
StringBuilder sql = new StringBuilder();
|
|
|
for (String table : tables) {
|
|
|
- sql.append("show create table `" + schema + "`.`" + StringUtils.replace(table,"`","``") + "`;");
|
|
|
+ sql.append("show create table `" + schema + "`.`" + StringUtils.replace(table, "`", "``") + "`;");
|
|
|
}
|
|
|
|
|
|
List<ResultSetPacket> packets = connection.queryMulti(sql.toString());
|
|
@@ -370,7 +508,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
snapshotDO.setBinlogOffest(position.getPosition());
|
|
|
snapshotDO.setBinlogMasterId(String.valueOf(position.getServerId()));
|
|
|
snapshotDO.setBinlogTimestamp(position.getTimestamp());
|
|
|
- snapshotDO.setData(JSON.toJSONString(schemaDdls,JSONWriter.Feature.LargeObject));
|
|
|
+ snapshotDO.setData(JSON.toJSONString(schemaDdls, JSONWriter.Feature.LargeObject));
|
|
|
try {
|
|
|
metaSnapshotDAO.insert(snapshotDO);
|
|
|
} catch (Throwable e) {
|
|
@@ -466,9 +604,8 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
|
|
|
private boolean applyHistoryOnMemory(EntryPosition position, EntryPosition rollbackPosition) {
|
|
|
try {
|
|
|
- List<MetaHistoryDO> metaHistoryDOList = metaHistoryDAO.findByTimestamp(destination,
|
|
|
- position.getTimestamp(),
|
|
|
- rollbackPosition.getTimestamp());
|
|
|
+ List<MetaHistoryDO> metaHistoryDOList = metaHistoryDAO
|
|
|
+ .findByTimestamp(destination, position.getTimestamp(), rollbackPosition.getTimestamp());
|
|
|
if (metaHistoryDOList == null) {
|
|
|
return true;
|
|
|
}
|
|
@@ -490,8 +627,8 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
continue;
|
|
|
} else if (rollbackPosition.getServerId().equals(snapshotPosition.getServerId())
|
|
|
&& snapshotPosition.compareTo(rollbackPosition) > 0) {
|
|
|
- continue;
|
|
|
- }
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
// 记录到内存
|
|
|
if (!memoryTableMeta.apply(snapshotPosition, useSchema, sqlData, null)) {
|
|
@@ -515,156 +652,18 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
|
|
|
private String getFullName(String schema, String table) {
|
|
|
StringBuilder builder = new StringBuilder();
|
|
|
- return builder.append(structureSchema(schema)).append('.').append('`').append(StringUtils.replace(table,"`","``")).append('`').toString();
|
|
|
- }
|
|
|
-
|
|
|
- public static boolean compareTableMeta(TableMeta source, TableMeta target) {
|
|
|
- if (!StringUtils.equalsIgnoreCase(source.getSchema(), target.getSchema())) {
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- if (!StringUtils.equalsIgnoreCase(source.getTable(), target.getTable())) {
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- List<FieldMeta> sourceFields = source.getFields();
|
|
|
- List<FieldMeta> targetFields = target.getFields();
|
|
|
- if (sourceFields.size() != targetFields.size()) {
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * MySQL DDL的一些默认行为:
|
|
|
- *
|
|
|
- * <pre>
|
|
|
- * 1. Timestamp类型的列在第一次添加时,未指定默认值会默认为CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
|
|
|
- * 2. Timestamp类型的列在第二次时,必须指定默认值
|
|
|
- * 3. BLOB和TEXT类型不存在NULL、NOT NULL属性
|
|
|
- * 4. 部分数据类型是synonyms,实际show create table时会转成对应类型
|
|
|
- * 5. 非BLOB和TEXT类型在默认未指定NULL、NOT NULL时,默认default null
|
|
|
- * 6. 在列变更时,不仅变更列名数据类型,同时各个约束中列名也会变更,同时如果约束中包含key length,则变更后的数据类型不应违背key length的约束(有长度的应大于key length;BLOB、TEXT应有key length;可以在存在key length情况下变更为无key length的数据类型,约束中key length忽略;等等)
|
|
|
- * 7. 字符集每列(char类、eumn、set)默认保存,指定使用指定的,未指定使用表默认的,不受修改表默认字符集而改变,同表默认时,字符集显示省略
|
|
|
- * 8. 新建表默认innodb引擎,latin1字符集
|
|
|
- * 9. BLOB、TEXT会根据给定长度自动转换为对应的TINY、MEDIUM,LONG类型,长度和字符集也有关
|
|
|
- * 10. unique约束在没有指定索引名是非幂等的,会自动以约束索引第一个列名称命名,同时以_2,_3这种形式添加后缀
|
|
|
- * </pre>
|
|
|
- */
|
|
|
-
|
|
|
- for (int i = 0; i < sourceFields.size(); i++) {
|
|
|
- FieldMeta sourceField = sourceFields.get(i);
|
|
|
- FieldMeta targetField = targetFields.get(i);
|
|
|
- if (!StringUtils.equalsIgnoreCase(sourceField.getColumnName(), targetField.getColumnName())) {
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- // if (!StringUtils.equalsIgnoreCase(sourceField.getColumnType(),
|
|
|
- // targetField.getColumnType())) {
|
|
|
- // return false;
|
|
|
- // }
|
|
|
-
|
|
|
- // https://github.com/alibaba/canal/issues/1100
|
|
|
- // 支持一下 int vs int(10)
|
|
|
- if ((sourceField.isUnsigned() && !targetField.isUnsigned())
|
|
|
- || (!sourceField.isUnsigned() && targetField.isUnsigned())) {
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- String sourceColumnType = StringUtils.removeEndIgnoreCase(sourceField.getColumnType(), "zerofill").trim();
|
|
|
- String targetColumnType = StringUtils.removeEndIgnoreCase(targetField.getColumnType(), "zerofill").trim();
|
|
|
-
|
|
|
- String sign = sourceField.isUnsigned() ? "unsigned" : "signed";
|
|
|
- sourceColumnType = StringUtils.removeEndIgnoreCase(sourceColumnType, sign).trim();
|
|
|
- targetColumnType = StringUtils.removeEndIgnoreCase(targetColumnType, sign).trim();
|
|
|
-
|
|
|
- boolean columnTypeCompare = false;
|
|
|
- columnTypeCompare |= StringUtils.containsIgnoreCase(sourceColumnType, targetColumnType);
|
|
|
- columnTypeCompare |= StringUtils.containsIgnoreCase(targetColumnType, sourceColumnType);
|
|
|
- if (!columnTypeCompare) {
|
|
|
- // 去掉精度参数再对比一次
|
|
|
- sourceColumnType = synonymsType(StringUtils.substringBefore(sourceColumnType, "(")).trim();
|
|
|
- targetColumnType = synonymsType(StringUtils.substringBefore(targetColumnType, "(")).trim();
|
|
|
- columnTypeCompare |= StringUtils.containsIgnoreCase(sourceColumnType, targetColumnType);
|
|
|
- columnTypeCompare |= StringUtils.containsIgnoreCase(targetColumnType, sourceColumnType);
|
|
|
- if (!columnTypeCompare) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // if (!StringUtils.equalsIgnoreCase(sourceField.getDefaultValue(),
|
|
|
- // targetField.getDefaultValue())) {
|
|
|
- // return false;
|
|
|
- // }
|
|
|
-
|
|
|
- // BLOB, TEXT, GEOMETRY or JSON默认都是nullable,可以忽略比较,但比较了也是对齐
|
|
|
- if (StringUtils.containsIgnoreCase(sourceColumnType, "timestamp")
|
|
|
- || StringUtils.containsIgnoreCase(targetColumnType, "timestamp")) {
|
|
|
- // timestamp可能会加default current_timestamp默认值,忽略对比nullable
|
|
|
- } else {
|
|
|
- if (sourceField.isNullable() != targetField.isNullable()) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // mysql会有一种处理,针对show create只有uk没有pk时,会在desc默认将uk当做pk
|
|
|
- boolean isSourcePkOrUk = sourceField.isKey() || sourceField.isUnique();
|
|
|
- boolean isTargetPkOrUk = targetField.isKey() || targetField.isUnique();
|
|
|
- if (isSourcePkOrUk != isTargetPkOrUk) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * <pre>
|
|
|
- * synonyms处理
|
|
|
- * 1. BOOL/BOOLEAN => TINYINT
|
|
|
- * 2. DEC/NUMERIC/FIXED => DECIMAL
|
|
|
- * 3. INTEGER => INT
|
|
|
- *
|
|
|
- *
|
|
|
- * </pre>
|
|
|
- *
|
|
|
- * @param originType
|
|
|
- * @return
|
|
|
- */
|
|
|
- private static String synonymsType(String originType) {
|
|
|
- if (StringUtils.equalsIgnoreCase(originType, "bool") || StringUtils.equalsIgnoreCase(originType, "boolean")) {
|
|
|
- return "tinyint";
|
|
|
- } else if (StringUtils.equalsIgnoreCase(originType, "dec")
|
|
|
- || StringUtils.equalsIgnoreCase(originType, "numeric")
|
|
|
- || StringUtils.equalsIgnoreCase(originType, "fixed")) {
|
|
|
- return "decimal";
|
|
|
- } else if (StringUtils.equalsIgnoreCase(originType, "integer")) {
|
|
|
- return "int";
|
|
|
- } else if (StringUtils.equalsIgnoreCase(originType, "real")
|
|
|
- || StringUtils.equalsIgnoreCase(originType, "double precision")) {
|
|
|
- return "double";
|
|
|
- }
|
|
|
-
|
|
|
- // BLOB、TEXT会根据给定长度自动转换为对应的TINY、MEDIUM,LONG类型,长度和字符集也有关,统一按照blob对比
|
|
|
- if (StringUtils.equalsIgnoreCase(originType, "tinyblob")
|
|
|
- || StringUtils.equalsIgnoreCase(originType, "mediumblob")
|
|
|
- || StringUtils.equalsIgnoreCase(originType, "longblob")) {
|
|
|
- return "blob";
|
|
|
- } else if (StringUtils.equalsIgnoreCase(originType, "tinytext")
|
|
|
- || StringUtils.equalsIgnoreCase(originType, "mediumtext")
|
|
|
- || StringUtils.equalsIgnoreCase(originType, "longtext")) {
|
|
|
- return "text";
|
|
|
- }
|
|
|
-
|
|
|
- return originType;
|
|
|
+ return builder.append(structureSchema(schema))
|
|
|
+ .append('.')
|
|
|
+ .append('`')
|
|
|
+ .append(StringUtils.replace(table, "`", "``"))
|
|
|
+ .append('`')
|
|
|
+ .toString();
|
|
|
}
|
|
|
|
|
|
private int snapshotExpire(int expireTimestamp) {
|
|
|
return metaSnapshotDAO.deleteByTimestamp(destination, expireTimestamp);
|
|
|
}
|
|
|
|
|
|
- public void setConnection(MysqlConnection connection) {
|
|
|
- this.connection = connection;
|
|
|
- }
|
|
|
-
|
|
|
public void setFilter(CanalEventFilter filter) {
|
|
|
this.filter = filter;
|
|
|
}
|
|
@@ -689,22 +688,22 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
this.blackFilter = blackFilter;
|
|
|
}
|
|
|
|
|
|
- public void setFieldFilterMap(Map<String, List<String>> fieldFilterMap) {
|
|
|
- this.fieldFilterMap = fieldFilterMap;
|
|
|
- }
|
|
|
-
|
|
|
- public void setFieldBlackFilterMap(Map<String, List<String>> fieldBlackFilterMap) {
|
|
|
- this.fieldBlackFilterMap = fieldBlackFilterMap;
|
|
|
- }
|
|
|
-
|
|
|
public Map<String, List<String>> getFieldFilterMap() {
|
|
|
return fieldFilterMap;
|
|
|
}
|
|
|
|
|
|
+ public void setFieldFilterMap(Map<String, List<String>> fieldFilterMap) {
|
|
|
+ this.fieldFilterMap = fieldFilterMap;
|
|
|
+ }
|
|
|
+
|
|
|
public Map<String, List<String>> getFieldBlackFilterMap() {
|
|
|
return fieldBlackFilterMap;
|
|
|
}
|
|
|
|
|
|
+ public void setFieldBlackFilterMap(Map<String, List<String>> fieldBlackFilterMap) {
|
|
|
+ this.fieldBlackFilterMap = fieldBlackFilterMap;
|
|
|
+ }
|
|
|
+
|
|
|
public int getSnapshotInterval() {
|
|
|
return snapshotInterval;
|
|
|
}
|
|
@@ -725,6 +724,10 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
return connection;
|
|
|
}
|
|
|
|
|
|
+ public void setConnection(MysqlConnection connection) {
|
|
|
+ this.connection = connection;
|
|
|
+ }
|
|
|
+
|
|
|
public boolean isUkDuplicateException(Throwable t) {
|
|
|
if (pattern.matcher(t.getMessage()).find() || h2Pattern.matcher(t.getMessage()).find()) {
|
|
|
// 违反外键约束时也抛出这种异常,所以这里还要判断包含字符串Duplicate entry
|