|
@@ -48,35 +48,35 @@ import com.alibaba.otter.canal.protocol.position.EntryPosition;
|
|
|
*/
|
|
|
public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
|
|
|
- public static final EntryPosition INIT_POSITION = new EntryPosition("0", 0L, -2L, -1L);
|
|
|
- private static Logger logger = LoggerFactory.getLogger(DatabaseTableMeta.class);
|
|
|
- private static Pattern pattern = Pattern.compile("Duplicate entry '.*' for key '*'");
|
|
|
- private static Pattern h2Pattern = Pattern.compile("Unique index or primary key violation");
|
|
|
- private static ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public Thread newThread(Runnable r) {
|
|
|
- Thread thread = new Thread(r,
|
|
|
- "[scheduler-table-meta-snapshot]");
|
|
|
- thread.setDaemon(true);
|
|
|
- return thread;
|
|
|
- }
|
|
|
- });
|
|
|
- private ReadWriteLock lock = new ReentrantReadWriteLock();
|
|
|
- private AtomicBoolean initialized = new AtomicBoolean(false);
|
|
|
+ public static final EntryPosition INIT_POSITION = new EntryPosition("0", 0L, -2L, -1L);
|
|
|
+ private static Logger logger = LoggerFactory.getLogger(DatabaseTableMeta.class);
|
|
|
+ private static Pattern pattern = Pattern.compile("Duplicate entry '.*' for key '*'");
|
|
|
+ private static Pattern h2Pattern = Pattern.compile("Unique index or primary key violation");
|
|
|
+ private static ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Thread newThread(Runnable r) {
|
|
|
+ Thread thread = new Thread(r,
|
|
|
+ "[scheduler-table-meta-snapshot]");
|
|
|
+ thread.setDaemon(true);
|
|
|
+ return thread;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ private ReadWriteLock lock = new ReentrantReadWriteLock();
|
|
|
+ private AtomicBoolean initialized = new AtomicBoolean(false);
|
|
|
private String destination;
|
|
|
private MemoryTableMeta memoryTableMeta;
|
|
|
- private volatile MysqlConnection connection; // 查询meta信息的链接
|
|
|
+ private volatile MysqlConnection connection; // 查询meta信息的链接
|
|
|
private CanalEventFilter filter;
|
|
|
private CanalEventFilter blackFilter;
|
|
|
- private Map<String, List<String>> fieldFilterMap = new HashMap<String, List<String>>();
|
|
|
- private Map<String, List<String>> fieldBlackFilterMap = new HashMap<String, List<String>>();
|
|
|
+ private Map<String, List<String>> fieldFilterMap = new HashMap<String, List<String>>();
|
|
|
+ private Map<String, List<String>> fieldBlackFilterMap = new HashMap<String, List<String>>();
|
|
|
private EntryPosition lastPosition;
|
|
|
private boolean hasNewDdl;
|
|
|
private MetaHistoryDAO metaHistoryDAO;
|
|
|
private MetaSnapshotDAO metaSnapshotDAO;
|
|
|
- private int snapshotInterval = 24;
|
|
|
- private int snapshotExpire = 360;
|
|
|
+ private int snapshotInterval = 24;
|
|
|
+ private int snapshotExpire = 360;
|
|
|
private ScheduledFuture<?> scheduleSnapshotFuture;
|
|
|
|
|
|
public DatabaseTableMeta(){
|
|
@@ -486,7 +486,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
.toString();
|
|
|
}
|
|
|
|
|
|
- private boolean compareTableMeta(TableMeta source, TableMeta target) {
|
|
|
+ public static boolean compareTableMeta(TableMeta source, TableMeta target) {
|
|
|
if (!StringUtils.equalsIgnoreCase(source.getSchema(), target.getSchema())) {
|
|
|
return false;
|
|
|
}
|
|
@@ -501,6 +501,23 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * MySQL DDL的一些默认行为:
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * 1. Timestamp类型的列在第一次添加时,未指定默认值会默认为CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
|
|
|
+ * 2. Timestamp类型的列在第二次时,必须指定默认值
|
|
|
+ * 3. BLOB和TEXT类型不存在NULL、NOT NULL属性
|
|
|
+ * 4. 部分数据类型是synonyms,实际show create table时会转成对应类型
|
|
|
+ * 5. 非BLOB和TEXT类型在默认未指定NULL、NOT NULL时,默认default null
|
|
|
+ * 6. 在列变更时,不仅变更列名数据类型,同时各个约束中列名也会变更,同时如果约束中包含key length,则变更后的数据类型不应违背key length的约束(有长度的应大于key length;BLOB、TEXT应有key length;可以在存在key length情况下变更为无key length的数据类型,约束中key length忽略;等等)
|
|
|
+ * 7. 字符集每列(char类、eumn、set)默认保存,指定使用指定的,未指定使用表默认的,不受修改表默认字符集而改变,同表默认时,字符集显示省略
|
|
|
+ * 8. 新建表默认innodb引擎,latin1字符集
|
|
|
+ * 9. BLOB、TEXT会根据给定长度自动转换为对应的TINY、MEDIUM,LONG类型,长度和字符集也有关
|
|
|
+ * 10. unique约束在没有指定索引名是非幂等的,会自动以约束索引第一个列名称命名,同时以_2,_3这种形式添加后缀
|
|
|
+ * </pre>
|
|
|
+ */
|
|
|
+
|
|
|
for (int i = 0; i < sourceFields.size(); i++) {
|
|
|
FieldMeta sourceField = sourceFields.get(i);
|
|
|
FieldMeta targetField = targetFields.get(i);
|
|
@@ -520,15 +537,25 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
+ String sourceColumnType = StringUtils.removeEndIgnoreCase(sourceField.getColumnType(), "zerofill").trim();
|
|
|
+ String targetColumnType = StringUtils.removeEndIgnoreCase(targetField.getColumnType(), "zerofill").trim();
|
|
|
+
|
|
|
String sign = sourceField.isUnsigned() ? "unsigned" : "signed";
|
|
|
- String sourceColumnType = StringUtils.removeEndIgnoreCase(sourceField.getColumnType(), sign).trim();
|
|
|
- String targetColumnType = StringUtils.removeEndIgnoreCase(targetField.getColumnType(), sign).trim();
|
|
|
+ sourceColumnType = StringUtils.removeEndIgnoreCase(sourceColumnType, sign).trim();
|
|
|
+ targetColumnType = StringUtils.removeEndIgnoreCase(targetColumnType, sign).trim();
|
|
|
|
|
|
boolean columnTypeCompare = false;
|
|
|
columnTypeCompare |= StringUtils.containsIgnoreCase(sourceColumnType, targetColumnType);
|
|
|
columnTypeCompare |= StringUtils.containsIgnoreCase(targetColumnType, sourceColumnType);
|
|
|
if (!columnTypeCompare) {
|
|
|
- return false;
|
|
|
+ // 去掉精度参数再对比一次
|
|
|
+ sourceColumnType = synonymsType(StringUtils.substringBefore(sourceColumnType, "(")).trim();
|
|
|
+ targetColumnType = synonymsType(StringUtils.substringBefore(targetColumnType, "(")).trim();
|
|
|
+ columnTypeCompare |= StringUtils.containsIgnoreCase(sourceColumnType, targetColumnType);
|
|
|
+ columnTypeCompare |= StringUtils.containsIgnoreCase(targetColumnType, sourceColumnType);
|
|
|
+ if (!columnTypeCompare) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// if (!StringUtils.equalsIgnoreCase(sourceField.getDefaultValue(),
|
|
@@ -536,8 +563,14 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
// return false;
|
|
|
// }
|
|
|
|
|
|
- if (sourceField.isNullable() != targetField.isNullable()) {
|
|
|
- return false;
|
|
|
+ // BLOB, TEXT, GEOMETRY or JSON默认都是nullable,可以忽略比较,但比较了也是对齐
|
|
|
+ if (StringUtils.containsIgnoreCase(sourceColumnType, "timestamp")
|
|
|
+ || StringUtils.containsIgnoreCase(targetColumnType, "timestamp")) {
|
|
|
+ // timestamp可能会加default current_timestamp默认值,忽略对比nullable
|
|
|
+ } else {
|
|
|
+ if (sourceField.isNullable() != targetField.isNullable()) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// mysql会有一种处理,针对show create只有uk没有pk时,会在desc默认将uk当做pk
|
|
@@ -551,6 +584,47 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * <pre>
|
|
|
+ * synonyms处理
|
|
|
+ * 1. BOOL/BOOLEAN => TINYINT
|
|
|
+ * 2. DEC/NUMERIC/FIXED => DECIMAL
|
|
|
+ * 3. INTEGER => INT
|
|
|
+ *
|
|
|
+ *
|
|
|
+ * </pre>
|
|
|
+ *
|
|
|
+ * @param originType
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private static String synonymsType(String originType) {
|
|
|
+ if (StringUtils.equalsIgnoreCase(originType, "bool") || StringUtils.equalsIgnoreCase(originType, "boolean")) {
|
|
|
+ return "tinyint";
|
|
|
+ } else if (StringUtils.equalsIgnoreCase(originType, "dec")
|
|
|
+ || StringUtils.equalsIgnoreCase(originType, "numeric")
|
|
|
+ || StringUtils.equalsIgnoreCase(originType, "fixed")) {
|
|
|
+ return "decimal";
|
|
|
+ } else if (StringUtils.equalsIgnoreCase(originType, "integer")) {
|
|
|
+ return "int";
|
|
|
+ } else if (StringUtils.equalsIgnoreCase(originType, "real")
|
|
|
+ || StringUtils.equalsIgnoreCase(originType, "double precision")) {
|
|
|
+ return "double";
|
|
|
+ }
|
|
|
+
|
|
|
+ // BLOB、TEXT会根据给定长度自动转换为对应的TINY、MEDIUM,LONG类型,长度和字符集也有关,统一按照blob对比
|
|
|
+ if (StringUtils.equalsIgnoreCase(originType, "tinyblob")
|
|
|
+ || StringUtils.equalsIgnoreCase(originType, "mediumblob")
|
|
|
+ || StringUtils.equalsIgnoreCase(originType, "longblob")) {
|
|
|
+ return "blob";
|
|
|
+ } else if (StringUtils.equalsIgnoreCase(originType, "tinytext")
|
|
|
+ || StringUtils.equalsIgnoreCase(originType, "mediumtext")
|
|
|
+ || StringUtils.equalsIgnoreCase(originType, "longtext")) {
|
|
|
+ return "text";
|
|
|
+ }
|
|
|
+
|
|
|
+ return originType;
|
|
|
+ }
|
|
|
+
|
|
|
private int snapshotExpire(int expireTimestamp) {
|
|
|
return metaSnapshotDAO.deleteByTimestamp(destination, expireTimestamp);
|
|
|
}
|
|
@@ -582,14 +656,14 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
public void setBlackFilter(CanalEventFilter blackFilter) {
|
|
|
this.blackFilter = blackFilter;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void setFieldFilterMap(Map<String, List<String>> fieldFilterMap) {
|
|
|
- this.fieldFilterMap = fieldFilterMap;
|
|
|
- }
|
|
|
-
|
|
|
- public void setFieldBlackFilterMap(Map<String, List<String>> fieldBlackFilterMap) {
|
|
|
- this.fieldBlackFilterMap = fieldBlackFilterMap;
|
|
|
- }
|
|
|
+ this.fieldFilterMap = fieldFilterMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setFieldBlackFilterMap(Map<String, List<String>> fieldBlackFilterMap) {
|
|
|
+ this.fieldBlackFilterMap = fieldBlackFilterMap;
|
|
|
+ }
|
|
|
|
|
|
public int getSnapshotInterval() {
|
|
|
return snapshotInterval;
|
|
@@ -618,4 +692,9 @@ public class DatabaseTableMeta implements TableMetaTSDB {
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
|
+
|
|
|
+ public static void main(String[] args) {
|
|
|
+ String str = StringUtils.substringBefore("int(11)", "(");
|
|
|
+ System.out.println(str);
|
|
|
+ }
|
|
|
}
|